WebSocket
- 1.引入依赖
- 2.创建实体类
- 3.创建接口
- 4.异常消息服务
- 5.WebSocket 控制器
- 6.配置 WebSocket
- 7.客户端 (Vue.js)
实现一个简单的webSocket,实现异常信息的推送,重试等
1.引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
2.创建实体类
@Data
public class ExceptionMessage {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String message;private LocalDateTime createdAt;// 'PENDING', 'SENT', 'FAILED'private String status;private int retries;private LocalDateTime retryAt;
}
3.创建接口
public interface ExceptionMessageMapper extends JpaRepository<ExceptionMessage, Long> {// 获取所有状态为PENDING的消息List<ExceptionMessage> findByStatus(String status);// 获取重试时间早于当前时间的消息List<ExceptionMessage> findByStatusAndRetryAtBefore(String status, LocalDateTime now);
}
4.异常消息服务
@Service
public class ExceptionService {private static final int MAX_RETRIES = 3;private static final long RETRY_INTERVAL_MS = 5000; // 5秒重试间隔private final ExceptionMessageMapper messageMapper;private final SimpMessagingTemplate messagingTemplate;@Autowiredpublic ExceptionService(ExceptionMessageMapper messageMapper, SimpMessagingTemplate messagingTemplate) {this.messageMapper = messageMapper;this.messagingTemplate = messagingTemplate;}// 存储异常消息到数据库@Transactionalpublic void storeExceptionMessage(String exceptionData) {ExceptionMessage message = new ExceptionMessage();message.setMessage(exceptionData);message.setStatus(StatusEnum.PENDING.getStringVal()); // 初始状态为PENDINGmessage.setRetries(0);message.setCreatedAt(LocalDateTime.now());messageMapper.save(message);}// 向前端发送消息public void sendMessage(String exceptionData) {messagingTemplate.convertAndSend(TopicEnum.EXCEPTIONS.getStringVal(), exceptionData);}// 发送单个消息并更新状态@Transactionalpublic void processMessage(ExceptionMessage message) {try {// 存储消息到数据库storeExceptionMessage(message.getMessage());// 推送消息到前端sendMessage(message.getMessage());// 更新消息状态为SENTmessage.setStatus(StatusEnum.SENT.getStringVal());messageMapper.save(message);} catch (Exception e) {// 如果发送失败,更新为FAILED并设置重试时间message.setStatus(StatusEnum.FAILED.getStringVal());message.setRetryAt(LocalDateTime.now().plus(Duration.ofMillis(RETRY_INTERVAL_MS)));message.setRetries(message.getRetries() + 1);messageMapper.save(message);}}// 重试未成功发送的消息public void retryFailedMessages() {List<ExceptionMessage> failedMessages = messageMapper.findByStatusAndRetryAtBefore(StatusEnum.FAILED.getStringVal(), LocalDateTime.now());for (ExceptionMessage message : failedMessages) {if (message.getRetries() < MAX_RETRIES) {processMessage(message);} else {// 超过最大重试次数,放弃推送message.setStatus(StatusEnum.GIVE_UP.getStringVal());messageMapper.save(message);}}}// 重新发送所有PENDING状态的消息public void resendPendingMessages() {List<ExceptionMessage> pendingMessages = messageMapper.findByStatus(StatusEnum.PENDING.getStringVal());for (ExceptionMessage message : pendingMessages) {processMessage(message);}}
}
5.WebSocket 控制器
处理WebSocket连接和消息的推送。
@RestController
@RequestMapping(value = "/app")
public class WebSocketController {private final org.example.webSocket.service.ExceptionService exceptionService;public WebSocketController(org.example.webSocket.service.ExceptionService exceptionService) {this.exceptionService = exceptionService;}/*** 由前端通过 STOMP 协议 调用的,这是一个处理连接请求的方法。* 每当前端连接到 WebSocket 时,会触发此方法执行,从而执行恢复未发送的消息**/@MessageMapping("/connect")public void handleConnect() {exceptionService.resendPendingMessages();}/*** 每5秒定时重试失败的消息**/@Scheduled(fixedRate = 5000)public void retryFailedMessages() {exceptionService.retryFailedMessages();}
}
6.配置 WebSocket
在Spring Boot中启用WebSocket。
/*** webSocket配置类,注册WebSocket端点*/
@Configuration
@EnableWebSocketMessageBroker
@EnableScheduling
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// 启用简单的消息代理,用于处理客户端的订阅(如:/topic/xxx)registry.enableSimpleBroker("/topic");// 设置应用程序的前缀,用于处理来自客户端的消息(如:/app/xxx)registry.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 注册 WebSocket 端点,客户端通过此端点连接 WebSocket 服务registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();}
}
7.客户端 (Vue.js)
在Vue前端,你需要在WebSocket连接成功时向后端发送连接请求以获取未发送的消息。
import { Stomp } from '@stomp/stompjs';
import SockJS from 'sockjs-client';export default {data() {return {exceptionData: [],exceptionCount: 0,};},mounted() {this.connectToWebSocket();},methods: {connectToWebSocket() {const socket = new SockJS('/ws');const stompClient = Stomp.over(socket);stompClient.connect({}, (frame) => {// 订阅异常消息stompClient.subscribe('/topic/exceptions', (message) => {this.exceptionData.push(message.body);this.exceptionCount = this.exceptionData.length;});// 发送连接请求,获取未发送的消息stompClient.send("/app/connect");});}}
};