欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > RocketMQ 主题与队列的协同作用解析(既然队列存储在不同的集群中,那要主题有什么用呢?)---管理命令、配置安装

RocketMQ 主题与队列的协同作用解析(既然队列存储在不同的集群中,那要主题有什么用呢?)---管理命令、配置安装

2025/4/26 9:35:31 来源:https://blog.csdn.net/m0_74077663/article/details/147519613  浏览:    关键词:RocketMQ 主题与队列的协同作用解析(既然队列存储在不同的集群中,那要主题有什么用呢?)---管理命令、配置安装

 学习之前呢需要会使用linux的基础命令

一.RocketMQ 主题与队列的协同作用解析

在 RocketMQ 中,‌主题(Topic)‌与‌队列(Queue)‌的协同设计实现了消息系统的逻辑抽象与物理存储分离。虽然队列实际存储在不同集群的 Broker 节点上,但主题作为逻辑层面的核心单元,仍具有不可替代的作用:

1. ‌主题是消息的逻辑分类与路由标识
  • 逻辑聚合‌:主题将同一类业务消息聚合为逻辑单元(例如订单消息归入 OrderTopic),生产者只需关注发送到哪个主题,无需感知底层队列的物理分布。
  • 路由依据‌:消息通过主题名称路由到对应集群的队列中。例如,生产者发送到 PaymentTopic 的消息会被自动分配到该主题关联的队列(可能分布在多个集群)。
  • 跨集群协同‌:若队列分布在多个集群,主题的元数据(如队列分布规则)由 NameServer 统一管理,确保生产者/消费者能准确找到目标队列。

2. ‌主题实现消息的订阅与消费隔离
  • 订阅关系管理‌:消费者通过订阅主题来接收消息,而非直接绑定队列。即使队列分布在多个集群,消费者组仍可通过主题名称统一订阅,RocketMQ 自动完成队列分配和负载均衡。
  • 权限控制‌:主题支持独立的权限配置(如读写权限),实现不同业务消息的访问隔离。例如,财务系统仅能访问 FinanceTopic,与订单系统隔离。
  • Tag 过滤扩展‌:在主题基础上,通过 Tag 进一步细分消息类型(如 OrderTopic:TagA),消费者可选择性订阅特定 Tag,减少无关消息处理。

3. ‌主题为运维提供统一管理入口
  • 队列动态扩展‌:当集群扩容时,通过调整主题的队列数量(如从 8 队列增至 16 队列),新队列可自动分配到新增集群的 Broker,无需修改业务代码。
  • 监控与告警‌:主题级别的监控指标(如消息堆积量、消费 TPS)便于快速定位问题。例如,InventoryTopic 的消费延迟告警可直接关联到库存业务。
  • 数据生命周期‌:主题支持独立的消息保留策略(如保存 3 天),不同业务可按需配置,避免全局策略的局限性。
4. ‌主题与队列的协同模型
维度主题(Topic)队列(Queue)
定位逻辑分类单元,定义消息身份与权限物理存储单元,实际承载消息数据
扩展性通过增加队列数量实现横向扩展依赖 Broker 集群扩容提升单队列容量
消费模式支持集群消费(单队列单消费者)或广播消费仅作为存储载体,不直接决定消费模式
运维操作配置权限、Tag 规则、保留策略等调整副本数、存储路径、故障迁移等

实际场景示例

  • 场景‌:电商系统的订单消息(OrderTopic)需要支持每秒 10 万条吞吐量。
  • 实现‌:
    1. 创建 OrderTopic 并设置 32 个队列。
    2. 将队列分布在 4 个集群的 Broker 节点上(每个集群 8 队列)。
    3. 消费者组订阅 OrderTopic,RocketMQ 自动将 32 个队列均分给消费者实例。
    4. 当某个集群故障时,NameServer 将故障队列的路由指向其他集群的副本队列,保障高可用

小结

主题的核心价值在于‌逻辑抽象‌,它屏蔽了底层队列的物理复杂性,同时提供业务层面的统一管理入口。队列的分布式存储解决的是‌性能与可靠性问题‌,而主题则定义了消息的‌业务语义与协作规则‌,两者共同构成 RocketMQ 高并发、高可用的基石。

二.RocketMQ管理命令 

1、updateTopic

作用:修改或创建一个Topic

命令:mqadmin updateTopic -b | -c [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ]

参数:

  • -n: name server地址列表
  • -c: cluster 名称,表示topic 建在该集群
  • -t: 设置topic名称
  • -h: 打印help信息
  • -o: 设置topic是否为有序的 取值:true、false(默认)
  • -p: 设置topic的权限

示例: 

 mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic

2、deleteTopic

作用:从broker和nameserver删除topic 

命令:mqadmin deleteTopic -c [-h] [-n ] -t

参数:

  • -n: name server地址列表
  • -c: cluster 名称,表示topic 建在该集群
  • -t: 设置topic名称
  • -h: 打印help信息

示例

mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t testtopic

3、topicList

作用:从nameserver列出所有topic

命令:mqadmin topicList [-c] [-h] [-n ]

参数:

  • -n: name server地址列表
  • -c: cluster 名称,表示topic 建在该集群
  • -h: 打印help信息

示例

mqadmin topicList -n localhost:9876

4、topicStatus

作用:检查topic的状态信息

命令:mqadmin topicStatus [-h] [-n ] -t

参数:

  • -n: name server地址列表
  • -c: cluster 名称,表示topic 建在该集群
  • -t: 设置topic名称
  • -h: 打印help信息

 示例

mqadmin topicStatus -n localhost:9876 -t testtopic

5、cleanUnusedTopic

作用:清理未使用的topic

命令:mqadmin cleanUnusedTopic [-b ] [-c ] [-h] [-n ]

参数:

  • -n: name server地址列表
  • -b: broker地址
  • -c: 集群名称
  • -h: 打印help信息

6关闭namesrv和broker服务

mqshutdown namesrv

mqshutdown broker

三.RocketMQ安装与配置 

1、首先修改环境变量 

vim /etc/profile
# 文件末尾追加改信息
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 生效环境变量
source /etc/profile

2、解压文件

unzip rocketmq-all-5.1.4-bin-release.zip

我们上传安装包至opt目录下,使用该命令将其解压至opt目录下,最后使用mv命令对其进行位置移动和改名-----/usr/local/rocketmq

# 解压到当前目录(/opt)
unzip rocketmq-all-5.0.0-bin-release.zip# 将解压后的目录移动到/usr/local
mv rocketmq-all-5.0.0-bin-release /usr/local/# 改名
mv rocketmq-all-5.0.0-bin-release rocketmq

 进入bin目录如下

3、启动NameServer 

nohup sh mqnamesrv &

 

4、启动broker 

nohup sh ./mqbroker -n localhost:9876 & 

使用jps命令查看后发现broker未启动

启动成功后的输出结果 

使用 cat nohup.out 查看日志如下

 很常见的问题

1、java.lang.IllegalAccessError: class org.apache.rocketmq.common.UtilAll (in unnamed module @0x58acb723) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module 

解决:添加启动参数

修改runbroker文件,添加红色参数 $JAVA ${JAVA_OPT} --add-exports=java.base/sun.nio.ch=ALL-UNNAMED $@

2、堆空间初始值太大也报错 

修改文件runbroker.sh(将光标位置修改为如下数字即可)

通过上述修改,将初始堆内存512M,最大堆内存设置为512M,新生代(Java中用于存储创建对象的部分)设置为256M,修改完成后便可以正常启动以及查看日志。 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词