欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 高考 > Rocketmq源码分析(1)

Rocketmq源码分析(1)

2024/10/24 18:21:20 来源:https://blog.csdn.net/lixiaoyi01/article/details/141970070  浏览:    关键词:Rocketmq源码分析(1)
此次源码分析-rocketmq-spring-boot-starter,starter众所周知入口点就是AutoConfiguration.RocketMQAutoConfiguration.class
// 标识为配置类
@Configuration
//将RocketMQProperties识别为配置属性类,创建对象并注入到spring容器中
@EnableConfigurationProperties(RocketMQProperties.class)
// 当类路径中有MQAdmin.class 才启用本配置
@ConditionalOnClass({MQAdmin.class})
//条件启用配置,如果配置rocketmq.name-server不存在,默认会加载此配置,如果没有配置havingValue就意味着rocketmq.name-server 不是 false 就会加载此配置
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
// 把MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、RocketMQTransactionConfiguration 当做配置类加载进来
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
// 指定该配置类会在MessageConverterConfiguration配置类的后面加载
@AutoConfigureAfter({MessageConverterConfiguration.class})
// 指定该配置类会在RocketMQTransactionConfiguration配置类的前面加载
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})public class RocketMQAutoConfiguration {private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);public static final String ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME ="rocketMQTemplate";@Autowiredprivate Environment environment;@PostConstructpublic void checkProperties() {String nameServer = environment.getProperty("rocketmq.name-server", String.class);log.debug("rocketmq.nameServer = {}", nameServer);if (nameServer == null) {log.warn("The necessary spring property 'rocketmq.name-server' is not defined, all rockertmq beans creation are skipped!");}}@Bean@ConditionalOnMissingBean(DefaultMQProducer.class)@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();String nameServer = rocketMQProperties.getNameServer();String groupName = producerConfig.getGroup();Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");String accessChannel = rocketMQProperties.getAccessChannel();String ak = rocketMQProperties.getProducer().getAccessKey();String sk = rocketMQProperties.getProducer().getSecretKey();boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);producer.setNamesrvAddr(nameServer);if (!StringUtils.isEmpty(accessChannel)) {producer.setAccessChannel(AccessChannel.valueOf(accessChannel));}producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());producer.setMaxMessageSize(producerConfig.getMaxMessageSize());producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());return producer;}@Bean(destroyMethod = "destroy")@ConditionalOnBean(DefaultMQProducer.class)@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,RocketMQMessageConverter rocketMQMessageConverter) {RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();rocketMQTemplate.setProducer(mqProducer);rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());return rocketMQTemplate;}
}

重点 是import注解 引入的这几个配置类。 首先第一个RocketMQTransactionConfiguration配置类,它是来解析RocketMQMessageListener 注解修饰的业务逻辑类的。想要知道RocketMQMessageListener 具体实现方式,就要阅读此类的代码。

@Configuration
// 此处实现ApplicationContextAware是为了获取容器上下文, 实现SmartInitializingSingleton是为了在单例bean全部初始化后的回调中做一些处理
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {...public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,...}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = (ConfigurableApplicationContext) applicationContext;}/**** 此处是重点*/@Overridepublic void afterSingletonsInstantiated() {// 获取容器中所有RocketMQMessageListener注解修饰的bean,并且不是代理对象。Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));// 循环处理 beans.forEach(this::registerContainer);}private void registerContainer(String beanName, Object bean) {Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);...// 获取注解RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);// 解析占位符 获取最终的消费者组的名称String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());String topic = this.environment.resolvePlaceholders(annotation.topic());// 获取消费者监听器的状态 默认为trueboolean listenerEnabled =(boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP).getOrDefault(topic, true);...// bean的名字  加后缀区分String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;// 注册监听器genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);if (!container.isRunning()) {try {// 启动监听器container.start();} catch (Exception e) {log.error("Started container failed. {}", container, e);throw new RuntimeException(e);}}log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);}...}

最后进入的start

public synchronized void start() throws MQClientException {switch (this.serviceState) {//状态的处理}// 当订阅改变的时候 需要更新topic订阅信息this.updateTopicSubscribeInfoWhenSubscriptionChanged();// 检查客户端是否已经在 Broker 中注册this.mQClientFactory.checkClientInBroker();// 发送心跳包给所有的brokerthis.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 触发负载均衡消费者组this.mQClientFactory.rebalanceImmediately();}

总结上述源码,扫描所有mq的监听器注解,把注解修饰的类注册到容器中,并启动监听。再往深入的start()方法中看,就可以看到消费者、topic、tag相关信息的获取级处理过程,最后是启动。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com