1. WebSocket配置类:开启WebSocket支持
/**
 * WebSocket configuration class that turns on WebSocket endpoint support.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Bean registration: automatically scans for WebSocket endpoints declared with
     * {@code @ServerEndpoint} and registers them as WebSocket beans.
     *
     * <p>Note: if the project runs in an external servlet container instead of
     * Spring Boot's embedded container, do not register a {@code ServerEndpointExporter},
     * because the container provides and manages endpoint registration itself.
     *
     * @return a new {@code ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
2. WebSocket管理器
/**
 * Static registry of connected {@link WebSocketServer} endpoints, with helpers for
 * sending a text message to a single client or to every registered client.
 */
@Slf4j
public class WebSocketManager {

    /** All registered endpoints; iterated when broadcasting. */
    private static final CopyOnWriteArraySet<WebSocketServer> webSocketServerSet = new CopyOnWriteArraySet<>();

    /** Registered endpoints keyed by session id, used for targeted sends. */
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketServerMap = new ConcurrentHashMap<>();

    /**
     * Registers an endpoint in both the broadcast set and the session-id index.
     *
     * @param webSocketServer the endpoint to register; {@code null} is ignored
     */
    public static void addWebSocketServer(WebSocketServer webSocketServer) {
        if (webSocketServer == null) {
            return;
        }
        // Resolve the id first so a failure here leaves both collections untouched.
        String sessionId = webSocketServer.getSessionId();
        webSocketServerSet.add(webSocketServer);
        webSocketServerMap.put(sessionId, webSocketServer);
    }

    /**
     * Removes an endpoint from both the broadcast set and the session-id index.
     *
     * @param webSocketServer the endpoint to remove; {@code null} is ignored
     */
    public static void removeWebSocketServer(WebSocketServer webSocketServer) {
        if (webSocketServer == null) {
            return;
        }
        webSocketServerSet.remove(webSocketServer);
        webSocketServerMap.remove(webSocketServer.getSessionId());
    }

    /**
     * Sends a message to a specific user identified by session id.
     *
     * <p>If the id is {@code null} or no endpoint is registered under it, the failure is
     * logged by {@link #sentToUser(Session, String)} and nothing is sent.
     *
     * @param sessionId id of the target session
     * @param msg       text to send
     */
    public static void sentToUser(String sessionId, String msg) {
        // ConcurrentHashMap rejects null keys, so guard before the lookup.
        WebSocketServer target = sessionId == null ? null : webSocketServerMap.get(sessionId);
        Session session = target == null ? null : target.getSession();
        sentToUser(session, msg);
    }

    /**
     * Sends a message to a specific user through the given session.
     *
     * @param session the target session; when {@code null} or already closed the
     *                message is dropped and the reason is logged
     * @param msg     text to send
     */
    public static void sentToUser(Session session, String msg) {
        if (session == null) {
            log.error("不存在该Session,无法发送消息");
            return;
        }
        // Skip closed sessions so one stale connection cannot abort a broadcast.
        if (!session.isOpen()) {
            log.warn("Session:{}已关闭,无法发送消息", session.getId());
            return;
        }
        session.getAsyncRemote().sendText(msg);
    }

    /**
     * Sends a message to all registered users.
     *
     * @param msg text to send
     */
    public static void sentToAllUser(String msg) {
        for (WebSocketServer webSocketServer : webSocketServerSet) {
            sentToUser(webSocketServer.getSession(), msg);
        }
        log.info("向所有用户发送WebSocket消息完毕,消息:{}", msg);
    }
}
3. WebSocket服务端(Server)
/*** web套接字服务器** @author yolo* @date 2024/03/15 15:03:17*/ @RestController @ServerEndpoint("/websocket") @Slf4j public class WebSocketServer {private Session session;@ApiOperation(value = "list ")@GetMapping(value = "list")public BaseResult<List<BaseCarPart>> listWithoutPage() {return BaseResult.ofSuccess( "aaaaaaa");}@OnOpenpublic void onOpen(Session session) {this.session = session;log.info("WebSocket连接成功" + session.toString());WebSocketManager.sentToUser(session, "WebSocket is connected!");WebSocketManager.addWebSocketServer(this);log.info("与SessionId:{}建立连接", session.getId());}@OnClosepublic void onClose(Session session, CloseReason closeReason) {log.info("与SessionId:{}断开连接{}", session.getId(),closeReason.getCloseCode().toString());WebSocketManager.removeWebSocketServer(this);log.info("WebSocket连接关闭");}@OnMessagepublic void onMessage(String message, Session session) {log.info("来自SessionId:{}的消息:{}", session.getId(), message);session.getAsyncRemote().sendText(message+"========");}@OnErrorpublic void onError(Session session, Throwable error) {log.error("Session:{}的WebSocket发生错误", session.getId(), error);}public Session getSession() {log.info("获取Session:{}", session.toString());return session;}public String getSessionId() {log.info("获取SessionId:{}", session.getId());return session.getId();} }