在当今的互联网应用中,即时通讯功能已经变得不可或缺。无论是社交应用、在线客服还是实时数据推送,都需要高效的通信框架来支持。TIO(Try-IO)是一个高性能的Java网络通信框架,支持TCP/UDP/HTTP/WebSocket等多种协议,非常适合用于即时通讯场景。本文将介绍如何在Spring Boot项目中整合TIO,实现一个简单的即时通讯功能。
1. TIO简介
TIO是一个基于Java的高性能网络通信框架,具有以下特点:
支持多种协议:TCP、UDP、HTTP、WebSocket等。
高性能:基于NIO实现,支持高并发。
易用性:提供了丰富的API和灵活的配置
2. 环境准备
在开始之前,请确保你已经安装了以下工具:
JDK 1.8及以上版本(本项目使用jdk 17版本)
Maven或Gradle
IntelliJ IDEA或Eclipse
4.添加依赖
在pom.xml文件中添加TIO的相关依赖
<!-- t-io WebSocket依赖 --><dependency><groupId>org.t-io</groupId><artifactId>tio-websocket-server</artifactId><version>3.8.6.v20240801-RELEASE</version></dependency><!-- t-io 核心依赖 --><dependency><groupId>org.t-io</groupId><artifactId>tio-core</artifactId><version>3.8.6.v20240801-RELEASE</version></dependency>
5、创建TIO消息处理器
创建一个实现IWsMsgHandler接口的类,用于处理WebSocket消息:
import com.alibaba.fastjson.JSON;
import com.ruoyi.im.config.TioWebSocketAdapterConfig;
import com.ruoyi.im.domain.ImMessage;
import com.ruoyi.im.model.ChatMessage;
import com.ruoyi.im.service.IImMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.server.handler.IWsMsgHandler;import java.util.Date;
import java.util.UUID;@Component
public class WebSocketMessageHandler implements IWsMsgHandler {private static final Logger log = LoggerFactory.getLogger(WebSocketMessageHandler.class);/*** 从ChannelContext中获取参数*/private String getParam(ChannelContext channelContext, String paramName) {try {// 尝试从channelContext的httpConfig或其他属性中获取参数Object requestObj = channelContext.getAttribute("httpRequest");if (requestObj != null && requestObj instanceof HttpRequest) {HttpRequest httpRequest = (HttpRequest) requestObj;return httpRequest.getParam(paramName);}// 从user属性中获取用户IDif ("userId".equals(paramName)) {return (String) channelContext.getAttribute("userId");}return null;} catch (Exception e) {log.error("获取参数失败: paramName={}", paramName, e);return null;}}@Overridepublic HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {// 从请求中获取用户ID、token和设备类型String userId = httpRequest.getParam("userId");String token = httpRequest.getParam("token");String deviceTypeStr = httpRequest.getParam("deviceType");// 验证参数if (userId == null || token == null || deviceTypeStr == null) {log.error("握手参数不完整: userId={}, token={}, deviceType={}", userId, token, deviceTypeStr);return httpResponse;}// 验证token(这里应该调用若依的token验证服务)// TODO: 实现token验证逻辑// 解析设备类型UserSessionManager.DeviceType deviceType;try {deviceType = UserSessionManager.DeviceType.valueOf(deviceTypeStr.toUpperCase());} catch (IllegalArgumentException e) {log.error("无效的设备类型: {}", deviceTypeStr);return httpResponse;}// 保存参数到ChannelContextchannelContext.setAttribute("httpRequest", httpRequest);channelContext.setAttribute("userId", userId);// 添加用户会话userSessionManager.addUserSession(userId, token, deviceType, channelContext);return httpResponse;}@Overridepublic void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {String userId = httpRequest.getParam("userId");log.info("握手成功,userId: {}, channelId: {}", userId, channelContext.getId());}@Overridepublic Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {return null;}@Overridepublic Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {// 从channelContext获取用户IDString userId = getParam(channelContext, "userId");// 移除用户会话if (userId != null) {userSessionManager.removeUserSession(userId);}log.info("连接关闭,userId: {}, channelId: {}", userId, channelContext.getId());return null;}@Overridepublic Object onText(WsRequest wsRequest, String msg, ChannelContext channelContext) {// 从channelContext获取用户IDString userId = getParam(channelContext, "userId");log.info("收到消息: {}, userId: {}, channelId: {}", msg, userId, channelContext.getId());try {// 解析消息ChatMessage chatMessage = JSON.parseObject(msg, ChatMessage.class);// 设置消息ID和发送时间if (chatMessage.getMessageId() == null) {chatMessage.setMessageId(UUID.randomUUID().toString());}if (chatMessage.getSendTime() == null) {chatMessage.setSendTime(new Date());}// 设置发送者IDchatMessage.setFromUserId(userId);// 更新用户最后活动时间userSessionManager.updateUserLastActiveTime(userId);// 创建消息实体并持久化ImMessage imMessage = new ImMessage();imMessage.setMessageId(chatMessage.getMessageId());imMessage.setFromUserId(chatMessage.getFromUserId());imMessage.setToUserId(chatMessage.getToUserId());imMessage.setGroupId(chatMessage.getGroupId());imMessage.setContent(chatMessage.getContent());imMessage.setMessageType(chatMessage.getMessageType());imMessage.setStatus(0); // 0表示未读imMessage.setSendTime(chatMessage.getSendTime());imMessage.setCreateTime(chatMessage.getSendTime());imMessage.setUpdateTime(chatMessage.getSendTime());imMessage.setIsSent(1); // 1表示已发送imMessage.setRetryCount(0); // 初始重试次数为0// 保存消息到数据库imMessageService.insertImMessage(imMessage);// 处理消息if (chatMessage.getToUserId() != null) {// 私聊消息userSessionManager.sendMessageToUser(chatMessage.getToUserId(), JSON.toJSONString(chatMessage));} else if (chatMessage.getGroupId() != null) {// 群聊消息(需要实现群组管理)// TODO: 实现群聊消息处理} else {// 广播消息userSessionManager.broadcastMessage(JSON.toJSONString(chatMessage));}// 发送响应消息WsResponse wsResponse = WsResponse.fromText("SendMessageSuccess:" + msg, "UTF-8");TioWebSocketAdapterConfig.send(channelContext, wsResponse);} catch (Exception e) {log.error("处理消息失败: {}", msg, e);WsResponse wsResponse = WsResponse.fromText("处理消息失败: " + e.getMessage(), "UTF-8");TioWebSocketAdapterConfig.send(channelContext, wsResponse);}return null;}
}
import com.ruoyi.im.config.TioWebSocketAdapterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.websocket.common.WsResponse;import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;@Component
public class UserSessionManager {private static final Logger log = LoggerFactory.getLogger(UserSessionManager.class);// 用户ID -> 用户会话信息private final Map<String, UserSession> userSessions = new ConcurrentHashMap<>();// 设备类型枚举public enum DeviceType {WEB, // Web端MINI, // 小程序APP // APP}// 用户会话信息类public static class UserSession {private String userId;private String token;private DeviceType deviceType;private ChannelContext channelContext;private long lastActiveTime;public UserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {this.userId = userId;this.token = token;this.deviceType = deviceType;this.channelContext = channelContext;this.lastActiveTime = System.currentTimeMillis();}// Getters and setterspublic String getUserId() { return userId; }public void setUserId(String userId) { this.userId = userId; }public String getToken() { return token; }public void setToken(String token) { this.token = token; }public DeviceType getDeviceType() { return deviceType; }public void setDeviceType(DeviceType deviceType) { this.deviceType = deviceType; }public ChannelContext getChannelContext() { return channelContext; }public void setChannelContext(ChannelContext channelContext) { this.channelContext = channelContext; }public long getLastActiveTime() { return lastActiveTime; }public void setLastActiveTime(long lastActiveTime) { this.lastActiveTime = lastActiveTime; }}// 添加用户会话public void addUserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {UserSession session = new UserSession(userId, token, deviceType, channelContext);userSessions.put(userId, session);// 绑定用户ID到ChannelContext,使用适配器TioWebSocketAdapterConfig.bindUser(channelContext, userId);log.info("用户会话添加成功: userId={}, deviceType={}", userId, deviceType);}// 移除用户会话public void removeUserSession(String userId) {UserSession session = userSessions.remove(userId);if (session != null) {// 解绑用户ID,使用适配器TioWebSocketAdapterConfig.unbindUser(session.getChannelContext(), userId);log.info("用户会话移除成功: userId={}", userId);}}// 获取用户会话public UserSession getUserSession(String userId) {return userSessions.get(userId);}// 更新用户最后活动时间public void updateUserLastActiveTime(String userId) {UserSession session = userSessions.get(userId);if (session != null) {session.setLastActiveTime(System.currentTimeMillis());}}// 发送消息给指定用户public void sendMessageToUser(String userId, String message) {UserSession session = userSessions.get(userId);if (session != null && session.getChannelContext() != null) {WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");// 使用适配器发送消息TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);} else {log.warn("用户不在线,无法发送消息: userId={}", userId);}}// 广播消息给所有用户public void broadcastMessage(String message) {if (userSessions.isEmpty()) {return;}WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");for (UserSession session : userSessions.values()) {if (session.getChannelContext() != null) {// 使用适配器发送消息TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);}}}// 获取在线用户数量public int getOnlineUserCount() {return userSessions.size();}// 获取所有在线用户IDpublic Set<String> getOnlineUserIds() {return userSessions.keySet();}
}
6、创建TIO配置类和 TIO WebSocket适配器配置
import com.ruoyi.im.websocket.WebSocketMessageHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.tio.websocket.server.WsServerConfig;
import org.tio.websocket.server.WsServerStarter;import java.io.IOException;@Configuration
public class TioWebSocketConfig {@Autowiredprivate WebSocketMessageHandler webSocketMessageHandler;@Beanpublic WsServerStarter wsServerStarter() throws IOException {// 配置t-io websocket服务器,指定端口WsServerConfig wsServerConfig = new WsServerConfig(9321);// 创建WebSocket服务器WsServerStarter wsServerStarter = new WsServerStarter(wsServerConfig, webSocketMessageHandler);// 这里不再获取和配置ServerTioConfigreturn wsServerStarter;}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;/*** t-io WebSocket适配器配置* 帮助处理不同版本的t-io兼容性问题*/
@Configuration
public class TioWebSocketAdapterConfig {private static final Logger log = LoggerFactory.getLogger(TioWebSocketAdapterConfig.class);/*** 发送消息** @param channelContext 通道上下文* @param packet 数据包*/public static void send(ChannelContext channelContext, Packet packet) {if (channelContext == null || packet == null) {log.warn("发送消息失败:通道上下文或数据包为空");return;}try {// 直接调用 Tio 的 send 方法Tio.send(channelContext, packet);} catch (Exception e) {log.error("发送消息失败", e);}}/*** 绑定用户** @param channelContext 通道上下文* @param userId 用户ID*/public static void bindUser(ChannelContext channelContext, String userId) {if (channelContext == null || userId == null) {log.warn("绑定用户失败:通道上下文或用户ID为空");return;}try {// 直接调用 Tio 的 bindUser 方法Tio.bindUser(channelContext, userId);} catch (Exception e) {log.error("绑定用户失败", e);}}/*** 解绑用户** @param channelContext 通道上下文* @param userId 用户ID*/public static void unbindUser(ChannelContext channelContext, String userId) {if (channelContext == null) {log.warn("解绑用户失败:通道上下文为空");return;}try {// 直接调用 Tio 的 unbindUser 方法Tio.unbindUser(channelContext);} catch (Exception e) {log.error("解绑用户失败", e);}}
}
7、配置TIO启动类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.websocket.server.WsServerStarter;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.lang.reflect.Method;@Component
public class WebSocketServer {private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);@PostConstructpublic void start() {try {wsServerStarter.start();log.info("WebSocket服务器启动成功,监听端口:9321");} catch (IOException e) {log.error("WebSocket服务器启动失败", e);}}
启动springboot主类,即可启动tio服务
public static void main(String[] args) {SpringApplication.run(RuoYiApplication.class, args);}
8、测试即时通讯功能
可以使用websocket工具进行测试
##链接地址:
ws://127.0.0.1:9321?userId=1&token=1&deviceType=MINI##链接参数说明:
userId:当前用户id
token:token值
deviceType:设备类型 WEB Web端、MINI 小程序 、APP##发送消息示例:
{"fromUserId": "1",发送者用户ID"toUserId": "2",//接收者用户ID"content": "测试消息内容",//消息内容"messageType": "1"//消息类型1=文本,2=图片,3=语音,4=视频
}##发送后响应示例:
SendMessageSuccess:{ "fromUserId": "1", "toUserId": "2", "content": "测试消息内容", "messageType": "1" }