欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > Netty源码解析之线程池的实现(一):EventLoopGroup和EventLoop

Netty源码解析之线程池的实现(一):EventLoopGroup和EventLoop

2025/2/6 5:50:10 来源:https://blog.csdn.net/weixin_43274704/article/details/145456225  浏览:    关键词:Netty源码解析之线程池的实现(一):EventLoopGroup和EventLoop

EventLoopGroup和EventLoop介绍

Netty中的线程池实现方式和JDK完成不同。Netty中的线程池准确的说可以叫做执行器组,即EventLoopGroup,执行器组EventLoopGroup里面包含若干个执行器EventLoop。
这种包含关系,以我们惯常的编码方式来实现的话,肯定是在EventLoopGroup中使用一个集合属性(如栈、链表等)来存储EventLoop。
但是Netty实现线程池的方式非常奇特:
(1)EventLoop是EventLoopGroup的子类,因此EventLoop具有EventLoopGroup的大部分功能,如提交任务(submit(Runnable task))、优雅的关闭(shutdownGracefully())等。
(2)为了让EventLoopGroup能对EventLoop进行管理,一方面,EventLoopGroup继承了Iterable接口,具有迭代器的功能,能使用迭代器访问自己所包含的EventLoop。另一方面,EventLoopGroup提供了next()方法用于选择自己所包含的其中一个EventLoop,至于如何选择,则涉及到Netty的执行器选择策略,以后再做分析。

类继承关系图

其类继承关系图如下:

在这里插入图片描述
1、最顶层的接口为Executor(执行器)
其中只包含一个execute(Runnable command)方法,是执行器用来执行任务的方法。

2、ExecutorService(执行服务器)
ExecutorService也是一个执行器,但是提供了对执行器进行终止管理的方法(shutdown()和shutdownNow()),以及可以生成Future以跟踪一个或多个异步任务进度的方法(submit(Runnable task)方法)。

3、再然后到ScheduledExecutorService(计划执行服务器)
在执行服务器的基础上扩展了schedule(延迟执行任务)、scheduleAtFixedRate(周期执行任务)等方法。

4、再然后到EventExecutorGroup(事件执行器组)
其继承了ScheduledExecutorService(计划执行服务器)和Iterable(迭代器),并继续扩展了shutDownGracefully(将自己管理的所有EventExecutor给优雅的关闭)、terminationFuture(返回自己的terminationFuture,它指示EventExecutorGroup的关闭动作的执行情况)等针对事件执行器特有的方法。

5、再然后EventLoopGroup(事件循环执行器组)
继承自EventExecutorGroup,它在EventExecutorGroup的基础上提供了注册Netty Channel的功能。EventLoopGroup 是 Netty 中用于管理和调度事件循环的一个接口,它扮演着线程池的角色。在 Netty 中,每个 EventLoopGroup 都包含一个或多个 EventLoop,用于处理事件驱动的任务,比如网络 I/O 操作、定时任务等。

6、最后是EventLoop(事件循环执行器)
EventLoop 从字面上来看就是 event + loop,也就是事件循环,事件就是 I/O 事件,循环则可以理解为 while(true)。直白点就是EventLoop 是通过循环方式来处理 I/O 事件的执行器。EventLoop 本质就是一个单线程执行器,当客户端与服务端建立连接后,服务端会创建一个 Channel,并将该 Channel 与 EventLoop 绑定。绑定后,该 Channel 在整个生命周期内所有发生的事件都由该 EventLoop 来处理,但是一个 EventLoop 可以绑定多个 Channel。对于 EventLoop 而言,它就是通过不断循环它所绑定的 Channel 事件列表,检测是否有事件发生,如果有,则将该事件分发给 worker 线程处理。
另外,EventLoop继承了OrderedEventExecutor和EventLoopGroup,其中OrderedEventExecutor有序的事件执行器接口,表示该执行器会有序/串行的方式执行。

相关源码

1、Executor(执行器)

事件执行器,内部只提供了一个execute(Runnable command)执行任务的方法。
该方法在未来的某个时间执行给定的命令。该命令可以在新线程、池线程或调用线程中执行,具体由Executor实现决定。

/** Executes the given command at some time in the future.* The command may execute in a new thread, in a pooled thread,* or in the calling thread, at the discretion of the Executor implementation.* 在将来的某个时刻执行给定的指令,这个指令可以在新的线程里执行,也可以在一个线程池中的线程执行,* 或者在调用者线程里面执行,由执行器自由决定。* Params:command – the runnable task* 参数:command-可执行的任务* Throws:RejectedExecutionException – if this task cannot be accepted for execution;* 抛出:RejectedExecutionException,如果任务不能被接受执行* NullPointerException – if command is null
* */
public interface Executor {void execute(Runnable command);
}

2、ExecutorService(执行服务器)

ExecutorService也是一个执行器,但是提供了对执行器进行终止管理的方法,以及可以生成Future以跟踪一个或多个异步任务进度的方法。

public interface ExecutorService extends Executor {/** Initiates an orderly shutdown in which previously submitted tasks are * executed,but no new tasks will be accepted. Invocation has no * additional effect if already shut down.This method does not wait for * previously submitted tasks to complete execution.Use awaitTermination * to do that.* 启动有序关闭,执行之前提交的任务,但不接受新任务。如果已经关闭,则调用没有额* 外效果。此方法不会等待以前提交的任务完成执行。此方法不会等待以前提交的任务完* 成执行,可以使用awaitTermination来实现。* * Throws:SecurityException – if a security manager exists and shutting * down this ExecutorService may manipulate threads that the caller is * not permitted to modify because it does not hold * RuntimePermission("modifyThread"), or the security manager's * checkAccess method* 抛出:SecurityException,如果存在一个安全管理器,关闭这个ExecutorService* 会操作调用者不允许修改的线程,因为调用者没有运行时许可,或者安全管理器的检查* 入口方法。* */void shutdown();/*** Attempts to stop all actively executing tasks, halts the processing of * waiting tasks, and returns a list of the tasks that were awaiting * execution.* 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。* This method does not wait for actively executing tasks to terminate. * Use awaitTermination to do that.* 此方法不会等待正在运行的任务终止,但是可有使用awaitTermination来完成。* There are no guarantees beyond best-effort attempts to stop processing*  actively executing tasks. For example, typical implementations will * cancel via Thread.interrupt, so any task that fails to respond to * interrupts may never terminate.* 除了尽最大努力尝试停止处理正在执行的任务外,没有其他保证。例如,典型的实现将* 通过Thread.interrupt取消,因此任何无法响应中断的任务都可能永远不会终止。* Returns:* list of tasks that never commenced execution* 返回值:从未开始执行的任务列表*/List<Runnable> shutdownNow();/***返回执行器是否被关闭*/boolean isShutdown();/***返回执行器关闭后所有任务是否执行结束*/boolean isTerminated();/**提交一个有返回值的任务以供执行,并返回一个表示等待任务执行结果的Future。Future的get方法将在成功完成后返回任务的结果。如果你想立即阻止等待任务执行完成,你可以使用“result=exec.submit(aCallable).get()”这种格式的构造;*/<T> Future<T> submit(Callable<T> task);/**提交可运行任务以供执行,并返回表示该任务的Future。Future的get方法在成功完成后将返回null。*/Future<?> submit(Runnable task);/**执行给定的任务集合,当所有任务都完成时,返回一个Future的集合来封装任务的状态和集合。*/<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
}

3、ScheduledExecutorService(计划执行服务器)

ScheduledExecutorService有线程池的特性,也可以实现任务循环或延迟执行,可以看作是一个简单的定时任务组件,因为有线程池特性,所以任务之间可以多线程并发执行,互不影响,当任务来的时候,才会真正创建线程去执行。

public interface ScheduledExecutorService extends ExecutorService {/**创建一个延时执行的任务,返回一个等待任务完成的ScheduledFuture*/public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);/*** 创建一个延时执行并且有返回结果的任务*/public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);/**创建并执行一个周期性动作,该动作在给定的初始延迟后首先启用,随后在给定的时间内启用;也就是说,执行将在initialDelay、initialDelay+period、initialDelay+2*period之后开始,依此类推。如果任务的任何执行遇到异常,后续执行将被抑制。否则,任务只能通过取消或终止执行者来终止。如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会同时执行。*/public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);/**创建并执行一个周期性操作,该操作在给定的初始延迟后首先启用,随后在一次执行终止和下一次执行开始之间的给定延迟内启用。如果任务的任何执行遇到异常,后续执行将被抑制。否则,任务只能通过取消或终止执行者来终止。*/public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);}

4、EventExecutorGroup(事件执行器组)

EventExecutorGroup继承自ScheduledEventExecutorService,因此它有着提交延时任务的功能。
并且它继承了Iterable接口,因此他可以轮询自己管理的EventExecutor。

EventExecutorGroup提供的功能有:

  • next:选择一个EventExecutor将其返回,具体如何选择涉及Netty的执行器选择策略。
  • iterator:返回遍历自己管理的EventExecutor集合的迭代器。
  • shutDownGracefully:将自己管理的所有EventExecutor给优雅的关闭,可以指定timeout,也可以不指定timeout。优雅的关闭是Netty的特色功能,具体的实现逻辑暂不分析。
  • isShuttingDown:自己管理的所有EventExecutor是否正在shutDownGracefully,或已经shutdown了。
  • terminationFuture:返回自己的terminationFuture,它指示EventExecutorGroup的关闭动作的执行情况。

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {/*** 优雅的关闭*/Future<?> shutdownGracefully();/**将自己管理的所有EventExecutor给优雅的关闭,可以指定timeout,也可以不指定timeout*/Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);/*** 返回一个Future对象,将在本事件执行器组所管理的所有执行器都终止时得到通知*/Future<?> terminationFuture();/*** @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.*/@Override@Deprecatedvoid shutdown();/*** 返回一个自己管理的EventExecutor*/EventExecutor next();@OverrideIterator<EventExecutor> iterator();
}

5、EventLoopGroup(事件循环执行器组)

EventLoopGroup 是 Netty 中用于管理和调度事件循环的一个接口,它扮演着线程池的角色。在 Netty 中,每个 EventLoopGroup 都包含一个或多个 EventLoop,用于处理事件驱动的任务,比如网络 I/O 操作、定时任务等。
EventLoopGroup(事件循环执行器组),继承自EventExecutorGroup,它在EventExecutorGroup的基础上提供了注册Netty Channel的功能,即register(Channel channel)方法。需要注意的是,channel实际上是注册到EventLoop上面。因为EventLoop是EventLoopGroup的子类,所以EventLoop也具有通道注册的功能。在EventLoopGroup的实现类中(如MultithreadEventLoopGroup),register(Channel channel)方法实际上是先选择一个自己包含的EventLoop,然后调用EventLoop的register(Channel channel)方法。

public interface EventLoopGroup extends EventExecutorGroup {/*** Return the next {@link EventLoop} to use*/@OverrideEventLoop next();/*** 通道注册*/ChannelFuture register(Channel channel);/***  通道注册*/ChannelFuture register(ChannelPromise promise);/*** 通道注册*/@DeprecatedChannelFuture register(Channel channel, ChannelPromise promise);
}

6、EventLoop

EventLoop是EventLoopGroup的一个属性,   
关于EventLoop以及EventLoopGroup的映射关系为:

  • 一个EventLoopGroup 包含一个或者多个EventLoop;
  • 一个EventLoop 在它的生命周期内只和一个Thread 绑定;
  • 所有由EventLoop 处理的I/O 事件都将在它专有的Thread 上被处理;
  • 一个Channel 在它的生命周期内只注册于一个EventLoop。
  • 一个EventLoop 可能会被分配给一个或多个Channel。
    Channel 为Netty 网络操作抽象类,EventLoop 主要是为Channel 处理 I/O 操作,两者配合参与 I/O 操作。当一个连接到达时,Netty 就会注册一个 Channel,然后从 EventLoopGroup 中分配一个 EventLoop 绑定到这个Channel上,在该Channel的整个生命周期中都是有这个绑定的 EventLoop 来服务的。
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {@OverrideEventLoopGroup parent();
}

总结

EventLoopGroup和EventLoop是Netty中的重要组件,EventLoopGroup是执行器组,EventLoop是其包含的单个执行器。这两个都是接口,在其具体的实现类中。EventLoop执行器是一个只有一个线程的执行器,即单线程执行器。因此,EventLoopGroup可以说具有线程池的功能, EventLoop则类似一个线程。

延申

EventLoopGroup和EventLoop中的execute(Runnable command)方法最终会选择怎样的方式去执行任务?
EventLoopGroup和EventLoop都实现了Executor接口,因此都具有execute(Runnable command)方法方法。在Executor接口中给出的execute(Runnable command)方法定义是“Executes the given command at some time in the future. The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation”。意思即:在将来的某个时间执行给定的command ,这个command 可以由一个新线程执行,或者由一个池化(线程池中的)的线程执行,也可以由调用本方法的线程来执行,由执行器自由决定。
由此可知,execute(Runnable command)方法可以由Executor接口的子类自由去实现,只要最终执行了command即可。而EventLoopGroup和EventLoop都只是接口,也没有去实现execute(Runnable command)方法。那么在Netty中,execute(Runnable command)方法一般是怎么实现的呢?

以常用的NioEventLoopGroup和NioEventLoop为例,
(1)其中NioEventLoopGroup的execute(Runnable command)方法在其父类AbstractEventExecutorGroup中给出实现:

@Overridepublic void execute(Runnable command) {//先使用next()方法,选出一个执行器(即一个NioEventLoop),然后调用//执行器的execute(Runnable command)next().execute(command);}

在我们常用的NioEventLoopGroup中,execute(Runnable command)的执行逻辑是先获取其中的一个执行器(一个NioEventLoop实例),然后调用NioEventLoop实例的execute(Runnable command)方法去执行任务。

(2)EventLoop的execute(Runnable command)方法是在其父类SingleThreadEventExecutor中进行的实现:
实现代码如下:

    public void execute(Runnable task) {execute0(task);}private void execute0(@Schedule Runnable task) {//task 任务不能为 nullObjectUtil.checkNotNull(task, "task");execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));}

最终调用下面的方法:

/** 执行任务的方法,这里并不直接运行Runnable任务的run()方法,* 而是将任务添加到队列中,然后启动线程去执行* */private void execute(Runnable task, boolean immediate) {// 当前线程是不是执行器线程boolean inEventLoop = inEventLoop();// 将任务添加到待普通执行任务队列taskQueue中// 注意这里是可以被不同线程调用的,所以有并发冲突问题。// 因此任务队列taskQueue 必须是一个线程安全的队列,就是可以处理并发问题。addTask(task);//如果当前线程不是执行器的线程if (!inEventLoop) {//要调用 startThread方法开启执行器线程,//这个方法做了判断,只有当执行器状态是 ST_NOT_STARTED 才会开启执行器线程startThread();// 如果执行器状态已经 Shutdown 之后,就要拒绝任务。// 注意这里的状态是已经 Shutdown 之后,所以不包括开始 Shutdown 的状态。if (isShutdown()) {boolean reject = false;try {// 移除任务if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}// 是否唤醒可能阻塞的执行器线程//addTaskWakesUp属性为调用addTask(Runnable)添加任务时是否能唤醒线程//immediate为任务是否立即执行if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}

可以看出,EventLoop执行execute(Runnable task)方法的步骤为:
①执行addTask(task)将任务添加到队列中。
其最终会将任务添加到taskQueue队列之中。

    //添加任务protected void addTask(Runnable task) {ObjectUtil.checkNotNull(task, "task");if (!offerTask(task)) {reject(task);}}//将任务加入普通队列final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}return taskQueue.offer(task);}

②判断当前线程是不是本执行器EventLoop的执行线程,如果不是的话,调用startThread()来创建执行器的线程。
当我们第一次调用EventLoopGroup或EventLoop的execute(Runnable task)方法时,执行器处于未运行状态,此时会进入到doStartThread()方法中。至于doStartThread()方法究竟做了什么,以后再分析。

    //开启执行器的线程private void startThread() {// 只有执行器处于未运行状态,才需要开启运行if (state == ST_NOT_STARTED) {// 通过CAS方式,将执行器变成运行状态if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {// 执行器开启运行doStartThread();success = true;} finally {//出错的话,执行器状态还原if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com