欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > Spring WebFlux + Netty 的服务卡顿处理

Spring WebFlux + Netty 的服务卡顿处理

2025/3/14 16:35:50 来源:https://blog.csdn.net/qyp199312/article/details/146063490  浏览:    关键词:Spring WebFlux + Netty 的服务卡顿处理

前言

我们组用的是Spring WebFluxWeb框架, 搭配的Reactor Netty做的Web容器, 搭配LettuceRedis Client,搭配WebClient(Netty) 做的HttpClient总体上就是Netty这一套

于是在使用上遇见了一些问题:

  • 因为Mysql现在还不是异步的,因此遇见慢查就 可能 会拖慢整体的响应速度;
  • 系统是微服务架构,有时并发高了整体速度会被 慢。


分析一下: 打开一个JVM进程, 发现HTTP线程是这样的:

$ jstack 612 | grep '"' | grep http
"reactor-http-epoll-4" #68 daemon prio=5 os_prio=0  runnable 
"reactor-http-epoll-3" #60 daemon prio=5 os_prio=0  runnable 
"reactor-http-epoll-2" #58 daemon prio=5 os_prio=0  runnable
"reactor-http-epoll-1" #54 daemon prio=5 os_prio=0  runnable 

如上,作为容器的 Netty Server 端用的线程, 和作为 HttpClientWebClient的线程, 他们公用一套(后文分析)。

这样一来, 就四个线程 Max(核心数,4), 如果某线程阻塞(慢查), 或者某些并发高了(HttpClient请求), 就可能导致其它使用这些Netty线程的任务没法分配到线程, 亦或优先级低排不上线程。 系统就卡了。

本文 主要Netty 异步线程的角度来分析如何优化这些卡顿。

附:
1.这样改的原理下文阐述;
2.WebClient的实现很多,比如JettyReactorNetty等, 如果你不是使用的Netty做HttpClient, 可另行探寻方案。


解决思路

线程混用慢之解决

当我们发现Netty Server / Web容器WebClient/HttpClient 使用的是一套线程之后, 解决思路就比较简单了:拆线程池

通常的WebClient写法是:

public WebClient.Builder loadBalancedWebClientBuilder(LoadBalancerClient client) {WebClient.Builder builder = WebClient.builder();// 调 HttpResources.reset() 方法创建一套新的线程池ReactorClientHttpConnector diffReactive = new ReactorClientHttpConnector(HttpClient.create(HttpResources.reset().wiretap(true));return builder.filter(new GrayLoadBalancerExchangeFilterFunction(client)).clientConnector(diffReactive);
}

只需要让WebClient/HttpClient 用一套新的线程池就好了:

@Bean("loadBalancedWebClientBuilder")
public WebClient.Builder loadBalancedWebClientBuilder(LoadBalancerClient client) {WebClient.Builder builder = WebClient.builder();// 调 HttpResources.reset() 方法创建一套新的线程池ReactorClientHttpConnector diffReactive = new ReactorClientHttpConnector(HttpClient.create(HttpResources.reset().wiretap(true));return builder.filter(new GrayLoadBalancerExchangeFilterFunction(client)).clientConnector(diffReactive);
}

这样你的服务的JVM进程里面就会出现两套Netty的异步线程。
如下(此处http-client的线程名称我手动做了修改, 以便于区分):

$ jstack 1095 | grep '"' | grep http
"reactor-http-client-epoll-4" #114 daemon prio=5 os_prio=0 runnable
"reactor-http-client-epoll-3" #113 daemon prio=5 os_prio=0 runnable
"reactor-http-client-epoll-2" #112 daemon prio=5 os_prio=0 runnable
"reactor-http-client-epoll-1" #111 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-4" #97 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-3" #96 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-2" #89 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-1" #84 daemon prio=5 os_prio=0 runnable

系统存在阻塞慢之解决


如果某些接口, 它的请求量本身不是很高, 但是请求一次就会导致大量的慢查, 或者其它阻塞的情况。 这些接口会长时间占据 Netty Server / Web容器 的线程, 导致其它请求一直在队列里面, 这样即使 Netty Server / Web容器WebClient/HttpClient 两者的队列拆分了,系统还是会卡。

解决方案便很简单, 将这些卡顿的接口单独拧出来, 用单独的线程去跑。这样Netty Server / Web容器 的线程在Controller直接让渡给单独的线程, 便不存在挤兑的场景了。
首先, 创建一个新的异步队列, 可以使用Lettuce, 也可以使用WebClient(我们使用了WebClient):

/*** 给那些耗时很长的, 会占用大量的DB资源乃至线程时间的任务独立分配一些线程, 让他们不去抢占主线程;(基于reactor, http-server的线程直接让渡给这个线程了, http server空闲)* 同时, 这些线程还去做大量占用httpClient的事(需要找到细化的代码, 独立处理)* @return*/
@Bean("soleThreadHttpClient")
public WebClient.Builder soleThreadHttpClient() {WebClient.Builder builder = WebClient.builder();ReactorClientHttpConnector diffReactive = new ReactorClientHttpConnector(HttpClient.create(HttpResources.reset()).wiretap(true));return builder.clientConnector(diffReactive);
}

其次, 基于Spring做一个Bean。 只做让渡工作

/*** 独立的线程服务, 让渡出主线程.** @author nobody*/
@Service
public class SoleThreadTaskService {@Value("${server.port}")private String port;@Resourceprivate WebClient.Builder soleThreadHttpClient;public Mono<String> releaseThread() {return soleThreadHttpClient.build().get().uri("http://127.0.0.1:" + port + "/actuator/health").retrieve().bodyToMono(new ParameterizedTypeReference<String>() {}).map(x -> Thread.currentThread().getName()).switchIfEmpty(Mono.just(Thread.currentThread().getName()));}}


最后, 在可能卡顿的接口前做一个切换

@Resource
private SoleThreadTaskService soleThreadTaskService;@PostMapping(path = "/caton/long-time")
public Mono<Result<Object>> caton() {return soleThreadTaskService.releaseThread().flatMap(x -> somethingCaton()).map(ResultBuilder::ok);
}


这样一套组合下来那些卡顿的接口, 他们的线程一开始就直接让渡给了新的这个sole-http-client, 而新的这些接口也只做这种会很卡顿的事情, 这样系统中就存在三个Netty类型的线程, 分别负责Web容器HttpClient容易卡顿的Web请求的任务。 系统一定程度上会得到优化。
如下(三个线程名称我都手动做了修改, 以便于区分):

$ jstack 2354 | grep '"' | grep http
"reactor-http-client-sole-epoll-4" #137 daemon runnable
"reactor-http-client-sole-epoll-3" #136 daemon runnable
"reactor-http-client-sole-epoll-2" #135 daemon runnable
"reactor-http-client-sole-epoll-1" #134 daemon runnable
"reactor-http-client-epoll-4" #102 daemon runnable
"reactor-http-client-epoll-3" #101 daemon runnable
"reactor-http-client-epoll-2" #100 daemon runnable
"reactor-http-client-epoll-1" #99 daemon  runnable
"reactor-http-epoll-4" #98 daemon runnable
"reactor-http-epoll-3" #97 daemon runnable
"reactor-http-epoll-2" #90 daemon runnable
"reactor-http-epoll-1" #85 daemon runnable

附:
1.Netty容器的SpringWebFlux, 本身线程数通常在一百个左右,高并发下基本上是远少于Tomcat容器的线程数量的, 因此本着节省资源的原则, 多加四个或者八个线程并不会对性能存在很大的影响;

2.读者直接复制代码去尚不能运行的, 如果期望使用,需要修改源码, 可参阅下文。

3.通2,不建议读者去改源码, 毕竟直接使用Lettuce也是可以做到直接托管线程的, 它的线程数量更多(小问题是你的接口依赖Redis,你还需要做Redis降级预案)。

4.也可以直接punishOn()一个新的线程池。

Netty的EventLoop

Netty的线程模型较为复杂, 比如有EventLoopEventLoopGroupEventExecutorExecutorServiceServerClientChannelRegister等这些概念, 各种类的继承、线程池等较为复杂。

本文主要是讲它的线程模型, 辅助于Netty的线程异步改造。 因此会将这些概念简化。


本表格仅为作者观点

名词释义
EventLoop简单的理解为一个只有一个线程的线程池
EventLoopGroup一组EventLoop的集合, 叫Parent
LoopResourcesNetty暴露给服务Server/Client/Channel等用的调度口,管理EventLoopGroup


EventLoopEventLoopGroup在LinuxEpoll和非LinuxNIO有不同的实现:

io.netty.channel.epoll.Epoll#isAvailable

Netty为Linux写了.so文件,通过Native方法检测.so是否被加载来确定是否优先使用Epoll
对应的实现类则分别是:

  • NioEventLoop 和 NioEventLoopGroup
  • EpollEventLoop 和 EpollEventLoopGroup

下文都使用 NIO 讲解

LoopResources 如何管理 EventLoopGroup

上文我们有看到JVM进程中的Netty HTTP相关的线程, 他们的创建方式如LoopResources 的默认实现:DefaultLoopResources

        final String                          prefix;final boolean                         daemon;final int                             selectCount;      // 异步IO中select线程的数量定义(与操作系统通讯)final int                             workerCount;      // 异步IO中work线程的数量定义(可以用来做业务)final AtomicReference<EventLoopGroup> serverLoops;       // S1:非Linux环境下,用这个做NIO Server的 (参照S2)final AtomicReference<EventLoopGroup> clientLoops;       // C1:非Linux环境下,用这个做NIO Client的 (参照C2)final AtomicReference<EventLoopGroup> serverSelectLoops; // selectfinal AtomicReference<EventLoopGroup> cacheNativeClientLoops;  // C2:在Linux环境下, 用这个做HttpClient/主要(参照C1)final AtomicReference<EventLoopGroup> cacheNativeServerLoops;  // S2:在Linux环境下,用这个当TCP Server (参照S1)final AtomicReference<EventLoopGroup> cacheNativeSelectLoops;final AtomicBoolean                   running;

同时, 它还定义了EventLoopGroup 中的 EventLoop使用的线程的名称(下文引用):

static ThreadFactory threadFactory(DefaultLoopResources parent, String prefix) {return new EventLoopFactory(parent.daemon,parent.prefix + "-" + prefix,parent);
}

对应着 reactor-http-epoll-1 这个名称则是:

命名来源
reactor-reactor.netty.tcp.TcpResources#create
http-reactor.netty.http.HttpResources
epollreactor.netty.resources.DefaultLoopEpoll#getName

当然, 如果是NIO的实现, 则名称epoll 改成 nio, 或者 kqueue;
线程池的数字则是:reactor.netty.resources.DefaultLoopResources.EventLoopFactory自增的。

显然, LoopResources具备创建线程的能力, 看JVM的栈信息可以发现它就是创建给EventLoop用的, 以使得EventLoop具备异步的能力。

EventLoop如何实现异步


如上节。 LoopResources 是具备创建线程的能力的, 而它的线程又是怎么变成EventLoop的线程的呢?

先看类图(NioEventLoop):
在这里插入图片描述

EventLooopEventLooopGroup创建, 而EventLooopGroup 又由DefaultLoopResources创建,
创建EventLooopGroup时带入那个ThreadFactory参考上文), 在初始化EventLoop的时候新建一个线程, 然后这个新线程就会一直运行EventLoop里的逻辑(一个不停地循环)。

创建阶段如下流程图 NIO

Created with Raphaël 2.3.0 DefaultLoopResources新建EventLooopGroup 传入了ThreadFactory 并 封装成ThreadPerTaskExecutor NioEventLoopGroup#newChild 新建 NioEventLoop 实例化 NioEventLoop 传参传入 ThreadPerTaskExecutor 实例化其父类 SingleThreadEventExecutor,赋值 executor: ThreadPerTaskExecutorr 得到实例化了Child的EventLooopGroup EventLooopGroup构建完毕

运行阶段部分代码摘录如下(业务可以通过该办法添加任务)

@Override
public void execute(Runnable task) {boolean inEventLoop = inEventLoop();addTask(task);if (!inEventLoop) {startThread();// 其它}// 其它
}


如下流程图 NIO

Created with Raphaël 2.3.0 SingleThreadEventExecutor#execute SingleThreadEventExecutor#startThread SingleThreadEventExecutor#doStartThread 利用上文的executor新建一个线程 EventLoop在这个循环的线程里面一直接收并处理任务


如上两个流程图, EventLoop便一直跑在新建的Thread里面, 因而具备了异步线程的能力。
参阅NioEventLoop#run():

protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {// NIO的selector模型}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}// selector 到事件之后就开始在本线程运行runAllTasks();} catch (Throwable t) {handleLoopException(t);}// 关闭线程时事件return;}
}


NettyServer对EventLoop的应用

如上已知EventLoop具备异步线程的能力, ServerClient只需要使用上这个异步能力即可:

对于作为容器的 Netty Server, 整体逻辑为NIO的 Selector + Channel一套流程。 同样的, 也是容器启动+开始监听的流程。

Netty的线程复用, Selector的线程和Server的线程是同一套(如果有WebClient,也是同一套), 都是同一个EventLoopGroup

容器启动

org.springframework.boot.web.embedded.netty.NettyWebServer#start

目的是启动一个NettyHttpServerNettyWebServer#startHttpServer), 启动过程包含创建EventLoopGroup、创建TcpServer、Http的配置、绑定端口等。

Selector运行
NoiEventLoop在创建Netty Server的时候, 已经绑定了操作系统的IO事件,入口便是SelectKey(参阅上文):

NioEventLoop#run()-> NioEventLoop#processSelectedKeys()-> NioEventLoop#processSelectedKeysOptimized-> NioEventLoop#processSelectedKey-> AbstractNioByteChannel.NioByteUnsafe#read-> DefaultChannelPipeline#fireChannelRead-> AbstractChannelHandlerContext#invokeChannelRead-> HeadContext#channelRead

最后抵达的类是DefaultChannelPipeline, 然后它将会被传达到Netty Server

AbstractChannelHandlerContext#fireChannelRead-> AbstractChannelHandlerContext#invokeChannelRead-> ServerBootstrap.ServerBootstrapAcceptor#channelRead:255 childGroup.register(child)-> MultithreadEventLoopGroup#register(io.netty.channel.Channel)-> SingleThreadEventLoop#register(io.netty.channel.Channel)-> SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)-> AbstractChannel.AbstractUnsafe#register
  • ClientHttpConnector:一个桥接器, Jetty、Netty给它实现, 然后切合到各自的IO功能上去;
  • HttpClient: 这是Netty的命名, 承担http客户端的职责。

    整体上的逻辑就是将当前任务注册到EventLoop上去, 然后注册SelectKey并使用NettyNIO或者Epoll, 整体调用链路如下:
DefaultWebClient.DefaultRequestBodyUriSpec#exchange-> ExchangeFunctions.DefaultExchangeFunction#exchange-> ReactorClientHttpConnector#connect-> HttpClientFinalizer#responseConnection-> TcpClient#connect()-> TcpClientOperator#connect-> HttpClientConnect.HttpTcpClient#connect

#connect里, 通过DefaultLoopResources#onClient获得了EventLoop。 然后将请求任务封装成了个HttpClientConnect.MonoHttpConnect, 他的#subscribe将会把当前任务提交给刚获得的EventLoop, 以实现异步的效果, 如下调用链路(HttpClientConnect.MonoHttpConnect#subscribe:313):

TcpClientOperator#connect-> TcpClientConnect#connect-> PooledConnectionProvider#acquire-> PooledConnectionProvider#disposableAcquire-> DefaultPromise#addListener-> DefaultPromise#notifyListeners-> DefaultPromise#safeExecute

在这儿, 通过调用NoiEventLoop#execute将任务加入进线程队列里面, 实现了Netty Client(WebClient)的异步提交效果。

private static void safeExecute(EventExecutor executor, Runnable task) {try {executor.execute(task);} catch (Throwable t) {rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);}
}

其中这里面的AbstractChannel.AbstractUnsafe#register0 将实现注册ChannelEventLoopSelector去。

订阅源码可以发现, Netty把异步做到了极致, 几乎所有需要异步提交的地方, 全都是通过eventLoop.execute(new Runnable() {}) 来实现。

​## WebClient对EventLoop的应用

Spring的WebClient可以有Jetty、Netty实现, 这儿讲的是Netty的实现。
仅针对Reactive Reactor WebClient而言, 一般这么配置WebClient

@Bean("loadBalancedWebClientBuilder")
public WebClient.Builder loadBalancedWebClientBuilder(LoadBalancerClient client) {WebClient.Builder builder = WebClient.builder();ReactorClientHttpConnector diffReactive = new ReactorClientHttpConnector(HttpClient.create().wiretap(true));return builder.filter(new GrayLoadBalancerExchangeFilterFunction(client)).clientConnector(diffReactive);
}

它的概念也非常多, 这儿将其简化:

  • ExchangeFunction: 用做Client端的数据交换实现;起一个承上启下的作用;
  • ClientHttpConnector:一个桥接器, Jetty、Netty给它实现, 然后切合到各自的IO功能上去;
  • HttpClient: 这是Netty的命名, 承担http客户端的职责。

    整体上的逻辑就是将当前任务注册到EventLoop上去, 然后注册SelectKey并使用NettyNIO或者Epoll, 整体调用链路如下:
DefaultWebClient.DefaultRequestBodyUriSpec#exchange-> ExchangeFunctions.DefaultExchangeFunction#exchange-> ReactorClientHttpConnector#connect-> HttpClientFinalizer#responseConnection-> TcpClient#connect()-> TcpClientOperator#connect-> HttpClientConnect.HttpTcpClient#connect

#connect里, 通过DefaultLoopResources#onClient获得了EventLoop。 然后将请求任务封装成了个HttpClientConnect.MonoHttpConnect, 他的#subscribe将会把当前任务提交给刚获得的EventLoop, 以实现异步的效果, 如下调用链路(HttpClientConnect.MonoHttpConnect#subscribe:313):

TcpClientOperator#connect-> TcpClientConnect#connect-> PooledConnectionProvider#acquire-> PooledConnectionProvider#disposableAcquire-> DefaultPromise#addListener-> DefaultPromise#notifyListeners-> DefaultPromise#safeExecute

在这儿, 通过调用NoiEventLoop#execute将任务加入进线程队列里面, 实现了Netty Client(WebClient)的异步提交效果。

private static void safeExecute(EventExecutor executor, Runnable task) {try {executor.execute(task);} catch (Throwable t) {rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);}
}

使用Netty Client的WebClient, 使用EventLoop的方法和Netty Server差不多,都是通过调用 #execute 方法把当前的这个请求提交到它的队列里面去。

EventLoop线程复用

Server,Client,Selector公用线程

默认情况下, 他们是复用的, 但是可以手动拆(参考后面两节)。

参阅DefaultLoopResources构造器。 通常用户没有指定selector的数量, 因此默认为-1, 将其指向serverLoop:

if (selectCount == -1) {this.selectCount = workerCount;this.serverSelectLoops = this.serverLoops;this.cacheNativeSelectLoops = this.cacheNativeServerLoops;
}

这样做理论上并没有什么大碍, 通常只要后续的Web服务不阻塞, 极少数的线程是完全可以实现极高的并发的, 并不需要专门再分出一批Select线程出来。
况且DefaultLoopResources也提供了创建新的selector的构造器

同理。 因为代码编写习惯, 创建HttpClient的时候, 会用上HttpResources.get(), 获取到的是一个AtomicReference<HttpResources>, 它在最初创建Netty Http Server的时候已经初始化完毕了, 因此会返回同一个 HttpResources, 其父类元素便是之前创建的DefaultLoopResources

在使用DefaultLoopResources#onClient的时候, 本质上也是对serverLoop的封装。

综上, 正常情况下, Server, Client, Selector公用一个 EventLoopGroup, 也就是他们公用线程。

这样其实也是没什么毛病的,毕竟作为异步client,本身也不会太耗费资源。 只要selector不阻塞, 或者server不阻塞, client本身就不会阻塞。

拆除Client的线程


回到本文第二章:线程混用慢之解决, 只需要简单的调用HttpResources#reset() 便可以清除掉AtomicReference<HttpResources>的内容, 并创建一份新的DefaultLoopResources, 就实现了使用新的HttpClient线程池。

至于之前已经创建好的Netty Server, 它自身给缓存上了最初创建的EventLoop, 因此与WebClient的隔离, 参阅如下代码片段TcpServerRunOn

static void configure(ServerBootstrap b,boolean preferNative,LoopResources resources) {SslProvider sslProvider =  SslProvider.findSslSupport(b);boolean useNative = preferNative &&(sslProvider == null || !(sslProvider.sslContext instanceof JdkSslContext));// 此处创建serverLoop(EventLoopGroup)               EventLoopGroup selectorGroup = resources.onServerSelect(useNative);EventLoopGroup elg = resources.onServer(useNative);// 设定b.group(selectorGroup, elg).channel(resources.onServerChannel(elg));
}

然后将其设定为类ServerBootstrap元素, 便全局共享了。 :

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;
}

使用的地方参阅上方 NettyServer对EventLoop的应用:

io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

如果你期望给你的新client线程改名字, 有点麻烦, 因为HttpResourcesspublic final class
用户可以尝试直接把代码复制一份,然后手动修改源码。

新增一份Client的线程

回到本文第二章:系统存在阻塞慢之解决.

如果直接再复制一份Http Client的Bean, 会发现它并不生效, 因为Netty在复用上做得非常不错, 如下代码片段(HttpClientConnect.HttpTcpClient#connect):

// 此处取的是最后一次生成的`DefaultLoopResources`
LoopResources loops = HttpResources.get();EventLoopGroup elg = loops.onClient(useNative);

此处取的是最后一次生成的DefaultLoopResources, 相当于之前的N次编写都不生效。 如果期望生效, 可以新建一份代码, 然后使用此逻辑(作者编写

private LoopResources threadHold() {if (defaultClient != null && defaultClient.getClass().getName().contains("TcpClientBootstrap")) {try {Field source = defaultClient.getClass().getSuperclass().getDeclaredField("source");source.setAccessible(true);TcpClient client = (TcpClient) source.get(defaultClient);Field provider = client.getClass().getDeclaredField("provider");provider.setAccessible(true);HttpResources resource = (HttpResources) provider.get(client);// 把对应的LoopResources缓存着。return resource.canNotExtendLoop;} catch (NoSuchFieldException | IllegalAccessException e) {}}return null;
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词