RocketMQ 消费者源码分析
前文
RocketMQ源码分析(二) 生产者
RocketMQ源码分析(一)broker启动&remoting抽象
1. 概述
RocketMQ 的消费者主要分为以下类型:
- 推模式消费者(DefaultMQPushConsumer)
- 拉模式消费者(DefaultMQPullConsumer)
- 推模式消费者(Lite版本)
本文将从源码层面分析消费者的实现原理。
2. 核心类图
@startuml consumer_class_diagram
!pragma layout smetanainterface MQConsumer {+start()+shutdown()+registerMessageListener()
}class DefaultMQPushConsumer {-DefaultMQPushConsumerImpl consumerImpl-MessageListener messageListener+subscribe()+start()
}class DefaultMQPullConsumer {-DefaultMQPullConsumerImpl consumerImpl+pull()+start()
}class RebalanceService {-MQClientInstance mqClientFactory+doRebalance()+start()
}class ConsumeMessageService {+consumeMessage()+start()
}MQConsumer <|.. DefaultMQPushConsumer
MQConsumer <|.. DefaultMQPullConsumer
DefaultMQPushConsumer -- RebalanceService
DefaultMQPushConsumer -- ConsumeMessageService
@enduml
主要类及其关系:
-
MQConsumer(接口)
- 定义了消费者的核心方法
- start()
- shutdown()
- registerMessageListener()
-
DefaultMQPushConsumer
- 包含DefaultMQPushConsumerImpl实例
- 实现消息推模式消费
- 支持并发/顺序消费
-
DefaultMQPullConsumer
- 包含DefaultMQPullConsumerImpl实例
- 实现消息拉模式消费
- 需要用户自己维护消费位点
-
RebalanceService
- 负责消费者负载均衡
- 分配消息队列
- 触发重平衡
3. 消费者启动流程
@startuml consumer_startup_sequence
!pragma layout smetanaparticipant DefaultMQPushConsumer
participant DefaultMQPushConsumerImpl
participant MQClientInstance
participant RebalanceServiceDefaultMQPushConsumer -> DefaultMQPushConsumerImpl: start()
activate DefaultMQPushConsumerImplDefaultMQPushConsumerImpl -> DefaultMQPushConsumerImpl: checkConfig()
DefaultMQPushConsumerImpl -> MQClientInstance: start()
activate MQClientInstanceMQClientInstance -> RebalanceService: start()
activate RebalanceServiceRebalanceService -> RebalanceService: doRebalance()
RebalanceService --> MQClientInstance
deactivate RebalanceServiceMQClientInstance --> DefaultMQPushConsumerImpl
deactivate MQClientInstanceDefaultMQPushConsumerImpl --> DefaultMQPushConsumer
deactivate DefaultMQPushConsumerImpl
@enduml
启动流程包含以下步骤:
- 检查配置参数
- 初始化MQClientInstance
- 注册消费者到MQClientInstance
- 启动负载均衡服务
- 启动消费服务
4. 消息消费模式
4.1 推模式(Push)
@startuml consumer_push_sequence
!pragma layout smetanaparticipant Broker
participant PushConsumer
participant ConsumeMessageService
participant MessageListenerBroker -> PushConsumer: pullMessage()
activate PushConsumerPushConsumer -> ConsumeMessageService: consumeMessage()
activate ConsumeMessageServiceConsumeMessageService -> MessageListener: consumeMessage()
activate MessageListenerMessageListener --> ConsumeMessageService: ConsumeConcurrentlyStatus
deactivate MessageListenerConsumeMessageService --> PushConsumer
deactivate ConsumeMessageServicePushConsumer -> Broker: updateOffset()
PushConsumer --> Broker
deactivate PushConsumer
@enduml
推模式特点:
- 由Broker主动推送消息到Consumer
- 实际基于长轮询实现
- 实时性较好
- 支持并发消费和顺序消费
4.2 拉模式(Pull)
@startuml consumer_pull_sequence
!pragma layout smetanaparticipant PullConsumer
participant Broker
participant ProcessQueuePullConsumer -> Broker: pullMessage()
activate BrokerBroker --> PullConsumer: PullResult
deactivate BrokerPullConsumer -> ProcessQueue: putMessage()
activate ProcessQueueProcessQueue -> ProcessQueue: processMessage()
ProcessQueue --> PullConsumer
deactivate ProcessQueuePullConsumer -> Broker: updateOffset()
@enduml
拉模式特点:
- 由Consumer主动拉取消息
- 更灵活但使用复杂
- 需要自己维护消费位点
- 适合特殊场景使用
5. 负载均衡
5.1 重平衡流程
@startuml consumer_rebalance_sequence
!pragma layout smetanaparticipant RebalanceService
participant MQClientInstance
participant BrokerRebalanceService -> MQClientInstance: doRebalance()
activate MQClientInstanceMQClientInstance -> Broker: getConsumerList()
activate BrokerBroker --> MQClientInstance: ConsumerList
deactivate BrokerMQClientInstance -> MQClientInstance: allocateMessageQueue()
MQClientInstance -> MQClientInstance: updateProcessQueue()MQClientInstance --> RebalanceService
deactivate MQClientInstance
@enduml
重平衡触发时机:
- Consumer数量变化
- Topic队列数变化
- Consumer订阅关系变化
5.2 负载均衡策略
RocketMQ提供多种分配策略:
- 平均分配策略(AVG)
- 环形分配策略(CIRCLE)
- 机房分配策略(IDC)
- 一致性哈希策略(CONSISTENT_HASH)
6. 消费模式
6.1 并发消费
特点:
- 多线程并行消费消息
- 消费线程池可配置
- 适合无顺序要求场景
- 性能较好
@startuml consumer_concurrent_sequence
!pragma layout smetanaparticipant PushConsumer
participant ConsumeMessageService
participant "ConsumerThread 1" as Thread1
participant "ConsumerThread 2" as Thread2
participant MessageListenerPushConsumer -> ConsumeMessageService: submitConsumeRequest()
activate ConsumeMessageServiceConsumeMessageService -> Thread1: consumeMessage(msg1)
ConsumeMessageService -> Thread2: consumeMessage(msg2)
activate Thread1
activate Thread2Thread1 -> MessageListener: consumeMessage(msg1)
Thread2 -> MessageListener: consumeMessage(msg2)MessageListener --> Thread1: ConsumeConcurrentlyStatus
MessageListener --> Thread2: ConsumeConcurrentlyStatusThread1 --> ConsumeMessageService
Thread2 --> ConsumeMessageService
deactivate Thread1
deactivate Thread2ConsumeMessageService -> PushConsumer: updateOffset()
deactivate ConsumeMessageService@enduml
并发消费流程:
- ConsumeMessageService将消息提交到消费线程池
- 多个消费线程并行处理消息
- 每个线程独立调用MessageListener处理消息
- 处理完成后批量更新消费位点
实现原理
MessageListenerConcurrently 的核心实现:
public interface MessageListenerConcurrently extends MessageListener {ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context);
}
并发消费的处理流程:
-
消息拉取
- PullMessageService 从 Broker 拉取消息
- 将消息放入 ProcessQueue 进行处理
-
消息分发
- ConsumeMessageConcurrentlyService 维护消费线程池
- 将消息批量提交到线程池并发处理
- 默认线程池大小:最小20,最大64,队列长度10000
-
消息消费
- 多个线程并行调用 consumeMessage 处理消息
- 每批消息默认最大1条(可配置)
- 返回 CONSUME_SUCCESS 或 RECONSUME_LATER
-
消费进度更新
- 并发更新消费位点到 Broker
- 异步提交,提高性能
示例代码:
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 并发处理消息for (MessageExt msg : msgs) {// 处理业务逻辑}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
6.2 顺序消费
特点:
- 单线程顺序消费消息
- 保证消息消费顺序
- 适合有顺序要求场景
- 性能相对较低
@startuml consumer_orderly_sequence
!pragma layout smetanaparticipant PushConsumer
participant ConsumeMessageService
participant ProcessQueue
participant MessageListenerPushConsumer -> ConsumeMessageService: submitConsumeRequest()
activate ConsumeMessageServiceConsumeMessageService -> ProcessQueue: lockMQQueue()
activate ProcessQueueProcessQueue -> MessageListener: consumeMessage(msg1)
activate MessageListener
MessageListener --> ProcessQueue: ConsumeOrderlyStatus
deactivate MessageListenerProcessQueue -> MessageListener: consumeMessage(msg2)
activate MessageListener
MessageListener --> ProcessQueue: ConsumeOrderlyStatus
deactivate MessageListenerProcessQueue -> ProcessQueue: updateOffset()
ProcessQueue --> ConsumeMessageService: unlock()
deactivate ProcessQueueConsumeMessageService --> PushConsumer
deactivate ConsumeMessageService@enduml
顺序消费流程:
- 对消息队列加锁,确保同一时刻只有一个线程消费
- 按消息顺序依次调用MessageListener处理
- 处理完一批消息后更新消费位点
- 释放队列锁,允许下一次顺序消费
实现原理
MessageListenerOrderly 的核心实现:
public interface MessageListenerOrderly extends MessageListener {ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context);
}
顺序消费的处理流程:
-
消息队列锁定
- 在消费之前获取 MessageQueue 级别的锁
- 确保同一个队列只会被一个线程处理
- 分布式锁实现,通过 Broker 控制
-
消息排序
- ProcessQueue 内部使用 TreeMap 保证消息顺序
- 按照消息队列偏移量排序
- 确保消息按照存储顺序消费
-
消息消费
- 单线程按顺序处理同一队列的消息
- 每批消息默认最大1条(可配置)
- 返回 SUCCESS 或 SUSPEND_CURRENT_QUEUE_A_MOMENT
-
消费进度更新
- 同步更新消费位点到 Broker
- 确保顺序消费的可靠性
示例代码:
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {// 顺序处理消息for (MessageExt msg : msgs) {// 处理业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}
});
关键区别对比
特性 | 并发消费 | 顺序消费 |
---|---|---|
消费线程 | 多线程并行 | 单线程串行 |
消息顺序 | 不保证顺序 | 严格顺序 |
消费性能 | 较高 | 较低 |
锁机制 | 无锁 | 队列锁 |
消费状态 | CONSUME_SUCCESS/RECONSUME_LATER | SUCCESS/SUSPEND |
位点提交 | 异步批量 | 同步实时 |
适用场景 | 一般消息 | 顺序敏感消息 |
7. 核心实现类功能说明
- DefaultMQPushConsumer: 推模式消费者实现
- DefaultMQPullConsumer: 拉模式消费者实现
- RebalanceService: 负载均衡服务
- ConsumeMessageService: 消息消费服务
- ProcessQueue: 消息处理队列
- MessageListener: 消息监听器接口
8. 总结
RocketMQ消费者的特点:
- 支持推拉两种消费模式
- 完善的负载均衡机制
- 灵活的消费模式(并发/顺序)
- 高可靠的消息消费保证
通过分层设计和策略模式,使得消费者架构具有很好的扩展性和灵活性。