欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > WebSocket

WebSocket

2025/4/22 8:49:44 来源:https://blog.csdn.net/qq_45765353/article/details/144764443  浏览:    关键词:WebSocket

WebSocket

      • 1.引入依赖
      • 2.创建实体类
      • 3.创建接口
      • 4.异常消息服务
      • 5.WebSocket 控制器
      • 6.配置 WebSocket
      • 7.客户端 (Vue.js)

实现一个简单的webSocket,实现异常信息的推送,重试等

1.引入依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

2.创建实体类

@Data
public class ExceptionMessage {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String message;private LocalDateTime createdAt;// 'PENDING', 'SENT', 'FAILED'private String status;private int retries;private LocalDateTime retryAt;
}

3.创建接口

public interface ExceptionMessageMapper extends JpaRepository<ExceptionMessage, Long> {// 获取所有状态为PENDING的消息List<ExceptionMessage> findByStatus(String status);// 获取重试时间早于当前时间的消息List<ExceptionMessage> findByStatusAndRetryAtBefore(String status, LocalDateTime now);
}

4.异常消息服务

@Service
public class ExceptionService {private static final int MAX_RETRIES = 3;private static final long RETRY_INTERVAL_MS = 5000;  // 5秒重试间隔private final ExceptionMessageMapper messageMapper;private final SimpMessagingTemplate messagingTemplate;@Autowiredpublic ExceptionService(ExceptionMessageMapper messageMapper, SimpMessagingTemplate messagingTemplate) {this.messageMapper = messageMapper;this.messagingTemplate = messagingTemplate;}// 存储异常消息到数据库@Transactionalpublic void storeExceptionMessage(String exceptionData) {ExceptionMessage message = new ExceptionMessage();message.setMessage(exceptionData);message.setStatus(StatusEnum.PENDING.getStringVal());  // 初始状态为PENDINGmessage.setRetries(0);message.setCreatedAt(LocalDateTime.now());messageMapper.save(message);}// 向前端发送消息public void sendMessage(String exceptionData) {messagingTemplate.convertAndSend(TopicEnum.EXCEPTIONS.getStringVal(), exceptionData);}// 发送单个消息并更新状态@Transactionalpublic void processMessage(ExceptionMessage message) {try {// 存储消息到数据库storeExceptionMessage(message.getMessage());// 推送消息到前端sendMessage(message.getMessage());// 更新消息状态为SENTmessage.setStatus(StatusEnum.SENT.getStringVal());messageMapper.save(message);} catch (Exception e) {// 如果发送失败,更新为FAILED并设置重试时间message.setStatus(StatusEnum.FAILED.getStringVal());message.setRetryAt(LocalDateTime.now().plus(Duration.ofMillis(RETRY_INTERVAL_MS)));message.setRetries(message.getRetries() + 1);messageMapper.save(message);}}// 重试未成功发送的消息public void retryFailedMessages() {List<ExceptionMessage> failedMessages = messageMapper.findByStatusAndRetryAtBefore(StatusEnum.FAILED.getStringVal(), LocalDateTime.now());for (ExceptionMessage message : failedMessages) {if (message.getRetries() < MAX_RETRIES) {processMessage(message);} else {// 超过最大重试次数,放弃推送message.setStatus(StatusEnum.GIVE_UP.getStringVal());messageMapper.save(message);}}}// 重新发送所有PENDING状态的消息public void resendPendingMessages() {List<ExceptionMessage> pendingMessages = messageMapper.findByStatus(StatusEnum.PENDING.getStringVal());for (ExceptionMessage message : pendingMessages) {processMessage(message);}}
}

5.WebSocket 控制器

处理WebSocket连接和消息的推送。

@RestController
@RequestMapping(value = "/app")
public class WebSocketController {private final org.example.webSocket.service.ExceptionService exceptionService;public WebSocketController(org.example.webSocket.service.ExceptionService exceptionService) {this.exceptionService = exceptionService;}/*** 由前端通过 STOMP 协议 调用的,这是一个处理连接请求的方法。* 每当前端连接到 WebSocket 时,会触发此方法执行,从而执行恢复未发送的消息**/@MessageMapping("/connect")public void handleConnect() {exceptionService.resendPendingMessages();}/*** 每5秒定时重试失败的消息**/@Scheduled(fixedRate = 5000)public void retryFailedMessages() {exceptionService.retryFailedMessages();}
}

6.配置 WebSocket

在Spring Boot中启用WebSocket。

/*** webSocket配置类,注册WebSocket端点*/
@Configuration
@EnableWebSocketMessageBroker
@EnableScheduling
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// 启用简单的消息代理,用于处理客户端的订阅(如:/topic/xxx)registry.enableSimpleBroker("/topic");// 设置应用程序的前缀,用于处理来自客户端的消息(如:/app/xxx)registry.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 注册 WebSocket 端点,客户端通过此端点连接 WebSocket 服务registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();}
}

7.客户端 (Vue.js)

在Vue前端,你需要在WebSocket连接成功时向后端发送连接请求以获取未发送的消息。

import { Stomp } from '@stomp/stompjs';
import SockJS from 'sockjs-client';export default {data() {return {exceptionData: [],exceptionCount: 0,};},mounted() {this.connectToWebSocket();},methods: {connectToWebSocket() {const socket = new SockJS('/ws');const stompClient = Stomp.over(socket);stompClient.connect({}, (frame) => {// 订阅异常消息stompClient.subscribe('/topic/exceptions', (message) => {this.exceptionData.push(message.body);this.exceptionCount = this.exceptionData.length;});// 发送连接请求,获取未发送的消息stompClient.send("/app/connect");});}}
};

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词