整体架构分析
基本流程
模块特性
发送消息流程原理分析
同步发送 sync
异步发送 async
直接发送 one-way
主从同步(HA)机制分析
消息投递
持久化机制
RocketMQ的RPC通信
RocketMQ中Remoting通信模块的具体实现
消息的协议涉及与编码解码
消息的通信方法和通信流程
Client发送请求消息的具体实现
Server端接收消息并进行处理的实现
使用Netty作为底层通信库的原因
RocketMQ中RPC通信的Netty多线程模型
Netty的Reactor多线程模型设计概念与简述
RocketMQ中RPC通信的1+N+M1+M2的Reactor多线程设计与实现
安装参数
启动相关
Q&A
整体架构分析
基本流程
RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发,包含了4个模块:
Namesrv: 存储当前集群所有Broker信息,以及Topic和Broker的关系。
Broker: 集群最核心模块,主要负责当前Topic消息存储,消费者的消费位点管理(消费进度)。
Producer: 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID,同一个ID下所有实例组成一个生产者集群。
Consumer: 消息消费者,每个订阅者也有一个ID(编号),多个消费实例可以共用同一个ID,同一个ID下所有实例组成一个消费者集群。
工作流程描述:
1、启动Namesrv,启动后监听端口,等待Broker/Producer/Consumer连接上来,相当于一个路由控制中心。
2、Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,Namesrv集群中就有Topic和Broker的映射关系。
3、收发消息前,先创建Topic,创建Topic时需要指定topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4、Producer启动并发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
5、Consumer跟Producer类似,跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。
模块特性
- Namesrv
Namesrv用户存储Topic、Broker关系信息,功能简单,稳定性高。多个Namesrv之间没有通信,所以Broker往往将多个Namesrv的地址都配置上。Namesarv的压力不会太大,主要的开销在于维持心跳和提供Topic-Broker的关系数据。Broker向Namesrv发送心跳时,会带上自己负责的topic信息,如果Topic个数太多(万级别),导致一次心跳的数据就有可能几十M,容易造成Namesrv误以为Broker心跳失败。
- Broker
- 高并发读写服务
Broker的高并发主要依靠两点:消息顺序写,所有Topic数据只会写一个文件,一个文件满1G,再写新文件,真正的顺序写盘,TPS大幅度提高;消息随机读,RocketMQ尽可能的让读命中系统pagecache。
- 负载均衡和动态伸缩
负载均衡:Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。
动态伸缩能力:Broker的伸缩性提现在两个维度:Topic,Broker
Topic维度:假如一个Topic的消息量特别大,但是集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。
Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker即可。
- 高可用和高可靠
高可用:集群部署时候一般为主备,备机从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。
高可靠:所有往Broker发的消息,有同步刷盘和异步刷盘;同步刷盘时,消息写入物理文件才会返回成功,异步刷盘时,只有机器宕机,才会产生消息丢失。
- Broker与Namesrv的心跳机制
单个Broker跟所有的Namesrv保持心跳,间隔为30秒,心跳请求包含了当前Broker的所有Topic信息。Namesrv同样会反查Broker的心跳,如果某个Borker在两分钟之内都没有心跳,则任务该Broker下线,调整Topic跟Broker的对应关系。
- Consumer
消费者启动时候指定了Namesrv的地址,与其中一个保持长连接。消费者每个30秒从Namesrv获取所有topic的最新队列信息。连接建立后,从Namesrv获取当前消费Topic所涉及的Broker,直连Broker。
Consumer同样跟Broker保持长连接,会每隔30s发心跳信息到Broker,Broker每10s检查一次当前存活消费者,如果发现2分钟没有心跳,就断开与该消费者的链接,并且向消费组的其他实例发送通知,触发消费集群的负载均衡。
消费者的消费方式有两种:集群消费、广播消费。
- Producer
生产启动后同样选择一台Namesrv进行长连接。同样从Namesrv获取Topic和Broker的信息。并发发送心跳。生产者发送消息时,会轮询当前可发送的Broker,一次发送成功后,下次发送到另外一个,已达到负载均衡。
发送消息流程原理分析
从功能上说RocketMQ支持三种发送消息的方式,分别死同步发送、异步发送和直接发送。
同步发送 sync
只有在消息完全发送完成后才返回结果,此方式需要同步等待发送结果的时间代价。
有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次。发送的结果存在同一个消息可能被多次发送给Broker,需要开发者在消费端处理幂等性问题。
异步发送 async
Message msg = new Message("topic", "tag", ("Hello world").getBytes()); producer.send(msg, new SendCallBack() { // callback method after send success public void onSuccess(SendResult sendResult) { System.out.println("print send result:" + sendResult); } // callback method after send failure public void onException(Throwable a) { System.out.println("get a exception:" + e); } }))
异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送成功或失败。异步模式通常用于响应时间敏感的业务 。同样存在重试机制和发送同一消息的情况,需要在消费端处理幂等性问题。
直接发送 one-way
producer.sendOneWay(msg);
采用one-way发送模式发送消息的时候,发送端发送完消息后会立刻返回,不会等待来自Broker的ack来告知本次消息发送是否完全完成发送。此方式的吞吐量很大,但是存在消息丢失的风险,适合不重要的消息发送,比如日志。
主从同步(HA)机制分析
基本流程图如下:
RocketMQ的主从同步机制如下:
1、首先启动Master并在指定端口监听;
2、客户单启动,主动连接Master,建立TCP连接;
3、客户端以5秒的间隔时间向服务器拉取消息,如果第一次拉取的话,先获取本地commitlog文件中最大的偏移量,以该偏移量向服务端拉取消息;
4、服务端解析请求,并返回一批数据给客户端。
5、客户端收到一批消息后,将消息写入到本地commitlog文件中,然后向Master汇报拉取进度,并更新下一次待拉取偏移量;
6、重复第3步。
RocketMQ主从同步一个重要的特征:主从同步不具备主从切换功能,即当主节点宕机后,从不会接管消息发送,但是可以提供消息读取。
默认情况下,消息的去读都是主服务器,主从同步引入的主要目的是消息堆积的内容默认超过内存的40%,则消息读取由从服务器来接管,实现消息的读写分离,避免主服务的IO抖动严重。
HA消息消费进度机制:
1、消费者在反馈消息消费进度时会优先选择主服务器,此时主服务器的消费进度立马更新了,从服务器此时只需定时同步主服务器的消息消费进度即可。
2、消费者在向从服务器拉取消息时,如果是从服务器,在处理消息拉取时,也会更新消息消费进度。
消息投递
RocketMQ的消息模型如下:
一个Topic可能对应多个实际的消息队列(Message Queue)
在底层实现上,为了提供MQ的可用性和灵活性,一个Topic在实际存储的过程中,采用了多队列的方式,具体如上图所示,每个消息队列在使用中应当保证先入先出(FIFO)的方式进行消费。
- 默认情况下,采用最简单的轮询算法,这个算法可以保证每一个Queue队列的消息投递数量尽可能均匀。
- 对于默认情况下的增强方式:在某些Queue队列可能由于自身数量积压的原因,投递的过程比较长,对于这样的Queue队列会影响后续投递的效果。基于这样的情况,RocketMQ每发送一个消息后,会统计一下消息投递的时间延迟,根据这个延迟,知道哪些Queue队列投递的速度快。这样情况下优先使用消息投递延迟最小的策略,如果没有生效,再使用轮询算法。
- 在某些特定场景下,需要消息投递和消费的顺序性。对于相同订单号的消息,通过一定策略,将其放置在一个Queue队列中,然后消费者再采用一定策略(一个线程独立处理一个queue),就能保证消费的顺序性。
延迟投递/批量投递均可。
消息事务,Half Msg需要Producer确定commint或者rollback.
持久化机制
RocketMQ是一款高性能、高可靠的分布式消息中间件,要保证高可靠,数据就必须持久化到磁盘上,将数据持久化到磁盘上,那么可能不能保证高性能。一般磁盘的性能在顺序读写的情况下速度可以达到450MB/S-600MB/S,但是在随机读写的情况下,速度可能只有几百KB/S。RocketMQ在持久化的设计上,采取的是消息顺序写,随机读的策略,利用磁盘顺序写的速度,让磁盘的写速度不会成为系统的瓶颈。并且采用MMap这种“零拷贝”技术,提高消息存盘和网络发送的速度。极力满足RocketMQ的高性能、高可靠要求。
RocketMQ的持久化机制的架构图:
CommitLog: 消息真正的存储文件,所有消息都存储在CommitLog中。
ConsumeQueue:消息消费逻辑队列,类似数据库的索引文件。
IndexFile:消息索引文件,主要存储消息Key与Offset对应关系,提升消息检索速度。
Broker将消息顺序写入到CommitLog中,这也就是RocketMQ高性能的原因。但是消费者消费消息的时候往往只关心订阅主题下的所有消息,但是同一主题的消息在CommitLog文件中可能是不连续的,那么消费者消费消息的时候,需要将CommitLog文件加载到内存中遍历查找订阅主题下的消息,频繁的IO操作,性能就会急剧下降。
为解决该问题,RocketMQ引入了Consumequeue文件,可以看做是索引文件,类似于MySQL的二级索引。在存放了同一主题下的所有消息,消费者消费的时候只需要去对应的Consumequeue组中取消息即可。Consumequeue不负责存储消息,只负责记录它所属Topic的消息的CommitLog中的偏移量,这样消费者根据偏移量定位到消息,Consumequeue文件不会存储消息的全量信息。
RocketMQ的RPC通信
rocketmq-remoting模块是RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(rocketmq-client, rocketmq-broker, rocketmq-namesrv)所依赖和引用,为了实现客户端与服务端之间高效的数据情况与接收,RocketMQ消息队列自定义了通信信息并在Netty的基础之上扩展了通信模块。
RocketMQ中Remoting通信模块的具体实现
1、RemotingService:最上层的接口:
public interface RemotingService { void start(); void shutdown(); void registerRPCHook(RPCHook rpcHook); }
2、RemotingClient/RemotingServer:两个接口集成了最上层接口RemotingService,分别各自为Client和Server提供锁必须的方法
3、NettyRemotingAbstract:Netty通信处理的抽象类,定义并封装了Netty处理的公共处理方法
4、NettyRemotingClient/NettyRemotingServer:分别实现了RemotingClient和RemotingServer,都继承了NettyRemotingAbstract抽象类。RocketMQ中其他的组件(如client, nameserver, broker在进行消息的发送和接收时均使用这两个组件)。
消息的协议涉及与编码解码
在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。
RemotingCommand类的部分成员变量如下:
Header字段 | 类型 | Request说明 | Response说明 |
code | int | 请求操作码,应答方根据不同的请求码进行不同的业务处理 | 应答响应码。0表示成功,非0则表示各种错误 |
language | LanguageCode | 请求方实现的语言 | 应答方实现的语言 |
version | int | 请求方程序的版本 | 应答方程序的版本 |
opaque | int | 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回 |
flag | int | 区分是普通RPC还是onewayRPC的标志 | 区分是普通RPC还是onewayRPC的标志 |
remark | String | 传输自定义文本信息 | 传输自定义文本信息 |
extFields | HashMap | 请求自定义扩展信息 | 响应自定义扩展信息 |
例如Broker向NameServer发送一次心跳注册的报文:
code=103,//这里的103对应的code就是broker向nameserver注册自己的消息 language=JAVA, version=137, opaque=58,//这个就是requestId flag(B)=0, remark=null, extFields={ brokerId=0, clusterName=DefaultCluster, brokerAddr=ip1: 10911, haServerAddr=ip1: 10912, brokerName=LAPTOP-SMF2CKDN }, serializeTypeCurrentRPC=JSON
RocketMQ通信协议的格式如下:
1、消息长度:总长度、四个字节存储、占用一个int类型
2、序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度
3、消息头数据:经过序列化后的消息头数据
4、消息主体数据:消息主体的二进制字节数据内容
消息的编码和解码分别在RemotingCommand类的encode和decode方法中完成。
消息的通信方法和通信流程
RocketMQ消息队列中支持通信的方式如下:同步、异步、单向
同步通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。
Client发送请求消息的具体实现
当客户端调用异步通信接口invokeAsync时候,先由RemotingClient的实现类-NettyRemotingClient根据addr获取相应的channel(如果本地缓存中没有则创建),随后调用invokeAsyncImpl方法,将数据流转给抽象类NettyRemotingAbstract处理(真正做完发送请求动作是在NettyRemotingAbstract抽象类的invokeAsyncImpl方法里面)。
/*** invokeAsync(异步调用)* * @param channel* @param request* @param timeoutMillis* @param invokeCallback* @throws InterruptedException* @throws RemotingTooMuchRequestException* @throws RemotingTimeoutException* @throws RemotingSendRequestException*/
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {//相当于request ID, RemotingCommand会为每一个request产生一个request ID, 从0开始, 每次加1final int opaque = request.getOpaque();boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);//根据request ID构建ResponseFuturefinal ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);//将ResponseFuture放入responseTablethis.responseTable.put(opaque, responseFuture);try {//使用Netty的channel发送请求数据channel.writeAndFlush(request).addListener(new ChannelFutureListener() {//消息发送后执行@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {//如果发送消息成功给Server,那么这里直接Set后returnresponseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}responseFuture.putResponse(null);responseTable.remove(opaque);try {//执行回调executeInvokeCallback(responseFuture);} catch (Throwable e) {log.warn("excute callback in writeAndFlush addListener, and callback throw", e);} finally {//释放信号量responseFuture.release();}log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));}});} catch (Exception e) {//异常处理responseFuture.release();log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");} else {String info =String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",timeoutMillis,this.semaphoreAsync.getQueueLength(),this.semaphoreAsync.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}
}
在Client端发送请求消息时有个比较重要的数据结构:
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable
opaque表示请求发起方在多个连接上不同的请求标识代码,每次发送一个消息的时候,可以选择同步阻塞/异步非阻塞的方法,无论哪种通信方式,都会保存请求操作码至ResponseFuture的Map映射-responseTable中。
ResponseFuture保存返回响应
public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback, SemaphoreReleaseOnlyOnce once) { this.opaque = opaque; this.timeoutMillis = timeoutMillis; this.invokeCallback = invokeCallback; this.once = once; }
对于同步通信来说,第三、四个参数为null,对于异步通信来说,invokeCallback是在收到消息响应的时候能够根据responseTable找到请求码对应的回调执行方法,semaphore参数用作流控,当多个流程同时往一个连接写数据时可以通过信号量控制授权同时写许可的数量。
异常发送流程处理-定时扫描responseTable本地缓存
在发送消息时候,如果遇到异常情况(比如服务端没有response返回给客户端或者response因网络而丢失),上面所述的responseTable的本地缓存Map将出现堆积情况,这个时候需要一个定时任务来专门做responseTable的清理回收。在RocketMQ的客户端/服务端启动时候会产生频率为1s调用一次的定时任务检查所有的responseTable缓存中的responseFuture变量,判断是否已经得到返回,并进行相应的处理。
Server端接收消息并进行处理的实现
Server端接收消息的处理入口在NettyServerHandler类的channelRead0方法中,其中调用了processMessageReceived方法,其中服务端最为重要的处理方法实现如下:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {//根据RemotingCommand中的code获取processor和ExecutorServicefinal Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {//rpc hookRPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();if (rpcHook != null) {rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);}//processor处理请求final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);//rpc hookif (rpcHook != null) {rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);}if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();try {ctx.writeAndFlush(response);} catch (Throwable e) {PLOG.error("process request over, but response failed", e);PLOG.error(cmd.toString());PLOG.error(response.toString());}} else {}}} catch (Throwable e) {if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException".equals(e.getClass().getCanonicalName())) {PLOG.error("process request exception", e);PLOG.error(cmd.toString());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {//封装requestTaskfinal RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);//想线程池提交requestTaskpair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //+ ", too many requests and system thread pool busy, RejectedExecutionException " //+ pair.getObject2().toString() //+ " request code: " + cmd.getCode());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";//构建responsefinal RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}
}
上面的请求处理方法中根据RemotingCommand的请求业务码来匹配到相应的业务处理器,然后生成一个新的线程提交至对应的业务线程池进行异步处理。
processorTable - 请求业务码与业务处理、业务线程池的映射变量
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
使用Netty作为底层通信库的原因
- Netty的编程API使用简单,开发门槛低,无需开发人员去关注和了解太多的NIO编程模型和概念。
- 可根据业务的要求进行定制化开发,通过Netty的ChannelHandler对通信框架进行灵活的定制化扩展
- Netty框架本身支持拆包、解包,异常检测等机制,让开发人员从JavaNIO的细节中解脱,只需要关于业务的本身。
- Netty解决了JDK NIO的Bug(Epoll的BUG,会导致Selector空轮询,最终导致CPU100%)
- Netty框架内部对线程,Selector做了一些细节的优化,精心设计的Reactor多线程模型可以实现非常高效的并发处理
- Netty已经在多个项目(Hadoop的RPC框架avro使用Netty作为通信框架)中都得到过充分验证。
RocketMQ中RPC通信的Netty多线程模型
RocketMQ的RPC通信部分采用了“1+N+M1+M2”的Reactor多线程模型,对网络通信部分进行一定的扩展与优化。
Netty的Reactor多线程模型设计概念与简述
Reactor多线程模型的设计思路是分而治之和事件驱动:
分而治之:一个网络请求连接的完整处理过程可以分为接受(accept),数据读取(read),解码/编码(decode/encode),业务处理(process),发送响应(send)这几个步骤。Reactor模型将每个步骤都映射为一个任务,服务端线程执行的最小逻辑单元不再是一次完成的网络请求,而是这个任务,且采用以非阻塞方式执行。
事件驱动:每个任务对应特定网络事件,当任务准备就绪时,Reactor收到对应的网络事件通知,并将任务分发给绑定了对应网络事件的Handler执行。
RocketMQ中RPC通信的1+N+M1+M2的Reactor多线程设计与实现
RocketMQ中RPC通信的Reactor多线程设计与流程
RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。下面先给出一张RocketMQ的RPC通信层的Netty多线程模型框架图。
上图的框架图中可以大致了解RocketMQ中的NettyRemotingServer的Reactor多线程模型。一个Reactor主线程(eventLoopGroupBoss,即为上面的1)负责监听TCP网络连接请求,建立好连接后丢给Reactor线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认为3),它负责将建立好连接的socket注册到Selector上去(RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据,拿到网络数据后,再丢给Worker线程池(defaultEventExecutorGroup,即为上面的“M1”,源码中默认为8)。
为了更为高效的处理RPC的网络请求,这里的Worker线程池是专门用于处理Netty网络通信相关的(包括编码/解码,空闲链接管理、网络连接管理以及网络请求处理)。而处理业务操作放在业务线程中执行,根据RemotingCommand的业务请求码code去processorTable这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的“M2”)。
下面以表格的方式列举了下上面所述的“1+N+M1+M2”Reactor多线程模型
线程池 | 线程名 | 线程具体说明 |
1 | NettyBoss_%d | Reactor主线程 |
N | NettyServerEPOLLSelector_%d_%d | Reactor线程池 |
M1 | NettyServerCodecThread_%d | Worker线程池 |
M2 | RemotingExecutorThread_%d | 业务processor处理线程池 |
RocketMQ中RPC通信的Reactor多线程的代码具体实现
在NettyRemotingServer的实例初始化时,会初始化各个相关的变量包括serverBootstrap、nettyServerConfig参数、channelEventListener监听器并同时初始化eventLoopGroupBoss和eventLoopGroupSelector两个Netty的EventLoopGroup线程池(如果Linux平台,并且开始了native epoll,就用EpollEventLoopGroup,这个也就是用JNI,调的c写的epoll,否则,就用Java NIO的NioEventLoopGroup)。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;//省略部分代码//初始化时候nThreads设置为1,说明RemotingServer端的Disptacher链接管理和分发请求的线程为1,用于接收客户端的TCP连接this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));}});/*** 根据配置设置NIO还是Epoll来作为Selector线程池* 如果是Linux平台,并且开启了native epoll,就用EpollEventLoopGroup,这个也就是用JNI,调的c写的epoll;否则,就用Java NIO的NioEventLoopGroup。* */if (useEpoll()) {this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}//省略部分代码
在NettyRemotingServer实例初始化完成后,就会将其启动。Server端在启动阶段会将之前实例化好的1个acceptor线程(eventLoopGroupBoss),N个IO线程(eventLoopGroupSelector),M1个Worker线程(defaultEventExecutorGroup)绑定上去。
Worker线程拿到网络数据后,就交给Netty的ChannelPipeline(其采用责任链设计模式),从Head到Tail的一个个Handler执行下去,这些Handler是在创建NettyRemotingServer实例时候指定的。NettyEncoder和NettyDecoder负责网络传输数据和RemotingCommand之间的编解码。NettyServerHandler拿到的解码得到的RemotingCommand后,根据RemotingCommand.type来判断是request还是response来进行相应处理,根据业务请求码封装成不同的task任务后,提交给对应的业务processor处理线程池处理。
@Override
public void start() {//默认的处理线程池组,使用默认的处理线程池组用于处理后面的多个Netty Handler的逻辑操作this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});/*** 首先来看下 RocketMQ NettyServer 的 Reactor 线程模型,* 一个 Reactor 主线程负责监听 TCP 连接请求;* 建立好连接后丢给 Reactor 线程池,它负责将建立好连接的 socket 注册到 selector* 上去(这里有两种方式,NIO和Epoll,可配置),然后监听真正的网络数据;* 拿到网络数据后,再丢给 Worker 线程池;**///RocketMQ-> Java NIO的1+N+M模型:1个acceptor线程,N个IO线程,M1个worker 线程。ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)//服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小.option(ChannelOption.SO_REUSEADDR, true)//这个参数表示允许重复使用本地地址和端口.option(ChannelOption.SO_KEEPALIVE, false)//当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。.childOption(ChannelOption.TCP_NODELAY, true)//该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())//这两个参数用于操作接收缓冲区和发送缓冲区.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,new HandshakeHandler(TlsSystemConfig.tlsMode)).addLast(defaultEventExecutorGroup,new NettyEncoder(),//rocketmq解码器,他们分别覆盖了父类的encode和decode方法new NettyDecoder(),//rocketmq编码器new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//Netty自带的心跳管理器new NettyConnectManageHandler(),//连接管理器,他负责捕获新连接、连接断开、异常等事件,然后统一调度到NettyEventExecuter处理器处理。new NettyServerHandler()//当一个消息经过前面的解码等步骤后,然后调度到channelRead0方法,然后根据消息类型进行分发 );}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}//定时扫描responseTable,获取返回结果,并且处理超时this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}
从上面的描述中可以概括得出RocketMQ的RPC通信部分的Reactor线程池模型框图。
整体可以看出RocketMQ的RPC通信借助Netty的多线程模型,其服务端监听线程和IO线程分离,同时将RPC通信层的业务逻辑与处理具体业务的线程进一步相分离。时间可控的简单业务都直接放在RPC通信部分来完成,复杂和时间不可控的业务提交至后端业务线程池中处理,这样提高了通信效率和MQ整体的性能。(ps:其中抽象出NioEventLoop来表示一个不断循环执行处理任务的线程,每个NioEventLoop有一个Selector,用于监听绑定在其上的socket链路)。
安装参数
在安装过程中,先启动namesrv,后启动broker,broker中配置几个关键要素为:
- brokerClusterName:集群名称,所有broker配置的必须一致
- brokerName:broker名称,M-S之间必须一致,比如broker-a/broker-a(M/S),broker-b/broker-b
- brokerId:0表示Master,>0表示slave,比如1M2S,那么M是0,Slave是1,Slave2配置时2
- namesrvAddr:namesrv地址
- store*:持久化数据存储目录
- brokerRole:Broker的角色
- flushDiskType:刷盘方式,2m2s-sync中,master默认为SYNC_FLUSH,默认为ASYNC_FLUSH
启动相关
启动脚本./bin/mqbroker,实际作用是使用java命令运行org.apache.rocketmq.broker.BrokerStartup类。即broker的入口类是org.apache.rocketmq.broker.BrokerStartup。
如何保证消息的可靠性传输?要是消息丢失了怎么办?
生产阶段、存储阶段、消费阶段三个方面来讨论。
最终一致性:RocketMQ是一种最终一致性的分布式事务,就是说他保证的是消息最终一致性,而不是像2PC,3PC,TCC那样的强一致性分布式事务。
半消息:指暂时不能被Consumer消费的消息,Producer已经把消息成功发送到Broker端,但是此消息被标记暂不能投递,处于该种状态下的消息为半消息。需要Producer对消息的二次确认后,Consumer才能去消费他
消息回查:由于网络闪断、生产者应用重启等原因,导致Producer端一直没有对Half Message进行二次确认,Broker服务器会定时扫描长期处于半消息的信息,会主动询问Producer端该消息的最终状态(Commit或者Rollback),该消息为消息回查。
对于生产者来说,可以向多个master的broker去发送消息,同时可以发送同步消息和异步消息返回服务方消息的应答,保证消息是否发送成功,对于消息,有同步刷盘和异步刷盘机制,主从之间也有同步复制和异步复制,保证了消息不丢失。同时我们也可以把消息记录的日志文件或者表中,RocketMQ消息的存储是有ConsumerQueue和ConmmitLog配合完成,消息真正的物理存储文件是CommitLog,ConsumerQueue是消息的逻辑队列,类似于数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumerQueue文件,通过刷盘和复制机制来保证数据的高可用。
对于消费者来说,消费者既可以消费broker的master,有可以消费broker的slave,当master宕机之后,会自动切换的slave进行消费。对于广播的消息来说,我们可以进行消息的重试,消息队列RocketMQ默认允许每条消息最多重试16次,一条消息无论重试多少次,这些重试消息的MessageID不会改变。
当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确消费该消息,此时,消息队列RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
Broker端保证消息可靠性,从架构层次来说明,常见的架构策略:
双主双从架构、NameServer多节点,同步双写,异步刷盘,消息在内存中,突然断电消息丢失,同步刷盘可靠性更高,消息持续化到磁盘,同城双活,异地多活,跨国多活。
Q&A
如何进行消息的重试机制?
Producer通过网络发送消息给Broker,当Broker收到之后,会返回确认响应信息给Producer,所以只要生产者接收到返回的确认响应,就代表消息在生产阶段未丢失。
消费端消费消息后,需要给Broker返回消费状态。Consumer端的重试机制包括两种情况:异常重试,由于Consumer端逻辑出现了异常,导致了返回了RECONSUMER_LATER状态,那么Broker就会在一段时间后重试;超时重试:如果Consumer端处理时间很长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为消费超时,发起超时重试。
消息模式为MessageModel.CLUSTERING集群模式下,Broker才会自动进行重试,广播消息时不会重试的。
并且这样的重试机制难免会出现消息重复消费的情况,所以在消费端需要做好幂等性。
如何保证消息不被重复消费?或者说如何保证消息消费时的幂等性?
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置