欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 会展 > Spring WebFlux SSE(服务器发送事件)的正确用法

Spring WebFlux SSE(服务器发送事件)的正确用法

2024/11/30 7:37:05 来源:https://blog.csdn.net/wjw465150/article/details/144060279  浏览:    关键词:Spring WebFlux SSE(服务器发送事件)的正确用法

在SpringBoot2下SSE实现是返回一个SseEmitter,然后通过SseEmitter的send方法来发送事件.

在SpringBoot3的WebFlux 下SSE实现是返回一个Flux<ServerSentEvent<?>>,但是怎么手动向客户端发送SSE事件搜遍全网也没有看到一个讲清楚的.网上的例子一般都是这样的:

  @GetMapping("/stream-sse")public Flux<ServerSentEvent<String>> streamEvents() {return Flux.interval(Duration.ofSeconds(1)).map(sequence -> {ServerSentEvent<String> serverSentEvent = ServerSentEvent.<String> builder().id(String.valueOf(sequence)).event("periodic-event").data("SSE - " + LocalTime.now().toString()).build();log.info("stream-sse: " + serverSentEvent);return serverSentEvent;}).doOnCancel(() -> log.warn("stream-sse canceled")).doOnError(e -> log.error("stream-sse error", e));}

经过半天的摸索,终于找到解决方案,原来是通过Sinks.Many<ServerSentEvent<?>>这个类的tryEmitNext方法来手动发送事件!
下面是代码例子:

  // 使用 Sinks.Many<ServerSentEvent<String>> 对应非反应式的SseEmitter@GetMapping("/stream-sse-sink")public Flux<ServerSentEvent<String>> streamSseMvc() {//@wjw_comment: 必须是unicast().onBackpressureError(),否则服务的收不到断开事件// unicast() 提供只能一个订阅者的单播// onBackpressureError() 拒绝第一个订阅者之后的其它订阅者Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureError();Flux<ServerSentEvent<String>> flux = sink.asFlux();Scheduler single = Schedulers.boundedElastic();single.schedule(() -> {try {for (int i = 0; i < 50; i++) {ServerSentEvent<String> serverSentEvent = ServerSentEvent.<String> builder().id(String.valueOf(i)).event("periodic-event").data("SSE - " + LocalTime.now().toString()).build();log.info("stream-sse-sink: " + serverSentEvent);if(sink.tryEmitNext(serverSentEvent).isFailure()) {log.error("sink.tryEmitNext isFailure");break;}Thread.sleep(1000);}} catch (Exception ex) {sink.tryEmitError(ex);} finally {sink.tryEmitComplete();}},3,TimeUnit.SECONDS);return flux;}

最后加上使用WebClient获取SSE的例子:

  @Testvoid testWebClientSSE() {ParameterizedTypeReference<ServerSentEvent<String>> typeSSE      = new ParameterizedTypeReference<ServerSentEvent<String>>() {};CompletableFuture<Boolean>                          future       = new CompletableFuture<>();AtomicInteger                                       order        = new AtomicInteger();AtomicReference<Disposable>                         refSubscribe = new AtomicReference<>();WebClient webClient = WebClient.create();Flux<ServerSentEvent<String>> eventStream = webClient.get().uri("http://localhost:8080/stream-sse-sink").retrieve().bodyToFlux(typeSSE);Disposable subscribe = eventStream.doFinally(single -> {future.complete(true);logger.info("doFinally:" + single);}).subscribe(content -> {if (order.incrementAndGet() > 5 && refSubscribe.get() != null) {refSubscribe.get().dispose();future.complete(true);}logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",LocalTime.now(),content.event(),content.id(),content.data());});refSubscribe.set(subscribe);future.join();}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com