欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > Spring + WebSocket

Spring + WebSocket

2024/10/24 12:58:34 来源:https://blog.csdn.net/weixin_44929475/article/details/143181858  浏览:    关键词:Spring + WebSocket

1. 简介

        WebSocket 是一种网络通信协议,提供了在单个TCP连接上进行全双工通信的能力。它允许服务器主动向客户端发送消息,而不需要客户端不断轮询服务器来检查更新。WebSocket 协议在2011年成为国际标准,并且被广泛用于实现实时通信功能,比如在线游戏、设备数据更新、实时聊天应用和股票行情更新等。

WebSocket 的主要特点包括:

  1. 全双工通信:客户端和服务器可以同时发送和接收数据,无需等待对方的响应。

  2. 持久连接:一旦WebSocket连接建立,它会保持开放状态,直到客户端或服务器决定关闭连接。

  3. 低延迟:由于不需要像HTTP那样每次通信都建立和关闭连接,WebSocket可以减少通信延迟。

  4. 头部开销小:WebSocket协议的头部比HTTP要小,这有助于减少数据传输的开销。

WebSocket 通信的建立通常遵循以下步骤:

  1. 握手:客户端通过发送一个特殊的HTTP请求来发起WebSocket连接,这个请求包含了Upgrade头部,表明客户端希望升级到WebSocket协议。

  2. 服务器响应:如果服务器支持WebSocket,它会响应一个HTTP响应,确认协议升级。

  3. 连接建立:一旦握手完成,客户端和服务器之间的连接就转变为WebSocket连接,双方可以开始发送数据。

WebSocket 使用ws(非加密)或wss(加密)作为URL协议前缀,例如:

  • ws://localhost/api/ws表示非加密的WebSocket连接。

  • wss://localhost/api/wss 表示加密的WebSocket连接。

2. Spring + React 实现ws通信

后端:SpringBoot

添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置WebSocket:创建一个配置类来启用WebSocket并定义消息代理:

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/ws").withSockJS();}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableSimpleBroker("/topic");registry.setApplicationDestinationPrefixes("/app");}
}

这段代码是Spring Framework中用于配置WebSocket消息代理的Java配置类。它使用了Spring的WebSocketMessageBrokerConfigurer接口来定义WebSocket通信的端点和消息代理的行为。以下是对这段代码的详细解释:

  1. @Configuration 注解: 这个注解表明这是一个Spring配置类,Spring容器将会为这个类创建一个bean,并将其加入到应用上下文中。

  2. @EnableWebSocketMessageBroker 注解: 这个注解启用了WebSocket消息代理,允许使用高级消息传递功能,例如消息的订阅和发布。它还启用了STOMP协议的支持。

  3. implements WebSocketMessageBrokerConfigurer: 这个类实现了WebSocketMessageBrokerConfigurer接口,该接口允许配置WebSocket消息代理的细节。

  4. registerStompEndpoints(StompEndpointRegistry registry) 方法: 这个方法用于注册STOMP协议的端点。在这个配置中,它添加了一个端点/ws,并且指定使用SockJS协议。SockJS是一个浏览器JavaScript库,它提供了一个透明的、跨域的WebSocket兼容接口。

    1. registry.addEndpoint("/ws"):注册一个新的WebSocket端点/ws

    2. .withSockJS():指定这个端点使用SockJS协议。

  5. configureMessageBroker(MessageBrokerRegistry registry) 方法: 这个方法用于配置消息代理的行为。

    1. registry.enableSimpleBroker("/topic"):启用一个简单的消息代理,它将消息路由到以/topic为前缀的目的地。这意味着所有发送到/topic前缀的目的地的消息都将被广播给所有订阅了这些目的地的客户端。

    2. registry.setApplicationDestinationPrefixes("/app"):设置应用程序目的地前缀为/app。这意味着所有以/app为前缀的目的地都将被用作应用程序的端点,例如/app/sendMessage

通过这个配置,Spring Boot应用将能够处理WebSocket连接,并且可以使用STOMP协议发送和接收消息。客户端可以通过/ws端点连接到服务器,并且可以订阅以/topic为前缀的目的地来接收消息,或者发送消息到以/app为前缀的目的地。这个配置为构建基于WebSocket的实时通信应用提供了基础。

触发器:controller

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;@Controller
public class WebSocketController {@MessageMapping("/sendMessage")@SendTo("/topic/messages")public String processMessageFromClient(String message) {return "Server response: " + message;}
}

这段代码是一个Spring框架中的WebSocket控制器,使用了STOMP协议来处理WebSocket消息。下面是对这段代码的详细解释:

  1. @Controller 注解: 这个注解表明该类是一个WebSocket控制器,Spring将使用它来处理WebSocket相关的消息。

  2. @MessageMapping("/sendMessage") 注解: 这个注解定义了一个消息映射,它告诉Spring当接收到发送到/app/sendMessage(因为还有一个setApplicationDestinationPrefixes("/app")的配置)的WebSocket消息时,应该调用processMessageFromClient方法来处理这个消息。

  3. @SendTo("/topic/messages") 注解: 这个注解指定了处理方法返回的消息应该被发送到哪个目的地。在这个例子中,processMessageFromClient方法的返回值将被发送到/topic/messages,这意味着所有订阅了/topic/messages的客户端都将收到这个消息。

  4. processMessageFromClient 方法: 这个方法是实际处理客户端消息的方法。它接收一个字符串参数message,这个字符串是从客户端接收到的消息内容。方法返回一个字符串,这个字符串是服务器对客户端消息的响应。

    1. 方法参数:String message 是从客户端接收到的消息。

    2. 方法返回值:String 是服务器对客户端消息的响应,这里简单地在客户端消息前加上了前缀"Server response: "

当客户端通过WebSocket连接并发送消息到/app/sendMessage时,Spring将自动调用processMessageFromClient方法,并将客户端发送的消息作为参数传递给这个方法。方法处理完消息后,返回的响应将被发送到所有订阅了/topic/messages的客户端。

触发器:端点

使用@ServerEndpoint注解定义WebSocket端点:

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;@ServerEndpoint("/ws")
public class WebSocketEndpoint {private static final Set<WebSocketEndpoint> endpoints = new CopyOnWriteArraySet<>();public WebSocketEndpoint() {endpoints.add(this);}@OnOpenpublic void onOpen(Session session) {System.out.println("New connection opened: " + session.getId());}@OnMessagepublic void onMessage(String message, Session session) {System.out.println("Received message from " + session.getId() + ": " + message);for (WebSocketEndpoint endpoint : endpoints) {try {endpoint.sendMessage(session.getId(), message);} catch (IOException e) {e.printStackTrace();}}}@OnClosepublic void onClose(Session session) {System.out.println("Connection closed: " + session.getId());endpoints.remove(this);}@OnErrorpublic void onError(Session session, Throwable throwable) {System.out.println("Error in connection: " + session.getId());throwable.printStackTrace();endpoints.remove(this);}public void sendMessage(String sessionId, String message) throws IOException {Session session = sessions.get(sessionId);if (session != null) {session.getBasicRemote().sendText(message);}}
}

使用@ServerEndpoint注解来定义WebSocket端点,而不是通过控制器层的处理器。这样,当客户端连接到/ws端点时,会自动触发WebSocketEndpoint实例的onOpen方法,从而建立WebSocket连接。

  • @ServerEndpoint("/ws"): 这是一个注解,用于定义WebSocket的端点。在这个例子中,客户端可以通过访问/ws路径来建立WebSocket连接。

  • private static final Set<WebSocketEndpoint> endpoints: 这是一个静态的Set集合,用于存储所有活动的WebSocketEndpoint实例。它使用CopyOnWriteArraySet,这是一种线程安全的变体,适用于读多写少的场景。

  • public WebSocketEndpoint(): 这是WebSocketEndpoint类的构造函数。在创建新的实例时,它会将自己添加到endpoints集合中。

  • @OnOpen public void onOpen(Session session): 这个方法在新的WebSocket会话打开时被调用。Session对象代表与客户端的连接,可以用于发送消息和关闭连接。在这个方法中,通常可以执行一些初始化操作,比如打印日志或将用户信息与会话关联。

  • @OnMessage public void onMessage(String message, Session session): 这个方法在服务器接收到客户端发送的消息时被调用。它接收消息内容和与之关联的Session对象。在这个例子中,服务器接收到消息后,会将消息打印到控制台,并将相同的消息广播给所有连接的客户端。

  • @OnClose public void onClose(Session session): 这个方法在WebSocket会话关闭时被调用。在这里,可以执行一些清理操作,比如从endpoints集合中移除当前的WebSocketEndpoint实例。

  • @OnError public void onError(Session session, Throwable throwable): 这个方法在WebSocket会话发生错误时被调用。它接收与错误关联的Session对象和一个Throwable对象,后者包含了错误的详细信息。在这里,可以处理错误,比如打印堆栈跟踪或从endpoints集合中移除当前的WebSocketEndpoint实例。

  • public void sendMessage(String sessionId, String message) throws IOException: 这个方法用于向特定的会话发送消息。它接收会话ID和要发送的消息内容。方法内部,它查找与给定会话ID关联的Session对象,并使用BasicRemote对象发送消息。如果会话不存在或发生IO异常,可能会抛出IOException

前端:React

安装SockJS和STOMP客户端:使用npm安装SockJS和STOMP客户端库:

npm install sockjs-client stompjs

创建WebSocket逻辑:在React组件中,使用SockJS和STOMP客户端连接到后端WebSocket服务器,并发送/接收消息。

import React, { useEffect, useState } from 'react';
import SockJS from 'sockjs-client';
import Stomp from 'stompjs';function App() {const [messages, setMessages] = useState([]);const [inputValue, setInputValue] = useState('');useEffect(() => {const socket = new SockJS('http://localhost:8080/ws');const stompClient = Stomp.over(socket);stompClient.connect({}, frame => {stompClient.subscribe('/topic/messages', message => {const messageBody = JSON.parse(message.body);setMessages(prevMessages => [...prevMessages, messageBody]);});});const sendMessage = () => {stompClient.send("/app/sendMessage", {}, inputValue);setInputValue(''); // Clear input field after sending};return () => {stompClient.disconnect();};}, [inputValue]);return (<div><h1>WebSocket Messages</h1><ul>{messages.map((message, index) => (<li key={index}>{message}</li>))}</ul><inputvalue={inputValue}onChange={e => setInputValue(e.target.value)}placeholder="Type a message"/><button onClick={sendMessage}>Send</button></div>);
}export default App;
  • 这个钩子用于处理组件挂载和卸载时的逻辑。

  • 当组件挂载时(useEffect无依赖或依赖项变化时),它创建一个SockJS连接和一个Stomp客户端。

  • stompClient.connect用于建立与服务器的STOMP连接。连接成功后,它订阅/topic/messages主题,以便接收服务器发送的消息。

  • setMessages用于更新状态,将新接收的消息添加到messages数组中。

  • sendMessage函数用于发送消息到服务器。它将inputValue发送到/app/sendMessage目的地,并在发送后清空输入框。

3. 实际业务

        一批设备通过mqtt传输消息,前端页面需要显示设备实时信息,如天气、风力、速度等。业务逻辑可以为,后端代码接收到设备通过mqtt传输的消息,将其一定量转化,然后通过ws协议将数据推送,最后前端拿到数据显示。

1. 定义端点

@EnableWebSocketMessageBroker
@Configuration
public class WebSocketMessageConfiguration implements WebSocketMessageBrokerConfigurer {@Autowiredprivate AuthPrincipalHandler authPrincipalHandler;@Autowiredprivate WebSocketDefaultFactory webSocketDefaultFactory;@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// Set the WebSocket connection addressregistry.addEndpoint("/api/v1/ws").setAllowedOriginPatterns("*").setHandshakeHandler(authPrincipalHandler);}@Overridepublic void configureWebSocketTransport(WebSocketTransportRegistration registry) {registry.addDecoratorFactory(webSocketDefaultFactory);registry.setTimeToFirstMessage(60000 * 60 * 24 * 10);}}
  1. implements WebSocketMessageBrokerConfigurer:这个类实现了WebSocketMessageBrokerConfigurer接口,该接口允许配置WebSocket消息代理的细节。

  2. AuthPrincipalHandler authPrincipalHandler:这是一个自定义的握手处理器,用于在WebSocket连接建立之前进行一些认证操作。

  3. WebSocketDefaultFactory webSocketDefaultFactory:这是一个自定义的装饰器工厂,用于添加一些默认的配置或处理逻辑。

  4. registerStompEndpoints(StompEndpointRegistry registry):这个方法用于注册STOMP协议的端点。在这个例子中,端点被设置为/api/v1/ws,并且允许来自任何源的连接(setAllowedOriginPatterns("*"))。同时,设置了自定义的握手处理器authPrincipalHandler

  5. configureWebSocketTransport(WebSocketTransportRegistration registry):这个方法用于配置WebSocket传输层的一些设置。在这个例子中,它添加了一个自定义的装饰器工厂webSocketDefaultFactory,并且设置了首次消息时间(timeToFirstMessage)为10天。

2. 消息服务

@Service
@Slf4j
public class SendMessageServiceImpl implements ISendMessageService {@Autowiredprivate ObjectMapper mapper;@Autowiredprivate IWebSocketManageService webSocketManageService;@Overridepublic void sendMessage(ConcurrentWebSocketSession session, CustomWebSocketMessage message) {if (session == null) {return;}try {if (!session.isOpen()) {session.close();log.debug("This session is closed.");return;}session.sendMessage(new TextMessage(mapper.writeValueAsBytes(message)));} catch (IOException e) {log.info("Failed to publish the message. {}", message.toString());e.printStackTrace();}}@Overridepublic void sendBatch(Collection<ConcurrentWebSocketSession> sessions, CustomWebSocketMessage message) {if (sessions.isEmpty()) {return;}try {TextMessage data = new TextMessage(mapper.writeValueAsBytes(message));for (ConcurrentWebSocketSession session : sessions) {if (!session.isOpen()) {session.close();log.debug("This session is closed.");return;}session.sendMessage(data);}} catch (IOException e) {log.info("Failed to publish the message. {}", message.toString());e.printStackTrace();}}@Overridepublic void sendBatch(String workspaceId, Integer userType, String bizCode, Object data) {if (!StringUtils.hasText(workspaceId)) {throw new RuntimeException("Workspace ID does not exist.");}Collection<ConcurrentWebSocketSession> sessions = Objects.isNull(userType) ?webSocketManageService.getValueWithWorkspace(workspaceId) :webSocketManageService.getValueWithWorkspaceAndUserType(workspaceId, userType);this.sendBatch(sessions, CustomWebSocketMessage.builder().data(data).timestamp(System.currentTimeMillis()).bizCode(bizCode).build());}@Overridepublic void sendBatch(String workspaceId, String bizCode, Object data) {this.sendBatch(workspaceId, null, bizCode, data);}
}

底层消息发送是通过ConcurrentWebSocketSessionDecorator类中的sendMessage方法

public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorator
{
public void sendMessage(WebSocketMessage<?> message) throws IOException {if (!this.shouldNotSend()) {this.buffer.add(message);this.bufferSize.addAndGet(message.getPayloadLength());if (this.preSendCallback != null) {this.preSendCallback.accept(message);}do {if (!this.tryFlushMessageBuffer()) {if (logger.isTraceEnabled()) {logger.trace(String.format("Another send already in progress: session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes", this.getId(), this.getTimeSinceSendStarted(), this.getBufferSize()));}this.checkSessionLimits();break;}} while(!this.buffer.isEmpty() && !this.shouldNotSend());}
}
}

3. 使用

@Autowired
private SendMessageServiceImpl sendMessageService;@Override
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_HMS)
public void handleHms(CommonTopicReceiver receiver, MessageHeaders headers) {List<DeviceHmsDTO> unReadList = new ArrayList<>();objectMapper.convertValue(((Map) (receiver.getData())).get(MapKeyConst.LIST),new TypeReference<List<DeviceHmsReceiver>>() {}).forEach(hmsReceiver -> {final DeviceHmsEntity hms = entity.clone();this.fillEntity(hms, hmsReceiver);// The same unread hms are no longer incremented.if (hmsMap.contains(hms.getHmsKey())) {return;}this.fillMessage(hms, hmsReceiver.getArgs());DockDevice dagDevice = this.dagBridge.getDockDevice(sn);if (dagDevice != null&&hms.getLevel()==2) {dagDevice.drone.AlertInfoReport(hms.getMessageZh());}unReadList.add(entity2Dto(hms));mapper.insert(hms);});sendMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(),BizCodeEnum.DEVICE_HMS.getCode(),TelemetryDTO.<List<DeviceHmsDTO>>builder().sn(sn).host(unReadList).build());
}

        在这个消息通道中有消息,接收消息转换并将其发送到http://localhost:1234/api/ws。前端代码订阅该主题即可获取响应消息。

注:有没有发现这个很像发布/订阅模型?

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com