欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > RPC 同步与异步之使用Spring WebFlux + WebClient或Netty + Reactor

RPC 同步与异步之使用Spring WebFlux + WebClient或Netty + Reactor

2025/4/1 2:11:31 来源:https://blog.csdn.net/qq_37954693/article/details/146587715  浏览:    关键词:RPC 同步与异步之使用Spring WebFlux + WebClient或Netty + Reactor

1. RPC 同步与异步

  • 同步 RPC:调用方发起请求后,会一直阻塞等待远程方法返回结果。
  • 异步 RPC:调用方发起请求后,不会阻塞等待,而是通过回调、Future、Promise 或者其他方式来获取结果。

2. 响应式编程

  • 响应式编程是一种编程范式,强调 非阻塞、事件驱动、数据流处理
  • 响应式编程一般会用 Reactive Streams(如 Project Reactor、RxJava) 来处理异步数据流。

3. Netty 结合 RPC 的两种设计

如果使用 Netty 作为底层通信,可以设计两种 RPC 模式:

  1. 同步 RPC(基于 Netty,但仍然同步处理)
    • Netty 作为底层传输,但调用方仍然 同步等待 远程调用的返回值。
    • 适用于需要简单封装 Netty 但保持传统 RPC 体验的场景。
  2. 异步响应式 RPC(结合 Netty + 响应式编程)
    • Netty 本身是 事件驱动、异步的,可以与 响应式编程(如 Reactor) 结合,让 RPC 具备 流式、异步、非阻塞 特性。
    • 典型做法:
      • 使用 CompletableFuture、RxJava、Reactor(Mono/Flux) 等返回异步响应。
      • 例如 Dubbo3 支持 Reactive RPC,返回 Mono<T>

4. 选择哪种方式?

  • 如果你的调用方代码还是同步的,那即使 Netty 异步,最终调用也是同步的,不太能发挥 Netty 的异步特性。
  • 如果想提高吞吐量、减少线程阻塞,使用响应式(如 Reactor)可以提升性能,适合 高并发、流式数据处理的 RPC 调用

如果你要调用多个第三方接口,并希望以 响应式(Reactive) 的方式处理,通常可以使用 Reactor(Project Reactor)RxJava 来组合多个异步请求,而底层可以用 Netty + WebClient 或异步 HTTP 客户端 来处理非阻塞调用。


1. 传统方式(阻塞模式)

问题:如果有 3 个第三方接口,传统的 RestTemplate 或同步 HttpClient 需要依次等待它们返回,导致整体耗时长:

java复制编辑String result1 = callApi1();  // 耗时 100ms
String result2 = callApi2();  // 耗时 200ms
String result3 = callApi3();  // 耗时 150ms
// 总耗时 = 100 + 200 + 150 = 450ms

这种同步方式不能充分利用多线程,导致性能低。


2. 响应式方式(非阻塞 + 并发执行)

如果你使用 Spring WebFlux + WebClientNetty + Reactor,你可以让多个 API 并行调用,显著减少总耗时。

(1) 使用 WebClient(Spring WebFlux 方案)

Spring 提供的 WebClient 基于 Netty,支持 完全异步 & 非阻塞 调用:

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;WebClient webClient = WebClient.create();// 并行调用 3 个接口,返回 Mono(单值异步流)
Mono<String> result1 = webClient.get().uri("https://api.example.com/1").retrieve().bodyToMono(String.class);
Mono<String> result2 = webClient.get().uri("https://api.example.com/2").retrieve().bodyToMono(String.class);
Mono<String> result3 = webClient.get().uri("https://api.example.com/3").retrieve().bodyToMono(String.class);// 合并多个 Mono,并行执行,合并结果
Mono<List<String>> combined = Mono.zip(result1, result2, result3).map(tuple -> List.of(tuple.getT1(), tuple.getT2(), tuple.getT3()));// 订阅并获取结果
combined.subscribe(results -> System.out.println("最终结果: " + results));

优点

  • 并行调用,多个 API 同时发出请求,减少等待时间。
  • 非阻塞,不会阻塞线程,提高吞吐量。

WebClient创建默认使用的是reactor.netty.http.client.HttpClient在reactor-netty-http-1.2.2.jar

在这里插入图片描述


(2) 使用 Netty + Reactor 直接发送异步请求

如果你不使用 Spring WebFlux,而是自己基于 Netty + Reactor 实现:

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;HttpClient client = HttpClient.create();Mono<String> result1 = client.get().uri("https://api.example.com/1").responseContent().aggregate().asString();
Mono<String> result2 = client.get().uri("https://api.example.com/2").responseContent().aggregate().asString();
Mono<String> result3 = client.get().uri("https://api.example.com/3").responseContent().aggregate().asString();Mono<List<String>> combined = Mono.zip(result1, result2, result3).map(tuple -> List.of(tuple.getT1(), tuple.getT2(), tuple.getT3()));combined.subscribe(results -> System.out.println("最终结果: " + results));

适用于高性能 Netty 服务器,完全异步 & 事件驱动


3. 处理多个响应的场景

在实际项目中,你可能需要:

  1. 等待所有 API 完成Mono.zip()
  2. 谁先返回就先处理Flux.merge()
  3. 某个 API 超时就降级.timeout(Duration.ofSeconds(3))
  4. 错误重试.retry(3)

示例:

import org.springframework.web.reactive.function.client.WebClient;WebClient webClient = WebClient.create();Mono<String> result1 = webClient.get().uri("https://api.example.com/1").retrieve().bodyToMono(String.class).timeout(Duration.ofSeconds(3))  // 超时.retry(3)  // 失败时重试 3 次.onErrorReturn("默认值1")  // 失败降级.doOnNext(res -> System.out.println("result1 成功: " + res)).doOnError(e -> System.err.println("result1 失败: " + e.getMessage()));Mono<String> result2 = webClient.get().uri("https://api.example.com/2").retrieve().bodyToMono(String.class).onErrorResume(e -> Mono.just("备用接口返回值"))  // 降级方案.doOnNext(res -> System.out.println("result2 成功: " + res)).doOnError(e -> System.err.println("result2 失败: " + e.getMessage()));Mono.zip(result1, result2).doOnSuccess(results -> System.out.println("合并结果: " + results)).doOnError(e -> System.err.println("合并失败: " + e.getMessage())).block(); // **同步阻塞,确保 main 线程不会提前结束**

总结

方案实现方式适用场景
同步调用(阻塞)RestTemplate简单但性能低
异步 + 并行调用WebClient推荐,高性能、适合 WebFlux
Netty + ReactorHttpClient(Netty)适用于高并发后端系统

如果你的项目本身是基于 Spring WebFlux,建议用 WebClient;如果是 Netty 服务器,可以直接用 Netty + Reactor。这样可以同时调用多个 API,提高吞吐量,并避免阻塞线程。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词