1.前言
NameServer是RocketMQ中的一个比较重要的组件,我们这篇博客针对NameSever中包含的组件进行分析,分析一下NameServer中包含的组件以及组件的作用。以前我有一篇博客中rocketMq源码分析之搭建本地环境-CSDN博客,在这篇博客中就简单看了下NameSever中会有两个组件:NamesrvConfig和NettyServerConfig。在这里就不在进行介绍。
2.KVConfigManager
这个KVConfigManager的核心作用是提供一个统一的地方来存储和管理一些全局的配置信息,这些配置信息以键值对的形式存在,不同的业务模块或者不同的命名空间可以使用这些配置来完成特定的任务。
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =new HashMap<String, HashMap<String, String>>();
里面存放了一个HashMap用来存放一个一个键值对。主要包含putKVConfig方法,getKVConfig方法以及persist(持久化到磁盘)方法。
3.RouteInfoManager
这个RouteInfoManager是NameServer中最重要的一个组件,主要负责管理和维护整个 RocketMQ 集群的路由信息,核心作用是维护 RocketMQ 集群中各种组件(如 Broker、Topic 等)的路由元数据信息,使得生产者和消费者能够准确地找到所需的消息队列,从而实现消息的正确发送和消费。
下面是RouteInfoManager中的主要数据结构:
public class RouteInfoManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);// broker长连接过期时间 长连接的空闲时间是2分钟private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;//读写锁private final ReadWriteLock lock = new ReentrantReadWriteLock();// 创建topic 以后 topic是逻辑上的概念 一个topic会有多个Queue Queue会分散到不同的broker上private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;// 代表的broker组的信息 BrokerData包含了一组Broker的信息private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;// 一个NameServer可以管理多个broker组 通常来说一个Cluster就可以了// 有可能会有很多复杂的业务场景 多个Clusterprivate final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;//管理Broker的长连接心跳 是否还有心跳private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;// Filter Server 是rocketMQ的一个高级功能,用来过滤消息//一般情况下 我们是可以基于tag进行数据筛选的操作,比较简单,没有办法进行更加细化的过滤//这个Filter Server是在每台Broker机器上启动一个(或者多个)Filter Server//我们可以把一个自定义的消息筛选的class 上传到Filter server上,在进行数据消费的时候让Broker把数据先传输到Filter Server// Filter Server会根据你自定义的class来进行细粒度的数据筛选,把筛选后的数据回传给你的消费端private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
这个RouteInfoManager会有一些比较核心的方法,
1.getAllClusterInfo 获取集群的信息
/*** 返回的是 broker的cluster信息* 里面包含的是HashMap<String //brokerName// BrokerData> brokerAddrTable* HashMap<String //clusterName// , Set<String //brokerName// >> clusterAddrTable* @return*/public ClusterInfo getAllClusterInfo() {ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);return clusterInfoSerializeWrapper;}
2.deleteTopic 删除对应的topic信息
/*** 删除某个topic 直接操作topicQueueTable的hashMap* @param topic*/public void deleteTopic(final String topic) {try {try {this.lock.writeLock().lockInterruptibly();this.topicQueueTable.remove(topic);} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("deleteTopic Exception", e);}}
3.getAllTopicList 查看所有的topic信息
/*** 查询所有的topic的列表信息* @return*/public TopicList getAllTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();topicList.getTopicList().addAll(this.topicQueueTable.keySet());} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList;}
4.registerBroker 注册broker方法
/*** broker的注册方法* @param clusterName broker的集群名称* @param brokerAddr broker的地址* @param brokerName broker所属组的名称* @param brokerId broker机器的id* @param haServerAddr broker的ha地址* @param topicConfigWrapper 当前broker机器上包含的topic队列上的数据* @param filterServerList broker上部署的filterServer的列表* @param channel netty的网络长连接* @return broker注册的结果*/public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {
4.总结
老规矩,我们用一张图来进行总结一下这个文章的核心内容。