简单版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);containerProperties.setMessageListener((MessageListener<String, String>) record -> {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}
手动ack版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置消息监听器为 AcknowledgingMessageListenercontainerProperties.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {try {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());// 模拟消息处理逻辑// 处理完成后手动确认消息if (ack != null) {ack.acknowledge();}} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}
批量处理版本
@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
可关闭版本
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;// 用于保存每个主题对应的监听器容器private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();/*** 开启一个监听*/public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();// 将监听器容器保存到 map 中containerMap.put(topic, container);}/*** 关闭一个监听*/public void stopListener(String topic) {ConcurrentMessageListenerContainer<String, String> container = containerMap.get(topic);if (container != null && container.isRunning()) {container.stop();// 从 map 中移除已停止的监听器容器containerMap.remove(topic);}}
}
调用添加监听
/*** 配置详情*/@GetMapping("/getModelZdyConfInfo")public String getModelZdyConfInfo(String topic) {dynamicKafkaListenerService.registerListener(topic);return "添加" + topic + "监听成功";}