欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > RocketMQ源码分析(三) 消费者

RocketMQ源码分析(三) 消费者

2025/2/22 2:08:50 来源:https://blog.csdn.net/caicongyang/article/details/144358201  浏览:    关键词:RocketMQ源码分析(三) 消费者

RocketMQ 消费者源码分析

前文

RocketMQ源码分析(二) 生产者
RocketMQ源码分析(一)broker启动&remoting抽象

1. 概述

RocketMQ 的消费者主要分为以下类型:

  • 推模式消费者(DefaultMQPushConsumer)
  • 拉模式消费者(DefaultMQPullConsumer)
  • 推模式消费者(Lite版本)

本文将从源码层面分析消费者的实现原理。

2. 核心类图

在这里插入图片描述

@startuml consumer_class_diagram
!pragma layout smetanainterface MQConsumer {+start()+shutdown()+registerMessageListener()
}class DefaultMQPushConsumer {-DefaultMQPushConsumerImpl consumerImpl-MessageListener messageListener+subscribe()+start()
}class DefaultMQPullConsumer {-DefaultMQPullConsumerImpl consumerImpl+pull()+start()
}class RebalanceService {-MQClientInstance mqClientFactory+doRebalance()+start()
}class ConsumeMessageService {+consumeMessage()+start()
}MQConsumer <|.. DefaultMQPushConsumer
MQConsumer <|.. DefaultMQPullConsumer
DefaultMQPushConsumer -- RebalanceService
DefaultMQPushConsumer -- ConsumeMessageService
@enduml 

主要类及其关系:

  1. MQConsumer(接口)

    • 定义了消费者的核心方法
    • start()
    • shutdown()
    • registerMessageListener()
  2. DefaultMQPushConsumer

    • 包含DefaultMQPushConsumerImpl实例
    • 实现消息推模式消费
    • 支持并发/顺序消费
  3. DefaultMQPullConsumer

    • 包含DefaultMQPullConsumerImpl实例
    • 实现消息拉模式消费
    • 需要用户自己维护消费位点
  4. RebalanceService

    • 负责消费者负载均衡
    • 分配消息队列
    • 触发重平衡

3. 消费者启动流程

在这里插入图片描述

@startuml consumer_startup_sequence
!pragma layout smetanaparticipant DefaultMQPushConsumer
participant DefaultMQPushConsumerImpl
participant MQClientInstance
participant RebalanceServiceDefaultMQPushConsumer -> DefaultMQPushConsumerImpl: start()
activate DefaultMQPushConsumerImplDefaultMQPushConsumerImpl -> DefaultMQPushConsumerImpl: checkConfig()
DefaultMQPushConsumerImpl -> MQClientInstance: start()
activate MQClientInstanceMQClientInstance -> RebalanceService: start()
activate RebalanceServiceRebalanceService -> RebalanceService: doRebalance()
RebalanceService --> MQClientInstance
deactivate RebalanceServiceMQClientInstance --> DefaultMQPushConsumerImpl
deactivate MQClientInstanceDefaultMQPushConsumerImpl --> DefaultMQPushConsumer
deactivate DefaultMQPushConsumerImpl
@enduml 

启动流程包含以下步骤:

  1. 检查配置参数
  2. 初始化MQClientInstance
  3. 注册消费者到MQClientInstance
  4. 启动负载均衡服务
  5. 启动消费服务

4. 消息消费模式

4.1 推模式(Push)

在这里插入图片描述

@startuml consumer_push_sequence
!pragma layout smetanaparticipant Broker
participant PushConsumer
participant ConsumeMessageService
participant MessageListenerBroker -> PushConsumer: pullMessage()
activate PushConsumerPushConsumer -> ConsumeMessageService: consumeMessage()
activate ConsumeMessageServiceConsumeMessageService -> MessageListener: consumeMessage()
activate MessageListenerMessageListener --> ConsumeMessageService: ConsumeConcurrentlyStatus
deactivate MessageListenerConsumeMessageService --> PushConsumer
deactivate ConsumeMessageServicePushConsumer -> Broker: updateOffset()
PushConsumer --> Broker
deactivate PushConsumer
@enduml 

推模式特点:

  • 由Broker主动推送消息到Consumer
  • 实际基于长轮询实现
  • 实时性较好
  • 支持并发消费和顺序消费

4.2 拉模式(Pull)

在这里插入图片描述

@startuml consumer_pull_sequence
!pragma layout smetanaparticipant PullConsumer
participant Broker
participant ProcessQueuePullConsumer -> Broker: pullMessage()
activate BrokerBroker --> PullConsumer: PullResult
deactivate BrokerPullConsumer -> ProcessQueue: putMessage()
activate ProcessQueueProcessQueue -> ProcessQueue: processMessage()
ProcessQueue --> PullConsumer
deactivate ProcessQueuePullConsumer -> Broker: updateOffset()
@enduml

拉模式特点:

  • 由Consumer主动拉取消息
  • 更灵活但使用复杂
  • 需要自己维护消费位点
  • 适合特殊场景使用

5. 负载均衡

5.1 重平衡流程

在这里插入图片描述

@startuml consumer_rebalance_sequence
!pragma layout smetanaparticipant RebalanceService
participant MQClientInstance
participant BrokerRebalanceService -> MQClientInstance: doRebalance()
activate MQClientInstanceMQClientInstance -> Broker: getConsumerList()
activate BrokerBroker --> MQClientInstance: ConsumerList
deactivate BrokerMQClientInstance -> MQClientInstance: allocateMessageQueue()
MQClientInstance -> MQClientInstance: updateProcessQueue()MQClientInstance --> RebalanceService
deactivate MQClientInstance
@enduml 

重平衡触发时机:

  • Consumer数量变化
  • Topic队列数变化
  • Consumer订阅关系变化

5.2 负载均衡策略

RocketMQ提供多种分配策略:

  1. 平均分配策略(AVG)
  2. 环形分配策略(CIRCLE)
  3. 机房分配策略(IDC)
  4. 一致性哈希策略(CONSISTENT_HASH)

6. 消费模式

6.1 并发消费

特点:

  • 多线程并行消费消息
  • 消费线程池可配置
  • 适合无顺序要求场景
  • 性能较好

在这里插入图片描述

@startuml consumer_concurrent_sequence
!pragma layout smetanaparticipant PushConsumer
participant ConsumeMessageService
participant "ConsumerThread 1" as Thread1
participant "ConsumerThread 2" as Thread2
participant MessageListenerPushConsumer -> ConsumeMessageService: submitConsumeRequest()
activate ConsumeMessageServiceConsumeMessageService -> Thread1: consumeMessage(msg1)
ConsumeMessageService -> Thread2: consumeMessage(msg2)
activate Thread1
activate Thread2Thread1 -> MessageListener: consumeMessage(msg1)
Thread2 -> MessageListener: consumeMessage(msg2)MessageListener --> Thread1: ConsumeConcurrentlyStatus
MessageListener --> Thread2: ConsumeConcurrentlyStatusThread1 --> ConsumeMessageService
Thread2 --> ConsumeMessageService
deactivate Thread1
deactivate Thread2ConsumeMessageService -> PushConsumer: updateOffset()
deactivate ConsumeMessageService@enduml 

并发消费流程:

  1. ConsumeMessageService将消息提交到消费线程池
  2. 多个消费线程并行处理消息
  3. 每个线程独立调用MessageListener处理消息
  4. 处理完成后批量更新消费位点
实现原理

MessageListenerConcurrently 的核心实现:

public interface MessageListenerConcurrently extends MessageListener {ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context);
}

并发消费的处理流程:

  1. 消息拉取

    • PullMessageService 从 Broker 拉取消息
    • 将消息放入 ProcessQueue 进行处理
  2. 消息分发

    • ConsumeMessageConcurrentlyService 维护消费线程池
    • 将消息批量提交到线程池并发处理
    • 默认线程池大小:最小20,最大64,队列长度10000
  3. 消息消费

    • 多个线程并行调用 consumeMessage 处理消息
    • 每批消息默认最大1条(可配置)
    • 返回 CONSUME_SUCCESS 或 RECONSUME_LATER
  4. 消费进度更新

    • 并发更新消费位点到 Broker
    • 异步提交,提高性能

示例代码:

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 并发处理消息for (MessageExt msg : msgs) {// 处理业务逻辑}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});

6.2 顺序消费

特点:

  • 单线程顺序消费消息
  • 保证消息消费顺序
  • 适合有顺序要求场景
  • 性能相对较低

在这里插入图片描述

@startuml consumer_orderly_sequence
!pragma layout smetanaparticipant PushConsumer
participant ConsumeMessageService
participant ProcessQueue
participant MessageListenerPushConsumer -> ConsumeMessageService: submitConsumeRequest()
activate ConsumeMessageServiceConsumeMessageService -> ProcessQueue: lockMQQueue()
activate ProcessQueueProcessQueue -> MessageListener: consumeMessage(msg1)
activate MessageListener
MessageListener --> ProcessQueue: ConsumeOrderlyStatus
deactivate MessageListenerProcessQueue -> MessageListener: consumeMessage(msg2)
activate MessageListener
MessageListener --> ProcessQueue: ConsumeOrderlyStatus
deactivate MessageListenerProcessQueue -> ProcessQueue: updateOffset()
ProcessQueue --> ConsumeMessageService: unlock()
deactivate ProcessQueueConsumeMessageService --> PushConsumer
deactivate ConsumeMessageService@enduml 

顺序消费流程:

  1. 对消息队列加锁,确保同一时刻只有一个线程消费
  2. 按消息顺序依次调用MessageListener处理
  3. 处理完一批消息后更新消费位点
  4. 释放队列锁,允许下一次顺序消费
实现原理

MessageListenerOrderly 的核心实现:

public interface MessageListenerOrderly extends MessageListener {ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context);
}

顺序消费的处理流程:

  1. 消息队列锁定

    • 在消费之前获取 MessageQueue 级别的锁
    • 确保同一个队列只会被一个线程处理
    • 分布式锁实现,通过 Broker 控制
  2. 消息排序

    • ProcessQueue 内部使用 TreeMap 保证消息顺序
    • 按照消息队列偏移量排序
    • 确保消息按照存储顺序消费
  3. 消息消费

    • 单线程按顺序处理同一队列的消息
    • 每批消息默认最大1条(可配置)
    • 返回 SUCCESS 或 SUSPEND_CURRENT_QUEUE_A_MOMENT
  4. 消费进度更新

    • 同步更新消费位点到 Broker
    • 确保顺序消费的可靠性

示例代码:

consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {// 顺序处理消息for (MessageExt msg : msgs) {// 处理业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}
});
关键区别对比
特性并发消费顺序消费
消费线程多线程并行单线程串行
消息顺序不保证顺序严格顺序
消费性能较高较低
锁机制无锁队列锁
消费状态CONSUME_SUCCESS/RECONSUME_LATERSUCCESS/SUSPEND
位点提交异步批量同步实时
适用场景一般消息顺序敏感消息

7. 核心实现类功能说明

  • DefaultMQPushConsumer: 推模式消费者实现
  • DefaultMQPullConsumer: 拉模式消费者实现
  • RebalanceService: 负载均衡服务
  • ConsumeMessageService: 消息消费服务
  • ProcessQueue: 消息处理队列
  • MessageListener: 消息监听器接口

8. 总结

RocketMQ消费者的特点:

  1. 支持推拉两种消费模式
  2. 完善的负载均衡机制
  3. 灵活的消费模式(并发/顺序)
  4. 高可靠的消息消费保证

通过分层设计和策略模式,使得消费者架构具有很好的扩展性和灵活性。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词