欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 金融 > SpringBoot + SSE + rabbitMQ 实现服务端分布式广播推送

SpringBoot + SSE + rabbitMQ 实现服务端分布式广播推送

2025/3/29 16:45:45 来源:https://blog.csdn.net/sanhewuyang/article/details/146496764  浏览:    关键词:SpringBoot + SSE + rabbitMQ 实现服务端分布式广播推送

写在前面


  • 项目中用到,简单整理,为什么选择这种方式,主要是考虑 SSE 简单轻量
  • 和 HTTP 耦合度高,不需要单独考虑限流,加密、认证鉴权等,方便维护
  • 博文内容为通过 SSE + MQ 实现分布式广播推送
  • 当然考虑成本问题,这里的 MQ 也可以使用 redis 发布订阅模式
  • 理解不足小伙伴帮忙指正 😃,生活加油

我看远山,远山悲悯

持续分享技术干货,感兴趣小伙伴可以关注下 _


实现逻辑简单说明,分布式系统,当前项目有一个局部刷新的业务场景,后端处理完数据需要实时推送到前端,之前的处理办法是 WebSocket + redis ,但是 WebSocket 老断,后面考虑做一些报文加密之类的,考虑 WebSocket协议升级全双工通信之后Servlet过滤器之类不再适用,而且也没有客户端推送的需求,所以考虑使用 SSEMQ 的方式,可以基于当前Web安全框架,不需要额外编码

实现方式也比较简单,后端业务处理完数据,通过 MQ 广播交换机发布广播,分布式节点收到订阅后,通过之前建立的 SSE 长连接推送刷新数据ID,前端根据事件进行更新渲染操作

SSE 部分

SSE 由三个方法分别负责建立SSE连接发送房间状态更新维持心跳连接,每个方法都结合了SseEmitter的功能和Spring的异步处理机制,确保实时数据推送的可靠性和效率。

streamStockPrice 建立SSE连接:该方法通过@GetMapping注解定义了一个SSE端点/stream/{hotelId},用于为指定酒店建立实时数据推送连接。核心逻辑包括:

  • 创建SseEmitter:为每个酒店生成唯一标识sId(基于SnowFlake算法),并创建SseEmitter实例,设置超时时间为最大值以保持长连接。
  • 事件处理:通过onCompletiononTimeoutonError回调处理连接关闭、超时和错误场景,自动从sseEmitterMap中移除失效连接。
/*** 建立 SSE 连接* @param hotelId* @return*/@SneakyThrows@GetMapping (value ="/stream/{hotelId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter streamStockPrice(@HotelValidator @PathVariable("hotelId") Long hotelId) {Future<SseEmitter> future1 = executor.submit(() -> {final String sId = SnowFlake.No (hotelId+"SSE");SseEmitter emitter = new SseEmitter(0L); // 设置超时时间为最大值try {// 发送初始连接事件emitter.send(SseEmitter.event().reconnectTime(1000).name("connectionEstablished_"+ hotelId ).data("连接已建立完成"));emitter.onCompletion(() -> {sseEmitterMap.remove(sId);log.info("========================= SSE连接完成 ===================================:"+sId);});emitter.onTimeout(() -> {log.warn("========================= SSE连接超时 ====================================:"+sId);sseEmitterMap.remove(sId);});emitter.onError ((e) ->{log.warn("========================= SSE连接错误 ====================================:"+sId);sseEmitterMap.remove(sId);//e.printStackTrace ();});log.info("=========================新建连接 ===================================:"+hotelId);sseEmitterMap.put (sId,emitter);return emitter;} catch (  IOException e) {sseEmitterMap.remove(sId);emitter.completeWithError(e);}return emitter;});return  future1.get (5, TimeUnit.SECONDS);}

sendRoomStatus 推送房间状态更新 该方法接收房间状态数据,向指定酒店的所有SSE连接推送更新。实现细节包括:

  • 事件定制:通过SseEmitter.event().name("roomStatus_...")定义自定义事件类型,携带房间状态数据(如roomStatus Map)。
  • 精准推送:遍历sseEmitterMap,仅向与传入hotelId匹配的连接发送数据,避免跨酒店干扰。
 /*** 床房态更新* @param roomStatus*/public void sendRoomStatus(Map roomStatus){executor.submit (() -> {sseEmitterMap.forEach ((key,emitter) ->{final long hotelId =Long.parseLong (roomStatus.get ("hotelId").toString ());final long SSEhotelId = Long.parseLong (key.split ("SSE")[0]);// 只发送给当前的酒店if (hotelId == SSEhotelId){try {emitter.send(SseEmitter.event().name("roomStatus_"+ key.split ("SSE")[0]).reconnectTime(1000).data(roomStatus));log.debug("sendRoomStatus sent to emitter: {}", emitter);} catch (IOException | IllegalStateException e) {  // 扩展异常类型log.error("Emitter failed [ID:{}] - {}", emitter.hashCode(), e.getMessage());}}});});}

sendHeartbeat 维持长连接活跃: 该方法通过@Scheduled注解每5秒执行一次,用于发送心跳事件以保持SSE连接活跃:

  • 心跳事件:使用SseEmitter.event().comment("heartbeat")发送注释型心跳数据,包含当前时间和连接标识sId,防止浏览器因无数据传输而关闭连接。
  • 连接监控:日志输出当前活跃连接数,便于监控系统状态。
/*** 保持长连接的心跳*/@Scheduled (fixedRate = 10000)public void sendHeartbeat () {executor.submit (() -> {try {final int size = sseEmitterMap.size ();log.info ("======================================== 当前连接数:" + size);sseEmitterMap.forEach ((key, emitter) -> {try {emitter.send (SseEmitter.event ().comment ("heartbeat").reconnectTime (1000).data (ImmutableMap.of ("sId", key, "time", System.currentTimeMillis ())));if (log.isDebugEnabled ()) {  // 日志级别优化log.debug ("Heartbeat sent to emitter: {}", emitter);}} catch (IOException | IllegalStateException e) {  // 扩展异常类型log.error ("Emitter failed [ID:{}] - {}", emitter.hashCode (), e.getMessage ());}});} catch (Exception e) {log.error ("+++++++++++++++++++++++++++++++++++++++++++++++++ee");}});}

涉及到的容器一个是线程池,一个是线程安全的 Map

@Slf4j
@RestController
@RequestMapping("/demo/webs/sse")
public class SSEController {private final ExecutorService executor = Executors.newCachedThreadPool();/*** messageId的 SseEmitter对象映射集*/private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** 建立 SSE 连接* @param hotelId* @return*/@SneakyThrows@GetMapping (value ="/stream/{hotelId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter streamStockPrice(@HotelValidator @PathVariable("hotelId") Long hotelId) {......................................}/*** 床房态更新* @param roomStatus*/public void sendRoomStatus(Map roomStatus){.....................................}/*** 保持长连接的心跳*/@Scheduled (fixedRate = 5000)public void sendHeartbeat () {..............................}
}

MQ 部分

基于Spring BootRabbitMQ配置类,通过@Bean注解创建了广播交换机(FanoutExchange)​和持久化队列(Queue)​,并建立两者的无路由键绑定关系

@Configuration
public class RabbitMQConfig {public static final String ROOM_STATUS_EXCHANGE = "room_status_exchange"; // 普通交换机名称public static final String ROOM_STATUS_QUEUE = "room_status_queue"; // 普通队列名称// 定义广播交换机(与原有队列参数解耦)@Beanpublic FanoutExchange roomStatusExchange() {return new FanoutExchange(ROOM_STATUS_EXCHANGE, true, false);  // 持久化交换机[3](@ref)}// 定义业务队列@Beanpublic Queue roomStatusQueue() {return new Queue(ROOM_STATUS_QUEUE, true);  // 短信队列[3](@ref)}// 队列与交换机的绑定@Beanpublic Binding roomStatusBinding(FanoutExchange roomStatusExchange, Queue roomStatusQueue) {return BindingBuilder.bind(roomStatusQueue).to(roomStatusExchange);  // 无路由键绑定[1,3](@ref)}
}
​广播消息发送

sendBroadcast方法接收房态数据Map,将其序列化为JSON格式并封装为RabbitMQ消息,通过ROOM_STATUS_EXCHANGE广播交换机(Fanout模式)发送,路由键留空确保所有绑定该交换机的队列均能接收消息。消息头设置x-retry-count=0表示禁用自动重试机制,由业务层自行处理异常重试逻辑

/***  房态广播* @param msg**/public void sendBroadcast(Map<String, Object> msg) {log.info("广播时间:{},内容:{}", LocalDateTime.now(), msg);MessageProperties props = new MessageProperties();props.setHeader("x-retry-count", 0);Message message = new Message(new Gson().toJson(msg).getBytes(), props);rabbitTemplate.convertAndSend(ROOM_STATUS_EXCHANGE, "", message);  // routingKey留空}
消息消费处理

handleRoomStatus 方法通过 @RabbitListener 监听ROOM_STATUS_QUEUE队列,接收房态变更消息后,将消息体反序列化为Map对象,并调用sseController.sendRoomStatus()方法将房态数据推送给前端SSE长连接。处理完成后通过basicAck手动确认消息消费成功,避免消息重复投递。

/*** 房态广播* @param message* @param channel* @throws IOException*/@RabbitListener(queues = RabbitMQConstants.ROOM_STATUS_QUEUE,ackMode = "MANUAL")public void handleRoomStatus(Message message, Channel channel) throws IOException {try {String msg = new String(message.getBody());ObjectMapper objectMapper = new ObjectMapper();Map map = objectMapper.readValue(msg, Map.class);sseController.sendRoomStatus (map);} catch (Exception e) {log.error("消息处理异常:", e);}channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  // 手动确认[4](@ref)}

业务部分

添加埋点,在业务数据处理完成之后,推送数据ID 到MQ

 ................................final ImmutableMap<String, ? extends Serializable> stringImmutableMap = ImmutableMap.of ("hotelId", hotelId, "action", BusinessTypeConstants.ORDER_CHANGE_LOG_LIVE, "roomCodeNew", amsHotelRoom.getParentCode ());notifySendOnBlockingTaskAsync (stringImmutableMap);...................................       private static final ExecutorService executor = Executors.newFixedThreadPool(5, r -> new Thread(r, "Notify-Thread"));// 埋点方法 public  void notifySendOnBlockingTaskAsync (Map map) {executor.submit (() -> {// 房态广播delayedMessageService.sendBroadcast (map);//  todo 发送短信});}

博文部分内容参考

© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知 😃



© 2018-至今 liruilonger@gmail.com, 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词