欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > 【图解IO与Netty系列】Netty源码解析——事件循环

【图解IO与Netty系列】Netty源码解析——事件循环

2024/11/30 14:45:19 来源:https://blog.csdn.net/weixin_43889578/article/details/136546330  浏览:    关键词:【图解IO与Netty系列】Netty源码解析——事件循环

Netty源码解析——事件循环

  • Netty事件循环
  • 源码解析
    • select()
    • processSelectedKeys()
      • NioMessageUnsafe#read()
      • NioByteUnsafe#read()
    • runAllTasks()

Netty事件循环

当Netty服务端启动起来以后,就可以接受客户端发送的请求,接收到客户端发来的请求后就会有事件就绪,有事件就绪就会在Netty的事件循环中被监听到并处理,我们下面看看Netty事件循环的逻辑。

在这里插入图片描述

Netty中的NIO事件循环的代码位于NioEventLoop的run()方法内部,这个方法总体是一个for(;;)死循环,然后for循环里头依次执行的三个重要方法分别是:

  1. select():调用Selector#select()方法监听注册到Selector上的Channel,等待Channel关注的事件就绪。
  2. processSelectedKeys():处理就绪事件,会调用ChannelPipeline处理,ChannelPipeline又会通过责任链模式调用里面的ChannelHandler处理。
  3. runAllTasks():处理NioEventLoop中的taskQueue的异步任务。

这就是Netty事件循环的大体逻辑,下面我们进入代码解析。

源码解析

select()

    private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {return selector.select();}long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);}

在NioEventLoop的事件循环中,首先是select()方法的调用,select()方法里面就是调用NioEventLoop对应的Selector的select()方法,阻塞当前线程,监听注册到Selector的Channel,等待事件就绪。

在这里插入图片描述

当有事件就绪时,当前线程就会解阻塞,然后调用processSelectedKeys()方法处理就绪事件。

processSelectedKeys()

NioEventLoop的 processSelectedKeys()方法会进入processSelectedKey(SelectionKey k, AbstractNioChannel ch)方法。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();try {int readyOps = k.readyOps();...if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (...) {...}}

无论是accept事件,还是read事件时,都是调用unsafe.read()方法。

在这里插入图片描述

只是这里的Unsafe的类型有可能不一样,如果是NioServerSocketChannel的话,那么Unsafe的类型就是NioMessageUnsafe,是在创建NioServerSocketChannel时就创建好的,我们看一下NioMessageUnsafe的read()方法。

NioMessageUnsafe#read()

    private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {...doReadMessages(readBuf);...int size = readBuf.size();for (int i = 0; i < size; i ++) {...pipeline.fireChannelRead(readBuf.get(i));}...}}

重要方法就两个doReadMessages(readBuf)和pipeline.fireChannelRead(readBuf.get(i)),其他代码全部省略不看。

在这里插入图片描述

我们先看doReadMessages(readBuf)

@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());...buf.add(new NioSocketChannel(this, ch));...}

SocketUtils.accept(javaChannel())里面是调用ServerSocketChannel的accept()方法,返回一个SocketChannel。

然后new NioSocketChannel(this, ch)把返回的SocketChannel包装成NioSocketChannel,放入buf中,这个buf就是外面read()方法的readBuf。

在这里插入图片描述

NioSocketChannel的构造方法调用父类AbstractNioByteChannel的构造方法:

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch, SelectionKey.OP_READ);}

可以看到指定关注的事件类型是read事件,再看看AbstractNioByteChannel的父类AbstractNioChannel的构造方法

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {ch.configureBlocking(false);} catch (...) {...}}

继续调用父类的构造方法,然后保存了NioSocketChannel和关注的事件类型OP_READ,设置NioSocketChannel为非阻塞。

再看AbstractNioChannel的父类AbstractChannel的构造方法

    protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}

创建一个Unsafe,然后初始化了ChannelPipeline。newUnsafe()方法会进入NioSocketChannel的newUnsafe()方法,创建的时NioSocketChannelUnsafe类型的Unsafe,因此NioSocketChannel的Unsafe类型就是NioSocketChannelUnsafe。

    protected AbstractNioUnsafe newUnsafe() {return new NioSocketChannelUnsafe();}

NioSocketChannel的构造方法总结起来就是做了5件事:

  1. 创建并保存NioSocketChannelUnsafe
  2. 创建并保存ChannelPipeline
  3. 保存SocketChannel
  4. 保存关注的事件类型OP_READ
  5. 设置SocketChannel为非阻塞

在这里插入图片描述

然后回到unsafe.read()方法,接下来执行的代码pipeline.fireChannelRead(readBuf.get(i))就是调用触发就绪事件的Channel对应的ChannelPipeline的fireChannelRead(…)方法,触发ChannelPipeline中每个处理入站事件的ChannelHandler的入站事件处理。

ChannelPipeline的fireChannelRead(…)方法会从头到尾以责任链的处理方式调用每个ChannelInboundHandler类型的channelRead(…)方法。NioServerSocketChannel的ChannelPipeline中的ChannelHandler是ServerBootstrapAcceptor。我们看看ServerBootstrapAcceptor的channelRead(…)方法。

        public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);try {childGroup.register(child).addListener(new ChannelFutureListener() {...});} catch (...) {...}}

这里的参数msg的类型是NioSocketChannel,就是上面放入buf中的NioSocketChannel,调用pipeline.fireChannelRead(readBuf.get(i))方法时readBuf.get(i)就是中buf中取出NioSocketChannel作为参数传进来。

然后child.pipeline().addLast(childHandler)这一行这里的childHandler是我们定义的ChannelInitializer,这里放入NioSocketChannel的ChannelPipeline中,当NioSocketChannel里面的SocketChannel被注册到Selector之后,会触发ChannelInitializer的调用,初始化ChannelPipeline。

然后childGroup.register(child)就是把这个NioSocketChannel注册到workerGroup中的其中一个NioEventLoop上,也就是把NioSocketChannel中的SocketChannel注册到workerGroup中的其中一个NioEventLoop的Selector上。

在这里插入图片描述

这里NioSocketChannel注册的细节跟NioServerSocketChannel的注册是一样的,上一篇文章已经分析过,这里就不重复了。

NioSocketChannel注册好之后,就可以接收客户端发来的数据,于是又有read事件触发,此时调用的unsafe.read(),就是NioSocketChannelUnsafe的父类NioByteUnsafe的read()方法了。

NioByteUnsafe#read()

        @Overridepublic final void read() {...byteBuf = allocHandle.allocate(allocator);doReadBytes(byteBuf);...pipeline.fireChannelRead(byteBuf);...}

就是通过allocator分配一个ByteBuf,然后把Channel中的数据读取到ByteBuf中,然后调用pipeline.fireChannelRead(byteBuf)触发ChannelPipeline的处理,然后ChannelPipeline中的ChannelHandler就会处理byteBuf中的数据,这里的ChannelPipeline中的ChannelHandler就是我们定义的ChannelInitializer组装到ChannelPipeline中的ChannelHandler了。

在这里插入图片描述

runAllTasks()

runAlllTasks方法其实就是从NioEventLoop的taskQueue中不停的取出task并执行。

    protected boolean runAllTasks(long timeoutNanos) {...Runnable task = pollTask();...for (;;) {safeExecute(task);...task = pollTask();if (task == null) {...break;}}...}

可以看到,就是在一个for循环中,pollTask()方法取出task,然后在下一轮循环中调用safeExecute(task)去执行,safeExecute(task)里面就是调用task.run()方法直接执行,没什么好看的。如果pollTask()方法取出的task为null,那么就break结束循环。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com