NetworkManager可以看到只存在于TaskManager中,用于TaskManager之间的数据传输。是基于netty实现的。
初始化
NetworkManager是在TaskManager中运行的一个服务。所以会在TaskManager初始化过程中。入口是TaskManager的main方法。
NettyConnectionManager初始化
NettyConnectionManager 是 Flink 里管理网络连接的组件,借助 Netty 框架处理通信。它能初始化 Netty 服务器和客户端,管理缓冲区池,创建分区请求客户端工厂。可启动和关闭服务,创建与管理分区请求客户端,还能统计活动连接数量。
方法入口:
最终创建NettyConnectionManager
NettyConnectionManager创建流程图
主要变量:
- NettyServer server
作用:代表 Netty 服务器实例,负责监听网络端口,接收来自其他节点的连接请求,处理入站的网络数据。
- NettyClient client
作用:代表 Netty 客户端实例,用于主动连接到其他节点,发送请求并接收响应,处理出站的网络数据。
- NettyBufferPool bufferPool
作用:管理网络通信时所需的缓冲区,负责分配和回收直接内存缓冲区,提高内存使用效率,减少内存分配和回收的开销。
- PartitionRequestClientFactory partitionRequestClientFactory
作用:用于创建 PartitionRequestClient 实例,这些实例负责向远程节点请求结果分区数据,管理客户端连接的创建和复用。
- NettyProtocol nettyProtocol
作用:定义了 Netty 通信的协议,包含了如何处理请求和响应的逻辑,确保不同节点之间能够正确地进行数据交互。
netty服务端初始化
创建ServerBootstrap并初始化
优先使用epoll模式,其次使用nio
设置channel
可以看到channel的信息在NettyProtocol的getServerChannelHandlers方法获取。
可以看到设置了4个ChannelHandler。
各自功能如下:
- messageEncoder:NettyMessage.NettyMessageEncoder 实例,负责将消息编码为适合网络传输的格式。
- new NettyMessage.NettyMessageDecoder():NettyMessage.NettyMessageDecoder 实例,用于将接收到的网络数据解码为消息对象。
- serverHandler:PartitionRequestServerHandler 实例,处理客户端的分区请求和任务事件请求。
- queueOfPartitionQueues:PartitionRequestQueue 实例,管理分区请求队列,确保请求按顺序处理。
这些处理器共同构成了 Netty 服务器的处理链,实现消息的编码、解码、请求处理和队列管理。
启动server
netty客户端初始化
跟netty服务端类似。
创建Bootstrap并初始化
设置channel
client初始化的时候并没有设置channel,因为client此时并不知道要请求哪个server的数据,同时client也会请求多个server数据的情况。所以在需要请求server数据的时候才会进行channel设置。
client的channel信息也在NettyProtocol中。
包含三个处理器,各自功能如下:
- messageEncoder:用于将消息编码为适合在网络中传输的格式,方便数据在网络上发送。
- new NettyMessageClientDecoderDelegate(networkClientHandler):NettyMessageClientDecoderDelegate 实例,它是一个解码器委托类,借助传入的 networkClientHandler 来处理解码后的消息。
- networkClientHandler:NetworkClientHandler 实例,负责处理客户端的网络请求和响应,是客户端网络操作的核心处理逻辑。
这些处理器共同构成了 Netty 客户端的处理链,实现消息的编码、解码和客户端网络操作的处理。
其中CreditBasedPartitionRequestClientHandler 通过信用机制实现了客户端与服务器之间的数据请求和传输控制,确保数据的高效、稳定传输,同时支持背压处理,避免系统过载。
绑定server
创建client
是调用NettyConnectionManager的createPartitionRequestClient来创建client,其中参数connectionId中包含了远端server的ip和端口。
首先对maxNumberOfConnections取余,创建一个新的connectionId。这个是为了复用tcp连接,将多个逻辑连接映射到少量的物理连接上,达到复用连接的作用,这样也便于节省资源,控制流量。
maxNumberOfConnections默认是1
connectWithRetries是重试连接server,创建client。
最后返回一个NettyPartitionRequestClient,这个是flink对nettyClient的高级封装,处理业务逻辑。
最后调用的是nettyClient.connect,设置channel,绑定server。
客户端请求数据
在上面客户端创建,最后返回NettyPartitionRequestClient实例。
请求数据是requestSubpartition方法,请求对应分区的数据。
首先在handler中注册inputChannel,再创建PartitionRequest,最后调用tcpChannel的writeAndFlush发送消息。
服务端处理数据
server处理PartitionRequest消息是在PartitionRequestServerHandler中。
生成reader用来读取数据,将reader注册到PartitionRequestQueue中。
PartitionRequestQueue负责写数据的是writeAndFlushNextMessageIfPossible。
循环获取已经注册的reader,从reader中获取buffer(包含读取的数据),创建BufferResponse并发送。要是reader后面还有数据就加回队列。
客户端接收数据
CreditBasedPartitionRequestClientHandler接收数据,用来处理BufferResponse消息。
获取当初注册的inputChannel,调用decodeBufferOrEvent来处理BufferResponse消息。
调用RemoteInputChannel的onBuffer方法,在onBuffer中将收到的buffer缓存到receivedBuffers中。后续从receivedBuffers读取数据。
读取receivedBuffers是从getNextBuffer方法进入。