欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 国际 > RocketMq源码-broker(五)

RocketMq源码-broker(五)

2024/12/22 21:41:40 来源:https://blog.csdn.net/qq_40228720/article/details/144329004  浏览:    关键词:RocketMq源码-broker(五)

一、RocketMq存储设计

RocketMQ 主要存储的文件包括Commitlog 文件、ConsumeQueue 文件、IndexFile。RocketMQ 将所有主题的消息存储在同一文件,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。
但由于一般的消息中间件是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率, RocketMQ 引入了ConsumeQueue 消息队列文件,每个消息主题包含多个消息消费队列,每个消息队列有一个消息文件IndexFile 索引文件,其主要设计理念就是为了加速消息的检索性能,可以根据消息的属性快速从Commitlog 文件中检索消息。整体如下:

1 ) CommitLog :消息存储文件,所有消息主题的消息都存储在CommitLog 文件中
2 ) ConsumeQueue :消息消费队列,消息到达CommitLog 文件后,将异步转发到消息消费队列,供消息消费者消费
3 ) IndexFile :消息索引文件,主要存储消息Key 与Offset 的对应关系

rockeqmq中存储目录store如下:

一、消息存储结构

一、CommitLog

CommitLog 以物理文件的方式存放,每台Broker 上的CommitLog 被本机器所有ConsumeQueue 共享,文件地址:$ {user.home}\store\$ { commitlog} \ $ { fileName}。在CommitLog 中,一个消息的存储长度是不固定的, RocketMQ 采取一些机制,尽量向CommitLog 中顺序写,但是随机读。commitlog 文件默认大小为lG ,可通过在broker 置文件中设置mapedFileSizeCommitLog 属性来改变默认大小。

Commitlog 文件存储的逻辑视图如下,每条消息的前面4 个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。

 二、ConsumeQueue

ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic 下的每个Message Queue 都有一个对应的ConsumeQueue 文件, 文件地址在${$storeRoot} \consumequeue\$ {topicName} \$ { queueld} \$ {fileName}。

ConsumeQueue 中存储的是消息条目,为了加速ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个Consumequeue 条目不会存储消息的全量信息,消息条目如下: 

ConsumeQueue 即为Commitlog 文件的索引文件, 其构建机制是当消息到达Commitlog 文件后由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue )与下文提到的索引文件。

存储机制这样设计有以下几个好处:
1 ) CommitLog 顺序写,可以大大提高写入效率。(实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s ,超过了一般网卡的传输速度,这是磁盘比想象的快的地方但是磁盘随机写的速度只有大概lOOKB/s,和顺序写的性能相差6000 倍!)
2 )虽然是随机读,但是利用操作系统的pagecache 机制,可以批量地从磁盘读取,作为cache 存到内存中,加速后续的读取速度。
3 )为了保证完全的顺序写,需要ConsumeQueue 这个中间结构,因为ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog 和ConsumeQueue的一致性, CommitLog 里存储了Consume Queues 、Message Key、Tag 等所有信息,即使ConsumeQueue 丢失,也可以通过commitLog 完全恢复出来。

三、IndexFile

index 存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列RocketMQ 专门为消息订阅构建的索引文件,提高根据主题与消息检索消息的速度,使用Hash 索引机制,具体是Hash 槽与Hash 冲突的链表结构。(这里不做过多解释)


四、Config

config 文件夹中存储着Topic 和Consumer 等相关信息。主题和消费者群组相关的信息就存在此。
topics.json : topic 配置属性
subscriptionGroup.json :消息消费组配置信息。
delayOffset.json :延时消息队列拉取进度。
consumerOffset.json :集群消费模式消息消进度。例如下面就是TopicTest主题中四个队列的消费偏移量,(第0个队列偏移量5,第一个队列偏移量3,第二个偏移量4,第三个偏移量5)

{"offsetTable":{"TopicTest@please_rename_unique_group_name_4":{0:5,1:3,2:4,3:5},"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:20}}
}


consumerFilter.json :主题消息过滤信息。

五、内存映射

内存映射文件,是由一个文件到一块内存的映射。文件的数据就是这块区域内存中对应的数据,读写文件中的数据,直接对这块区域的地址操作就可以,减少了内存复制的环节。所以说,内存映射文件比起文件I/O 操作,效率要高,而且文件越大,体现出来的差距越大。
RocketMQ 通过使用内存映射文件来提高IO 访问性能,无论是CommitLog,ConsumeQueue 还是IndexFile ,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

六、文件刷盘机制

RocketMQ 存储与读写是基于JDK NIO 的内存映射机制,具体使用MappedByteBuffer(基于MappedByteBuffer 操作大文件的方式,其读写性能极高)。

RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息超出内存的限制RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写消息在通过Producer 写人RocketMQ 的时候,有两种写磁盘方式:

一、异步刷盘方式

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE ,写操作的返回快,吞吐量大;当内存里的消息积累到一定程度时,统一触发写磁盘动作,快速写入。

二、同步刷盘方式 

在返回写成功状态时,消息已经被写人磁盘具体流程是,消息写入内存的PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态消息存储时首先将消息追加到内存,再根据配值的刷盘略在不同时间进行刷写磁盘如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer force()方法;如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端RocketMQ 使用个单独的线程按照某个设定的频执行刷盘操作。通过在broker 配置文件中配置flushDiskType 来设定刷盘方式,可选值为ASYNC_FLUSH (异步刷盘), SYNC_FLUSH 同步刷盘) 默认为异步。

实际应用中要结合业务场景,合理设置刷盘方式,尤其是同步刷盘的方式,由于频繁的触发磁盘写动作, 会明显降低性能。通常情况下,应该把Rocket置成异步刷盘方式。

七、过期文件删除

由于RocketMQ 操作CommitLog,ConsumeQueue 文件是基于内存映射机制并在启动的时候会加载commitlog,ConsumeQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。
删除过程分别执行清理消息存储文件( Commitlog )与消息消费队列文件( ConsumeQueue 文件), 消息消费队列文件与消息存储文件( Commitlog )共用一套过期文件机制。

RocketMQ 清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除, RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为42 小时(不同版本的默认值不同,这里以4.4.0 为例) ,通过在Broker 配置文件中设置fileReservedTime 来改变过期时间,单位为小时。
触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每10s 执行一次。

一、过期判断

文件删除主要是由这个配置属性:fileReservedTime:文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以删除。
另外还有其他两个配置参数:
deletePhysicFilesInterval:删除物理文件的时间间隔(默认是100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件后需要间隔deletePhysicFilesInterval 这个时间再删除另外一个文件,由于删除文件是一个非常耗费IO 的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。
destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳destroyMapedFileIntervalForcibly 这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少1000,直到引用小于等于0 为止,即可删除该文件.

二、删除条件

1)指定删除文件的时间点, RocketMQ 通过deleteWhen 设置一天的固定时间执行一次。删除过期文件操作, 默认为凌晨4 点。
2)磁盘空间是否充足,如果磁盘空间不充足(DiskSpaceCleanForciblyRatio。磁盘空间强制删除文件水位。默认是85),会触发过期文件删除操作。
另外还有RocketMQ 的磁盘配置参数:
1:物理使用率大于diskSpaceWarningLevelRatio(默认90%可通过参数设置),则会阻止新消息的插入。
2:物理磁盘使用率小于diskMaxUsedSpaceRatio(默认75%) 表示磁盘使用正常。

二、源码流程

一、启动流程BrokerController#start

Broker 最核心的功能就是接收到消息然后把消息进行存储,所以最核心的线程就是messageStore

if (this.messageStore != null) {this.messageStore.start();
}

 启动commitLog线程,负责将消息写入到commingLog文件中

this.commitLog.start();
class FlushRealTimeService extends FlushCommitLogService {private long lastFlushTimestamp = 0;private long printTimes = 0;public void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();int flushPhysicQueueThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();boolean printFlushProgress = false;// Print flush progresslong currentTimeMillis = System.currentTimeMillis();if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {this.lastFlushTimestamp = currentTimeMillis;flushPhysicQueueLeastPages = 0;printFlushProgress = (printTimes++ % 10) == 0;}try {if (flushCommitLogTimed) {Thread.sleep(interval);} else {this.waitForRunning(interval);}if (printFlushProgress) {this.printFlushProgress();}long begin = System.currentTimeMillis();CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}long past = System.currentTimeMillis() - begin;if (past > 500) {log.info("Flush data to disk costs {} ms", past);}} catch (Throwable e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);this.printFlushProgress();}}// Normal shutdown, to ensure that all the flush before exitboolean result = false;for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {result = CommitLog.this.mappedFileQueue.flush(0);CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));}this.printFlushProgress();CommitLog.log.info(this.getServiceName() + " service end");}
}

启动reputMessageService线程,RocketMQ 通过开启一个线程ReputMessageService 来监听CommitLog 文件更新事件,如果有新的消息,则及时更新ComsumeQueue、IndexFile 文件。

this.reputMessageService.start();

 启动netty服务,与nameserver,broker端进行通信

if (this.remotingServer != null) {this.remotingServer.start();
}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();
}

 当broker端没有消息的时候,会将consumer端发起的拉取消息请求暂存在map中,就是这个线程做的处理。

if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();
}

启动定时线程注册broker信息到nameserver 

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}
}, 1000 * 200, Math.max(180000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

二、Broker中消息的生产

发送的消息到达broker,调用org.apache.rocketmq.broker.processor.SendMessageProcessor 类的processRequest()方法,processRequest()调用sendMessag()

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}

1.非批次发送消息sendMessag()
2.消息存储(注意,这里都只存储commitlog),调用DefaultMessageStore.putMessage()方法。
2.1 这里进行commitlog 的提交,调用CommitLog.putMessage()。

CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);

2.1.1 在MappedFile 类中,处理存储都是使用MappedFile 这个类进行处理的,最终调用appendMessage 方法。appendMessagesInner 方法中,这里进行文件的追加(AppendMessageCallback 接口的实现DefaultAppendMessageCallback 在CommitLog类中,是一个内部类)。
2.2 在commitlog 类中doAppend 方法中进行commitlog 的处理,还是基于byteBuffer 的操作。
2.3 在commitlog 类中doAppend 方法中进行返回,将消息写入的内存的位置信息、写入耗时封装为AppendMessageResult 对象返回。

result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:default:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}

 提交刷盘请求,最终将内存中的commitLog数据刷到磁盘中

CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);

总结:
RocketMQ 接收到消息后,首先是写入Commitlog 文件,按照顺序进行写入,使用NIO 技术。在Commitlog 中putMessage 最后通过判断配置文件的主从同步类型和刷盘类型,进行刷盘。

借助java NIO 的力量,使得I/O 性能十分高。当消息来的时候,顺序写入CommitLog。
RocketMQ 下主要有三类大文件:commitlog 文件、Index 文件,consumequeue 文件,对于三类大文件,使用的就是NIO 的MappedByteBuffer 类来提高读写性能(主要是刷盘方法中)。这个类是文件内存映射的相关类,支持随机读和顺序写。在RocketMQ 中,被封装成了MappedFile 类

三、Broker中更新消息队列和索引文件

消息进入Commitlog 文件还不够,因为对于消费者来说,他们必须要看到ConsumeQueue 和IndexFile(ConsumeQueue 是因为消费要根据队列进行消费,另外没有索引文件IndexFile,消息的查找会出现很大的延迟)。
所以RocketMQ 通过开启一个线程ReputMessageService 来监听CommitLog 文件更新事件,如果有新的消息,则及时更新ComsumeQueue、IndexFile 文件。
源码跟踪
1. DefaultMessageStore 类中的内部类ReputMessageService 专门处理此项任务
2. ReputMessageService 类的run(),默认1 毫秒处理一次(文件从CommitLog 到ComsumeQueue 和Index)
3. ReputMessageService 类的doReput()方法。
4. ReputMessageService 类的doReput()方法中, doDispatch,最终会构建(构建消息消费队)和(构建索引文件)。

构建ConsumeQueue文件 

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());cq.putMessagePositionInfoWrapper(dispatchRequest);
}
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {if (offset + size <= this.maxPhysicOffset) {log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);return true;}this.byteBufferIndex.flip();this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);if (mappedFile != null) {if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {this.minLogicOffset = expectLogicOffset;this.mappedFileQueue.setFlushedWhere(expectLogicOffset);this.mappedFileQueue.setCommittedWhere(expectLogicOffset);this.fillPreBlank(mappedFile, expectLogicOffset);log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "+ mappedFile.getWrotePosition());}if (cqOffset != 0) {long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();if (expectLogicOffset < currentLogicOffset) {log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}}this.maxPhysicOffset = offset + size;return mappedFile.appendMessage(this.byteBufferIndex.array());}return false;
}

consumeQueue中刷盘线程启动,将创建好的consumeQueue刷新到磁盘中。

this.flushConsumeQueueService.start();
class FlushConsumeQueueService extends ServiceThread {private static final int RETRY_TIMES_OVER = 3;private long lastFlushTimestamp = 0;private void doFlush(int retryTimes) {int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();if (retryTimes == RETRY_TIMES_OVER) {flushConsumeQueueLeastPages = 0;}long logicsMsgTimestamp = 0;int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();long currentTimeMillis = System.currentTimeMillis();if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {this.lastFlushTimestamp = currentTimeMillis;flushConsumeQueueLeastPages = 0;logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();}ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue cq : maps.values()) {boolean result = false;for (int i = 0; i < retryTimes && !result; i++) {result = cq.flush(flushConsumeQueueLeastPages);}}}if (0 == flushConsumeQueueLeastPages) {if (logicsMsgTimestamp > 0) {DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);}DefaultMessageStore.this.getStoreCheckpoint().flush();}}public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();this.waitForRunning(interval);this.doFlush(1);} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}this.doFlush(RETRY_TIMES_OVER);DefaultMessageStore.log.info(this.getServiceName() + " service end");}
}

核心关键点:
定时任务来处理的消息存储转换。处理核心类是DefaultMessageStore 类中的内部类ReputMessageService

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com