1.引入依赖(在微服务模块中)
<!-- Spring WebSocket --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
2.新建文件
package com.ruoyi.foundation.webSocket;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** 开启WebSocket支持*/@Configuration
public class WebSocketConfig {// 使用boot内置tomcat时需要注入此bean@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
package com.ruoyi.foundation.webSocket;import lombok.extern.slf4j.Slf4j;import javax.websocket.Session;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
public class WebsocketUtil {private static final Map<String, Session> ONLINE_SESSION = new ConcurrentHashMap<>();/*** 添加session*/public static void addSession(String userId, Session session){// 一个用户只允许一个session链接ONLINE_SESSION.putIfAbsent(userId, session);log.info("User [{}] connected. Total online users: {}", userId, ONLINE_SESSION.size());}/*** 移除session*/public static void removeSession(String userId){ONLINE_SESSION.remove(userId);log.info("User [{}] disconnected. Total online users: {}", userId, ONLINE_SESSION.size());}/*** 给单个用户推送消息*/public static void sendMessage(String userId, String message){Session session = ONLINE_SESSION.get(userId);if(session == null){log.warn("Session for user [{}] not found", userId);return;}sendMessage(session, message);}public static void sendMessage(Session session, String message) {if (session != null) {session.getAsyncRemote().sendText(message);}}/*** 给所有用户发消息*/public static void sendMessageForAll(String message) {ONLINE_SESSION.forEach((userId, session) -> {CompletableFuture.runAsync(() -> sendMessage(session, message)).exceptionally(ex -> {log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());return null;});});}/*** 给指定的多个用户推送消息*/public static void sendMessageForUsers(Set<String> userIds, String message) {userIds.forEach(userId -> {Session session = ONLINE_SESSION.get(userId);if (session == null) {log.warn("Session for user [{}] not found", userId);return;}CompletableFuture.runAsync(() -> sendMessage(session, message)).exceptionally(ex -> {log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());return null;});});}
}
package com.ruoyi.foundation.apicontroller;import com.google.gson.Gson;
import com.ruoyi.foundation.apicontroller.req.MemorialHallWebsocketReq;
import com.ruoyi.foundation.webSocket.WebsocketUtil;
import io.seata.common.util.StringUtils;
import org.apache.poi.util.StringUtil;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;@Component
@ServerEndpoint(value = "/api/memorialHallWebsocket/{dailyMissId}/{userId}")
public class MmMemorialHallWebsocketController {/*** 保存每日一念纪念馆中当前在线的用户ID*/private static final Map<String, List<String>> memorialHallUsers = new ConcurrentHashMap<>();private Gson gson=new Gson();@OnOpenpublic void onOpen(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session) {WebsocketUtil.addSession(userId, session);List<String> strings = memorialHallUsers.get(dailyMissId);if (strings == null){List<String> list=new ArrayList<>();list.add(userId);memorialHallUsers.put(dailyMissId,list);}else{strings.add(userId);}}@OnClosepublic void onClose(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session) {WebsocketUtil.removeSession(userId);List<String> strings = memorialHallUsers.get(dailyMissId);if(strings != null){strings.remove(userId);}}@OnMessagepublic void onMessage(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session, String message) {/*System.out.println(dailyMissId);System.out.println(userId);System.out.println(session);System.out.println(message);*///MemorialHallWebsocketReq memorialHallWebsocketReq = gson.fromJson(message, MemorialHallWebsocketReq.class);List<String> strings = memorialHallUsers.get(dailyMissId);if(strings == null || strings.isEmpty()){return;}Set<String> collect = strings.stream().filter(userId1 -> !StringUtils.equals(userId1, userId)).collect(Collectors.toSet());//对同纪念馆的在线用户进行广播WebsocketUtil.sendMessageForUsers(collect,message);}@OnErrorpublic void onError(Session session, Throwable throwable) {try {session.close();} catch (IOException e) {e.printStackTrace();}throwable.printStackTrace();}
}
3.网关允许WebSocket
# Gateway route for WebSocket traffic to the ruoyi-foundation service.
- id: ruoyi-foundationWebSocket
  # ws scheme behind the "lb" prefix, targeting the ruoyi-foundation service
  uri: lb:ws://ruoyi-foundation
  predicates:
    - Path=/foundationWebSocket/**
  filters:
    # Removes the leading /foundationWebSocket path segment before forwarding
    - StripPrefix=1
4.测试
5.线上nginx配置
location /mmwzGateWay/ {
    # Answer CORS preflight requests directly instead of proxying them.
    # NOTE(review): echoing $http_origin together with Allow-Credentials "true"
    # accepts every origin; confirm this is intended.
    if ($request_method = OPTIONS) {
        add_header Access-Control-Allow-Origin $http_origin;
        add_header "Access-Control-Allow-Headers" "Authorization, Origin, X-Requested-With, Content-Type, Accept";
        add_header Access-Control-Allow-Methods GET,POST,OPTIONS,HEAD,PUT,DELETE;
        add_header Access-Control-Allow-Credentials true;
        return 200;
    }

    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;

    # WebSocket-related headers
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    # nginx closes a proxied connection after 60s without data from the upstream by
    # default; raise the limit so idle WebSocket connections are not dropped.
    proxy_read_timeout 3600s;
    proxy_set_header X-Forwarded-Proto $scheme;

    proxy_pass http://mmwz-gateway:8080/;
    #proxy_pass http://www.baidu.com/;
}