前言
我们组用的是Spring WebFlux
做Web框架, 搭配的Reactor Netty
做的Web容器, 搭配Lettuce
做Redis Client,搭配WebClient
(Netty
) 做的HttpClient。 总体上就是Netty这一套。
于是在使用上遇见了一些问题:
- 因为Mysql现在还不是异步的,因此遇见慢查就 可能 会拖慢整体的响应速度;
- 系统是微服务架构,有时并发高了整体速度会被 拖 慢。
分析一下: 打开一个JVM进程, 发现HTTP线程是这样的:
$ jstack 612 | grep '"' | grep http
"reactor-http-epoll-4" #68 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-3" #60 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-2" #58 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-1" #54 daemon prio=5 os_prio=0 runnable
如上,作为容器的 Netty Server 端用的线程, 和作为 HttpClient 的WebClient
的线程, 他们公用一套(后文分析)。
这样一来, 就四个线程 Max(核心数,4)
, 如果某线程阻塞(慢查), 或者某些并发高了(HttpClient请求), 就可能导致其它使用这些Netty线程的任务没法分配到线程, 亦或优先级低排不上线程。 系统就卡了。
本文 主要 从 Netty
异步线程的角度来分析如何优化这些卡顿。
附:
1.这样改的原理下文阐述;
2.WebClient
的实现很多,比如Jetty
、ReactorNetty
等, 如果你不是使用的Netty
做HttpClient, 可另行探寻方案。
解决思路
线程混用慢之解决
当我们发现Netty Server
/ Web容器 和WebClient
/HttpClient 使用的是一套线程之后, 解决思路就比较简单了:拆线程池
通常的WebClient
写法是:
public WebClient.Builder loadBalancedWebClientBuilder(LoadBalancerClient client) {
WebClient.Builder builder = WebClient.builder();// 调 HttpResources.reset() 方法创建一套新的线程池ReactorClientHttpConnector diffReactive = new ReactorClientHttpConnector(HttpClient.create(HttpResources.reset().wiretap(true));
return builder.filter(new GrayLoadBalancerExchangeFilterFunction(client)).clientConnector(diffReactive);
}
只需要让WebClient
/HttpClient 用一套新的线程池就好了:
@Bean("loadBalancedWebClientBuilder")
public WebClient.Builder loadBalancedWebClientBuilder(LoadBalancerClient client) {
WebClient.Builder builder = WebClient.builder();// 调 HttpResources.reset() 方法创建一套新的线程池ReactorClientHttpConnector diffReactive = new ReactorClientHttpConnector(HttpClient.create(HttpResources.reset().wiretap(true));
return builder.filter(new GrayLoadBalancerExchangeFilterFunction(client)).clientConnector(diffReactive);
}
这样你的服务的JVM进程里面就会出现两套Netty
的异步线程。
如下(此处http-client的线程名称我手动做了修改, 以便于区分):
$ jstack 1095 | grep '"' | grep http
"reactor-http-client-epoll-4" #114 daemon prio=5 os_prio=0 runnable
"reactor-http-client-epoll-3" #113 daemon prio=5 os_prio=0 runnable
"reactor-http-client-epoll-2" #112 daemon prio=5 os_prio=0 runnable
"reactor-http-client-epoll-1" #111 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-4" #97 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-3" #96 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-2" #89 daemon prio=5 os_prio=0 runnable
"reactor-http-epoll-1" #84 daemon prio=5 os_prio=0 runnable
系统存在阻塞慢之解决
如果某些接口, 它的请求量本身不是很高, 但是请求一次就会导致大量的慢查, 或者其它阻塞的情况。 这些接口会长时间占据 Netty Server
/ Web容器 的线程, 导致其它请求一直在队列里面, 这样即使 Netty Server
/ Web容器 和WebClient
/HttpClient 两者的队列拆分了,系统还是会卡。
解决方案便很简单, 将这些卡顿的接口单独拧出来, 用单独的线程去跑。这样Netty Server
/ Web容器 的线程在Controller直接让渡给单独的线程, 便不存在挤兑的场景了。
首先, 创建一个新的异步队列, 可以使用Lettuce
, 也可以使用WebClient
(我们使用了WebClient
):
/*** 给那些耗时很长的, 会占用大量的DB资源乃至线程时间的任务独立分配一些线程, 让他们不去抢占主线程;(基于reactor, http-server的线程直接让渡给这个线程了, http server空闲)* 同时, 这些线程还去做大量占用httpClient的事(需要找到细化的代码, 独立处理)* @return*/
@Bean("soleThreadHttpClient")
public WebClient.Builder soleThreadHttpClient() {
WebClient.Builder builder = WebClient.builder();
ReactorClientHttpConnector diffReactive = new ReactorClientHttpConnector(HttpClient.create(HttpResources.reset()).wiretap(true));
return builder.clientConnector(diffReactive);
}
其次, 基于Spring做一个Bean。 只做让渡工作:
/*** 独立的线程服务, 让渡出主线程.** @author nobody*/
@Service
public class SoleThreadTaskService {
@Value("${server.port}")private String port;
@Resourceprivate WebClient.Builder soleThreadHttpClient;
public Mono<String> releaseThread() {return soleThreadHttpClient.build().get().uri("http://127.0.0.1:" + port + "/actuator/health").retrieve().bodyToMono(new ParameterizedTypeReference<String>() {}).map(x -> Thread.currentThread().getName()).switchIfEmpty(Mono.just(Thread.currentThread().getName()));}
}
最后, 在可能卡顿的接口前做一个切换:
@Resource
private SoleThreadTaskService soleThreadTaskService;
@PostMapping(path = "/caton/long-time")
public Mono<Result<Object>> caton() {
return soleThreadTaskService.releaseThread().flatMap(x -> somethingCaton()).map(ResultBuilder::ok);
}
这样一套组合下来那些卡顿的接口, 他们的线程一开始就直接让渡给了新的这个sole-http-client
, 而新的这些接口也只做这种会很卡顿的事情, 这样系统中就存在三个Netty
类型的线程, 分别负责Web容器、HttpClient 和 容易卡顿的Web请求的任务。 系统一定程度上会得到优化。
如下(三个线程名称我都手动做了修改, 以便于区分):
$ jstack 2354 | grep '"' | grep http
"reactor-http-client-sole-epoll-4" #137 daemon runnable
"reactor-http-client-sole-epoll-3" #136 daemon runnable
"reactor-http-client-sole-epoll-2" #135 daemon runnable
"reactor-http-client-sole-epoll-1" #134 daemon runnable
"reactor-http-client-epoll-4" #102 daemon runnable
"reactor-http-client-epoll-3" #101 daemon runnable
"reactor-http-client-epoll-2" #100 daemon runnable
"reactor-http-client-epoll-1" #99 daemon runnable
"reactor-http-epoll-4" #98 daemon runnable
"reactor-http-epoll-3" #97 daemon runnable
"reactor-http-epoll-2" #90 daemon runnable
"reactor-http-epoll-1" #85 daemon runnable
附:
1.Netty
容器的SpringWebFlux, 本身线程数通常在一百个左右,高并发下基本上是远少于Tomcat
容器的线程数量的, 因此本着节省资源的原则, 多加四个或者八个线程并不会对性能存在很大的影响;2.读者直接复制代码去尚不能运行的, 如果期望使用,需要修改源码, 可参阅下文。
3.通2,不建议读者去改源码, 毕竟直接使用
Lettuce
也是可以做到直接托管线程的, 它的线程数量更多(小问题是你的接口依赖Redis,你还需要做Redis降级预案)。4.也可以直接
punishOn()
一个新的线程池。
Netty的EventLoop
Netty的线程模型较为复杂, 比如有
EventLoop
、EventLoopGroup
、EventExecutor
、ExecutorService
;Server
、Client
、Channel
、Register
等这些概念, 各种类的继承、线程池等较为复杂。本文主要是讲它的线程模型, 辅助于Netty的线程异步改造。 因此会将这些概念简化。
本表格仅为作者观点。
名词 | 释义 |
---|---|
EventLoop | 简单的理解为一个只有一个线程的线程池 |
EventLoopGroup | 一组EventLoop 的集合, 叫Parent |
LoopResources | Netty暴露给服务Server /Client /Channel 等用的调度口,管理EventLoopGroup |
EventLoop
和EventLoopGroup
在LinuxEpoll和非LinuxNIO有不同的实现:
io.netty.channel.epoll.Epoll#isAvailable
Netty为Linux写了.so
文件,通过Native
方法检测.so
是否被加载来确定是否优先使用Epoll
。
对应的实现类则分别是:
- NioEventLoop 和 NioEventLoopGroup
- EpollEventLoop 和 EpollEventLoopGroup
下文都使用 NIO 讲解。
LoopResources 如何管理 EventLoopGroup
上文我们有看到JVM进程中的Netty HTTP相关的线程, 他们的创建方式如LoopResources
的默认实现:DefaultLoopResources
:
final String prefix;final boolean daemon;final int selectCount; // 异步IO中select线程的数量定义(与操作系统通讯)final int workerCount; // 异步IO中work线程的数量定义(可以用来做业务)final AtomicReference<EventLoopGroup> serverLoops; // S1:非Linux环境下,用这个做NIO Server的 (参照S2)final AtomicReference<EventLoopGroup> clientLoops; // C1:非Linux环境下,用这个做NIO Client的 (参照C2)final AtomicReference<EventLoopGroup> serverSelectLoops; // selectfinal AtomicReference<EventLoopGroup> cacheNativeClientLoops; // C2:在Linux环境下, 用这个做HttpClient/主要(参照C1)final AtomicReference<EventLoopGroup> cacheNativeServerLoops; // S2:在Linux环境下,用这个当TCP Server (参照S1)final AtomicReference<EventLoopGroup> cacheNativeSelectLoops;final AtomicBoolean running;
同时, 它还定义了EventLoopGroup
中的 EventLoop
使用的线程的名称(下文引用):
static ThreadFactory threadFactory(DefaultLoopResources parent, String prefix) {return new EventLoopFactory(parent.daemon,parent.prefix + "-" + prefix,parent);
}
对应着 reactor-http-epoll-1
这个名称则是:
命名 | 来源 |
---|---|
reactor- | reactor.netty.tcp.TcpResources#create |
http- | reactor.netty.http.HttpResources |
epoll | reactor.netty.resources.DefaultLoopEpoll#getName |
当然, 如果是NIO的实现, 则名称
epoll
改成nio
, 或者kqueue
;
线程池的数字则是:reactor.netty.resources.DefaultLoopResources.EventLoopFactory
自增的。
显然,LoopResources
具备创建线程的能力, 看JVM的栈信息可以发现它就是创建给EventLoop
用的, 以使得EventLoop
具备异步的能力。
EventLoop如何实现异步
如上节。 LoopResources
是具备创建线程的能力的, 而它的线程又是怎么变成EventLoop
的线程的呢?
先看类图(NioEventLoop
):
EventLooop
由 EventLooopGroup
创建, 而EventLooopGroup
又由DefaultLoopResources
创建,
创建EventLooopGroup
时带入那个ThreadFactory
(参考上文), 在初始化EventLoop
的时候新建一个线程, 然后这个新线程就会一直运行EventLoop
里的逻辑(一个不停地循环)。
创建阶段如下流程图 NIO
运行阶段部分代码摘录如下(业务可以通过该办法添加任务)
@Override
public void execute(Runnable task) {boolean inEventLoop = inEventLoop();addTask(task);if (!inEventLoop) {startThread();// 其它}// 其它
}
如下流程图 NIO
如上两个流程图, EventLoop
便一直跑在新建的Thread里面, 因而具备了异步线程的能力。
参阅NioEventLoop#run()
:
protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {// NIO的selector模型}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}
// selector 到事件之后就开始在本线程运行runAllTasks();} catch (Throwable t) {handleLoopException(t);}// 关闭线程时事件return;}
}
NettyServer对EventLoop的应用
如上已知EventLoop
具备异步线程的能力, Server
和Client
只需要使用上这个异步能力即可:
对于作为容器的 Netty Server, 整体逻辑为NIO的 Selector + Channel
一套流程。 同样的, 也是容器启动+开始监听的流程。
Netty的线程复用, Selector的线程和Server的线程是同一套(如果有WebClient,也是同一套), 都是同一个
EventLoopGroup
。
容器启动:
org.springframework.boot.web.embedded.netty.NettyWebServer#start
目的是启动一个Netty
的HttpServer
(NettyWebServer#startHttpServer
), 启动过程包含创建EventLoopGroup
、创建TcpServer
、Http的配置、绑定端口等。
Selector运行:
NoiEventLoop
在创建Netty Server
的时候, 已经绑定了操作系统的IO事件,入口便是SelectKey
(参阅上文):
NioEventLoop#run()-> NioEventLoop#processSelectedKeys()-> NioEventLoop#processSelectedKeysOptimized-> NioEventLoop#processSelectedKey-> AbstractNioByteChannel.NioByteUnsafe#read-> DefaultChannelPipeline#fireChannelRead-> AbstractChannelHandlerContext#invokeChannelRead-> HeadContext#channelRead
最后抵达的类是DefaultChannelPipeline
, 然后它将会被传达到Netty Server
:
AbstractChannelHandlerContext#fireChannelRead-> AbstractChannelHandlerContext#invokeChannelRead-> ServerBootstrap.ServerBootstrapAcceptor#channelRead:255 childGroup.register(child)-> MultithreadEventLoopGroup#register(io.netty.channel.Channel)-> SingleThreadEventLoop#register(io.netty.channel.Channel)-> SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)-> AbstractChannel.AbstractUnsafe#register
- ClientHttpConnector:一个桥接器, Jetty、Netty给它实现, 然后切合到各自的IO功能上去;
- HttpClient: 这是
Netty
的命名, 承担http客户端的职责。
整体上的逻辑就是将当前任务注册到EventLoop
上去, 然后注册SelectKey
并使用Netty
的NIO
或者Epoll
, 整体调用链路如下:
DefaultWebClient.DefaultRequestBodyUriSpec#exchange-> ExchangeFunctions.DefaultExchangeFunction#exchange-> ReactorClientHttpConnector#connect-> HttpClientFinalizer#responseConnection-> TcpClient#connect()-> TcpClientOperator#connect-> HttpClientConnect.HttpTcpClient#connect
在#connect
里, 通过DefaultLoopResources#onClient
获得了EventLoop
。 然后将请求任务封装成了个HttpClientConnect.MonoHttpConnect
, 他的#subscribe
将会把当前任务提交给刚获得的EventLoop
, 以实现异步的效果, 如下调用链路(HttpClientConnect.MonoHttpConnect#subscribe:313
):
TcpClientOperator#connect-> TcpClientConnect#connect-> PooledConnectionProvider#acquire-> PooledConnectionProvider#disposableAcquire-> DefaultPromise#addListener-> DefaultPromise#notifyListeners-> DefaultPromise#safeExecute
在这儿, 通过调用NoiEventLoop#execute
将任务加入进线程队列里面, 实现了Netty Client(WebClient)的异步提交效果。
private static void safeExecute(EventExecutor executor, Runnable task) {try {executor.execute(task);} catch (Throwable t) {rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);}
}
其中这里面的AbstractChannel.AbstractUnsafe#register0
将实现注册Channel
到EventLoop
的Selector
去。
订阅源码可以发现,
Netty
把异步做到了极致, 几乎所有需要异步提交的地方, 全都是通过eventLoop.execute(new Runnable() {})
来实现。
## WebClient对EventLoop的应用
Spring的
WebClient
可以有Jetty、Netty实现, 这儿讲的是Netty的实现。
仅针对Reactive Reactor WebClient
而言, 一般这么配置WebClient
:
@Bean("loadBalancedWebClientBuilder")
public WebClient.Builder loadBalancedWebClientBuilder(LoadBalancerClient client) {WebClient.Builder builder = WebClient.builder();ReactorClientHttpConnector diffReactive = new ReactorClientHttpConnector(HttpClient.create().wiretap(true));
return builder.filter(new GrayLoadBalancerExchangeFilterFunction(client)).clientConnector(diffReactive);
}
它的概念也非常多, 这儿将其简化:
- ExchangeFunction: 用做Client端的数据交换实现;起一个承上启下的作用;
- ClientHttpConnector:一个桥接器, Jetty、Netty给它实现, 然后切合到各自的IO功能上去;
- HttpClient: 这是
Netty
的命名, 承担http客户端的职责。
整体上的逻辑就是将当前任务注册到EventLoop
上去, 然后注册SelectKey
并使用Netty
的NIO
或者Epoll
, 整体调用链路如下:
DefaultWebClient.DefaultRequestBodyUriSpec#exchange-> ExchangeFunctions.DefaultExchangeFunction#exchange-> ReactorClientHttpConnector#connect-> HttpClientFinalizer#responseConnection-> TcpClient#connect()-> TcpClientOperator#connect-> HttpClientConnect.HttpTcpClient#connect
在#connect
里, 通过DefaultLoopResources#onClient
获得了EventLoop
。 然后将请求任务封装成了个HttpClientConnect.MonoHttpConnect
, 他的#subscribe
将会把当前任务提交给刚获得的EventLoop
, 以实现异步的效果, 如下调用链路(HttpClientConnect.MonoHttpConnect#subscribe:313
):
TcpClientOperator#connect-> TcpClientConnect#connect-> PooledConnectionProvider#acquire-> PooledConnectionProvider#disposableAcquire-> DefaultPromise#addListener-> DefaultPromise#notifyListeners-> DefaultPromise#safeExecute
在这儿, 通过调用NoiEventLoop#execute
将任务加入进线程队列里面, 实现了Netty Client(WebClient)的异步提交效果。
private static void safeExecute(EventExecutor executor, Runnable task) {try {executor.execute(task);} catch (Throwable t) {rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);}
}
使用Netty Client的WebClient, 使用
EventLoop
的方法和Netty Server差不多,都是通过调用#execute
方法把当前的这个请求提交到它的队列里面去。
EventLoop线程复用
Server,Client,Selector公用线程
默认情况下, 他们是复用的, 但是可以手动拆(参考后面两节)。
参阅DefaultLoopResources
构造器。 通常用户没有指定selector
的数量, 因此默认为-1, 将其指向serverLoop
:
if (selectCount == -1) {this.selectCount = workerCount;this.serverSelectLoops = this.serverLoops;this.cacheNativeSelectLoops = this.cacheNativeServerLoops;
}
这样做理论上并没有什么大碍, 通常只要后续的Web服务不阻塞, 极少数的线程是完全可以实现极高的并发的, 并不需要专门再分出一批Select线程出来。
况且DefaultLoopResources
也提供了创建新的selector
的构造器。
同理。 因为代码编写习惯, 创建HttpClient的时候, 会用上HttpResources.get()
, 获取到的是一个AtomicReference<HttpResources>
, 它在最初创建Netty Http Server
的时候已经初始化完毕了, 因此会返回同一个 HttpResources
, 其父类元素便是之前创建的DefaultLoopResources
。
在使用DefaultLoopResources#onClient
的时候, 本质上也是对serverLoop
的封装。
综上, 正常情况下, Server, Client, Selector公用一个 EventLoopGroup
, 也就是他们公用线程。
这样其实也是没什么毛病的,毕竟作为异步client,本身也不会太耗费资源。 只要selector不阻塞, 或者server不阻塞, client本身就不会阻塞。
拆除Client的线程
回到本文第二章:线程混用慢之解决, 只需要简单的调用HttpResources#reset()
便可以清除掉AtomicReference<HttpResources>
的内容, 并创建一份新的DefaultLoopResources
, 就实现了使用新的HttpClient
线程池。
至于之前已经创建好的Netty Server
, 它自身给缓存上了最初创建的EventLoop
, 因此与WebClient
的隔离, 参阅如下代码片段TcpServerRunOn
:
static void configure(ServerBootstrap b,boolean preferNative,LoopResources resources) {SslProvider sslProvider = SslProvider.findSslSupport(b);
boolean useNative = preferNative &&(sslProvider == null || !(sslProvider.sslContext instanceof JdkSslContext));// 此处创建serverLoop(EventLoopGroup) EventLoopGroup selectorGroup = resources.onServerSelect(useNative);EventLoopGroup elg = resources.onServer(useNative);// 设定b.group(selectorGroup, elg).channel(resources.onServerChannel(elg));
}
然后将其设定为类ServerBootstrap
元素, 便全局共享了。 :
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;
}
使用的地方参阅上方 NettyServer对EventLoop的应用:
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
如果你期望给你的新client线程改名字, 有点麻烦, 因为
HttpResourcess
是public final class
。
用户可以尝试直接把代码复制一份,然后手动修改源码。
新增一份Client的线程
回到本文第二章:系统存在阻塞慢之解决.
如果直接再复制一份Http Client
的Bean, 会发现它并不生效, 因为Netty在复用上做得非常不错, 如下代码片段(HttpClientConnect.HttpTcpClient#connect
):
// 此处取的是最后一次生成的`DefaultLoopResources`
LoopResources loops = HttpResources.get();
EventLoopGroup elg = loops.onClient(useNative);
此处取的是最后一次生成的DefaultLoopResources
, 相当于之前的N次编写都不生效。 如果期望生效, 可以新建一份代码, 然后使用此逻辑(作者编写)
private LoopResources threadHold() {if (defaultClient != null && defaultClient.getClass().getName().contains("TcpClientBootstrap")) {try {Field source = defaultClient.getClass().getSuperclass().getDeclaredField("source");source.setAccessible(true);TcpClient client = (TcpClient) source.get(defaultClient);Field provider = client.getClass().getDeclaredField("provider");provider.setAccessible(true);HttpResources resource = (HttpResources) provider.get(client);// 把对应的LoopResources缓存着。return resource.canNotExtendLoop;} catch (NoSuchFieldException | IllegalAccessException e) {}}return null;
}