欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > 【Springboot后端之间使用websocket长连接通信】

【Springboot后端之间使用websocket长连接通信】

2025/4/18 13:50:10 来源:https://blog.csdn.net/qq_39203889/article/details/147037716  浏览:    关键词:【Springboot后端之间使用websocket长连接通信】

Springboot后端之间使用websocket长连接通信

  • 背景
  • 版本
  • 依赖
    • 父工程
    • 生产
    • 消费
  • 消费方
    • 开启websocket配置
    • websocket控制器
  • 生产
    • 配置
    • 两种数据发送
  • 验证

背景

有时需要使用websocket长连接在Springboot之间进行数据通信。这篇就是一个简单示例

版本

  • springboot 2.7.18
  • JDK 21

依赖

父工程

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.test</groupId><artifactId>test-websocket</artifactId><version>1.0-SNAPSHOT</version><packaging>pom</packaging><modules><module>produce</module><module>consumer</module></modules><properties><maven.compiler.source>21</maven.compiler.source><maven.compiler.target>21</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><springboot.version>2.7.18</springboot.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${springboot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>
</project>

生产

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.5.3</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.54</version></dependency>
</dependencies>

消费

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.35</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.54</version></dependency>
</dependencies>

消费方

主要工作是接收数据和对数据做处理,比如合并key相同的数据后再返回发送

开启websocket配置

// 配置
@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

websocket控制器

package com.xu.controller;import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;/**/
@ServerEndpoint(value = "/websocket/{userId}")
@Component
@Slf4j
public class WebSocket {private final static Logger logger = LogManager.getLogger(WebSocket.class);/*** 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的*/private static int onlineCount = 0;/*** concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象*/private static final ConcurrentHashMap<String, WebSocket> webSocketMap = new ConcurrentHashMap<>();private static final ConcurrentHashMap<String, JSONObject> msgCache = new ConcurrentHashMap<>();/*** 与某个客户端的连接会话,需要通过它来给客户端发送数据*/private Session session;private String userId;/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId) {this.session = session;this.userId = userId;//加入mapwebSocketMap.put(userId, this);addOnlineCount();           //在线数加1logger.info("用户{}连接成功,当前在线人数为{},数据为:{}", userId, getOnlineCount(), userId);try {sendMessage(String.valueOf(this.session.getQueryString()));} catch (IOException e) {logger.error("IO异常");}}/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {//从map中删除webSocketMap.remove(userId);subOnlineCount();           //在线数减1logger.info("用户{}关闭连接!当前在线人数为{}", userId, getOnlineCount());}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的消息*/@OnMessagepublic void onMessage(String message, Session session) {logger.info("来自客户端用户:{} 消息:{}", userId, message);JSONObject msg = JSONObject.parseObject(message);String traceId = msg.getString("traceId");if (msgCache.containsKey(traceId)) {JSONObject jsonObject = msgCache.get(traceId);JSONObject union = new JSONObject();union.put("data1", jsonObject);union.put("data2", msg);msgCache.remove(traceId);publish(union);} else {msgCache.put(traceId, msg);}}private void publish(JSONObject union) {//群发消息for (String item : webSocketMap.keySet()) {try {webSocketMap.get(item).sendMessage(union.toJSONString());} catch (IOException e) {log.error("消息发送失败,", e);}}}/*** 发生错误时调用*/@OnErrorpublic void onError(Session session, Throwable error) {logger.error("用户错误:{},原因:{}", this.userId, error.getMessage());}/*** 向客户端发送消息*/public void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);//this.session.getAsyncRemote().sendText(message);}/*** 通过userId向客户端发送消息*/public void sendMessageByUserId(String userId, String message) throws IOException {logger.info("服务端发送消息到{},消息:{}", userId, message);if (StrUtil.isNotBlank(userId) && webSocketMap.containsKey(userId)) {webSocketMap.get(userId).sendMessage("hello");} else {logger.error("用户{}不在线", userId);}}/*** 群发自定义消息*/public void sendInfo(String message) throws IOException {for (String item : webSocketMap.keySet()) {try {webSocketMap.get(item).sendMessage(message);} catch (IOException e) {log.error("群发失败,", e);}}}public static synchronized int getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {WebSocket.onlineCount++;}public static synchronized void subOnlineCount() {WebSocket.onlineCount--;}}

生产

主要工作是生产数据向消费者发送

配置

package PApp.conf;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** @author kele* @date 2024/2/19**/
@Slf4j
public class MyWebSocketClient extends WebSocketClient {public MyWebSocketClient(URI serverUri) {super(serverUri);}@SneakyThrows@Overridepublic void onOpen(ServerHandshake data) {try {log.info("WebSocket连接已打开。");}catch (Exception e){log.error("onOpen error :{}",e.getMessage());}}@SneakyThrows@Overridepublic void onMessage(String message) {try {if (message != null && !message.isEmpty()) {log.info("收到消息: {}",message);}}catch (Exception e){log.error("onMessage error : {}",message);}}@Overridepublic void onClose(int code, String reason, boolean remote) {log.info("WebSocket连接已关闭。");}@Overridepublic void onError(Exception ex) {log.info("WebSocket连接发生错误:{}", ex.getMessage());}/*** 连接定时检查*/public void startReconnectTask(long delay, TimeUnit unit) {log.info("WebSocket 心跳检查");// 以下为定时器,建议使用自定义线程池,或交给框架处理(spring)try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) {executorService.scheduleWithFixedDelay(() -> {// 检查逻辑:判断当前连接是否连通。if (!this.isOpen()) {System.out.println("WebSocket 开始重连......");log.info("WebSocket 开始重连......");// 重置连接this.reconnect();}}, 0, delay, unit);}}}

两种数据发送

package PApp.service;import PApp.conf.MyWebSocketClient;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.net.URI;import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class Init implements Runnable {public static MyWebSocketClient myWebSocketClient;@PostConstructpublic void run() {try {//启动连接log.info("连接websocket服务端");log.info("项目启动");// 服务地址URI uri = new URI("ws://127.0.0.1:10031/websocket/123");log.info("服务地址 -{}", uri);// 创建客户端myWebSocketClient = new MyWebSocketClient(uri);// 建立连接myWebSocketClient.connect();// 开启 定时检查myWebSocketClient.startReconnectTask(5, TimeUnit.SECONDS);TimeUnit.SECONDS.sleep(10);for (int i = 0; i < 2; ++i) {JSONObject obj = new JSONObject();obj.put("type", String.valueOf(i + 1));obj.put("data", i + 1);obj.put("traceId", "123");String json = JSON.toJSONString(obj.toJSONString());myWebSocketClient.send(json);}} catch (Exception e) {log.error("连接失败", e);}}
}

验证

先启动消费者,再启动生产者。
消费者收到数据应该是:
消费者收数据
生产者收到处理后的数据:
生产者收数据

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词