欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > Kafka-02 @KafkaListener学习

Kafka-02 @KafkaListener学习

2024/11/30 6:43:41 来源:https://blog.csdn.net/qq_45149567/article/details/140319227  浏览:    关键词:Kafka-02 @KafkaListener学习

一. 引入依赖

SpringBoot 和 Kafka 搭配使用的场景,引入 spring-kafka 即可;

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version>
</dependency>

二. 核心结构

先来看一下 spring-kafka 核心图;

当我们在 Spring 中注册一个 Listener,框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理,再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器,每个 KafkaMessageListenerContainer 可以粗浅的认为是一个线程,这个线程会不断向 server 端发起 poll 请求来实现监听;

  • ConcurrentMessageListenerContainer 是通过 ConcurrentMessageListenerContainerFactory 生产的;一般我们不需要去自定义 ConcurrentMessageListenerContainerFactory,Spring 容器会生成默认的 ConcurrentMessageListenerContainerFactory,也有场景需要我们去自定义 ContainerFactory;

  • ConcurrentMessageListenerContainer 中有一个属性 List<KafkaMessageListenerContainer<K, V>> containers,就是用来存放各个 KafkaMessageListenerContainer;需要厘清两者的关系;

在这里插入图片描述

三. 核心流程

先来看一下核心方法的调用流程图,略去了部分非核心流程;

执行流程如下:

  1. Spring 启动;
  2. Spring 生命周期为 finishRefresh() 时,调用 KafkaListenerEndpointRegistry 中的 start();
  3. 根据 @KafkaListener 创建对应数量的 ConcurrentMessageListenerContainer;
  4. 根据并发配置 concurrency 往 ConcurrentMessageListenerContainer 创建对应数量的 KafkaMessageListenerContainer;
  5. 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor,值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程,并在线程停止时执行 stop();
  6. 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中,这里的 ListenerConsumer 是一个 Runnable 对象,并且内部会创建聚合一个 KafkaConsumer 对象,SimpleAsyncTaskExecutor 中创建出的线程会执行 ListenerConsumer.run();
  7. ListenerConsumer 的 run() 被调用;
  8. run 中开启自旋;
  9. 不断调用 kafka-client 提供的 poll() 拉取新的消息;
    • 收到新的消息就执行,执行完了就继续自旋;
    • 收不新消息,重启下一轮自旋;

四. 分析

1. 启动入口

入口在 SpringApplication.run() -> SpringApplication.refreshContext() -> AbstractApplicationContext.refresh() -> AbstractApplicationContext.finishRefresh();

这个 finishRefresh() 中会调用 LifecycleProssor.onRefresh() 启动 kafka 监听器;

// ------------------------------ AbstractApplicationContext ----------------------------
protected void finishRefresh() {clearResourceCaches();initLifecycleProcessor();// 调用 LifecycleProcessor.onRefresh(),Spring 中默认的是 DefaultLifecycleProcessorgetLifecycleProcessor().onRefresh();publishEvent(new ContextRefreshedEvent(this));if (!NativeDetector.inNativeImage()) {LiveBeansView.registerApplicationContext(this);}
}// ------------------------------ DefaultLifecycleProcessor ----------------------------
public void onRefresh() {startBeans(true);this.running = true;
}// ------------------------------ DefaultLifecycleProcessor ----------------------------
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {Lifecycle bean = lifecycleBeans.remove(beanName);if (bean != null && bean != this) {String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);for (String dependency : dependenciesForBean) {doStart(lifecycleBeans, dependency, autoStartupOnly);}if ((!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {try {// 获取容器中的 LifeCycle bean 对象,调用它的 start()// SpringKafka 中对应的是 KafkaListenerEndpointRegistry// 我们重点看一下 KafkaListenerEndpointRegistry.start()bean.start();}catch (Throwable ex) {throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);}}}
}

2. KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry 是 SpringKafka 中很重要的类,是一个 SmartLifecycle 实现类对象,它里面有一个属性 listenerContainers,存放了我们的 ConcurrentMessageListenerContainer 对象;

我们先看它的 start();

// ---------------------------- KafkaListenerEndpointRegistry ---------------------------
public void start() {// 轮询所有的 ConcurrentMessageListenerContainer 对象// 执行 ConcurrentMessageListenerContainer.start()for (MessageListenerContainer listenerContainer : getListenerContainers()) {startIfNecessary(listenerContainer);}this.running = true;
}// ---------------------------- KafkaListenerEndpointRegistry ---------------------------
private void startIfNecessary(MessageListenerContainer listenerContainer) {if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {// 执行 ConcurrentMessageListenerContainer.start()listenerContainer.start();}
}// ---------------------------- AbstractMessageListenerContainer ---------------------------
public final void start() {checkGroupId();synchronized (this.lifecycleMonitor) {if (!isRunning()) {// 调用真正干事的 doStart(),进入 ConcurrentMessageListenerContainer.doStart()doStart();}}
}

我们看 ConcurrentMessageListenerContainer.doStart() 干了些啥;

3. ConcurrentMessageListenerContainer

我们看下 ConcurrentMessageListenerContainer.doStart() 干了啥;

// ---------------------------- ConcurrentMessageListenerContainer ---------------------------
protected void doStart() {if (!isRunning()) {checkTopics();ContainerProperties containerProperties = getContainerProperties();TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null && this.concurrency > topicPartitions.length) {this.concurrency = topicPartitions.length;}setRunning(true);// 1. 根据 @KafkaListener 中配置的 concurrency 轮询for (int i = 0; i < this.concurrency; i++) {// 2. 创建 KafkaMessageListenerContainerKafkaMessageListenerContainer<K, V> container =constructContainer(containerProperties, topicPartitions, i);// 3. 对刚创建出的 KafkaMessageListenerContainer 做一些配置configureChildContainer(i, container);if (isPaused()) {container.pause();}// 4. 启动 KafkaMessageListenerContainercontainer.start();// 5. 将 KafkaMessageListenerContainer 添加到 ConcurrentMessageListenerContainer 中this.containers.add(container);}}
}

关键流程是第 3 步和第 4 步,我们分开来看;

3.1 configureChildContainer()

对刚创建出的 KafkaMessageListenerContainer 做一些配置;

这里创建了一个 SimpleAsyncTaskExecutor,设置进 KafkaMessageListenerContainer 中;

private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {String beanName = getBeanName();beanName = (beanName == null ? "consumer" : beanName) + "-" + index;container.setBeanName(beanName);ApplicationContext applicationContext = getApplicationContext();if (applicationContext != null) {container.setApplicationContext(applicationContext);}ApplicationEventPublisher publisher = getApplicationEventPublisher();if (publisher != null) {container.setApplicationEventPublisher(publisher);}// 设置 clinetIdSuffix,clientId 前缀container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");container.setGenericErrorHandler(getGenericErrorHandler());container.setCommonErrorHandler(getCommonErrorHandler());container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.setRecordInterceptor(getRecordInterceptor());container.setBatchInterceptor(getBatchInterceptor());container.setInterceptBeforeTx(isInterceptBeforeTx());container.setListenerInfo(getListenerInfo());AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();if (exec == null) {// 1. 创建出 SimpleAsyncTaskExecutor,并加入到 this.executorsexec = new SimpleAsyncTaskExecutor(beanName + "-C-");this.executors.add(exec);// 2. 将当前创建的 SimpleAsyncTaskExecutor 设置到 KafkaMessageListenerContainercontainer.getContainerProperties().setConsumerTaskExecutor(exec);}
}

3.2 container.start()

调用 KafkaMessageListenerContainer 的 start(),最终调用 KafkaMessageListenerContainer.doStart();

protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties = getContainerProperties();checkAckMode(containerProperties);Object messageListener = containerProperties.getMessageListener();AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();if (consumerExecutor == null) {consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = determineListenerType(listener);// 1. 创建 ListenerConsumer// ListenerConsumer 是一个 Runnable 对象// new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员// 它的 run() 比较重要this.listenerConsumer = new ListenerConsumer(listener, listenerType);setRunning(true);this.startLatch = new CountDownLatch(1);// 2. 将 ListenerConsumer 任务放入到 SimpleAsyncTaskExecutor 中异步调用this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);
}

ListenerConsumer 是一个 Runnable 对象,new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员,我们看下 ListenerConsumer.run();

4. ListenerConsumer.run()

我们看下 ListenerConsumer 的 run();可以看到这个任务会进入自旋去处理任务;

public void run() {ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent();this.consumerThread = Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count = 0;this.last = System.currentTimeMillis();initAssignedPartitions();publishConsumerStartedEvent();Throwable exitThrowable = null;// 开启自旋while (isRunning()) {// 通过 KafkaConsumer 向 kafka-server 发起 poll 请求pollAndInvoke();}wrapUp(exitThrowable);
}

ListenerConsumer 的 pollAndInvoke() 比较绕,总之我们知道它会通过反射调用我们 @KafkaListener 声明的方法;

我们简单看下最终调我们 @KafkaListener 声明方法的地方;

4.1 HandlerAdapter.invoke()

调用到 RecordMessagingMessageListenerAdapter.invoke();

public Object invoke(Message<?> message, Object... providedArgs) throws Exception {if (this.invokerHandlerMethod != null) {// 最终的执行入口// 最后会通过反射调用我们的 @KafkaListener 声明的方法return this.invokerHandlerMethod.invoke(message, providedArgs);} else if (this.delegatingHandler.hasDefaultHandler()) {Object[] args = new Object[providedArgs.length + 1];args[0] = message.getPayload();System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);return this.delegatingHandler.invoke(message, args);} else {return this.delegatingHandler.invoke(message, providedArgs);}
}

至此,SpringKafka 分析完毕;

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com