欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > Java集成MQTT和Kafka实现稳定、可靠、高性能的物联网消息处理系统

Java集成MQTT和Kafka实现稳定、可靠、高性能的物联网消息处理系统

2025/3/18 14:35:48 来源:https://blog.csdn.net/qu1210/article/details/146306165  浏览:    关键词:Java集成MQTT和Kafka实现稳定、可靠、高性能的物联网消息处理系统

Java集成MQTT和Kafka实现高可用方案

1. 概述

在物联网(IoT)和分布式系统中,消息传递的可靠性和高可用性至关重要。本文将详细介绍如何使用Java集成MQTT和Kafka来构建一个高可用的消息处理系统。

MQTT(消息队列遥测传输)是一种轻量级的发布/订阅协议,适用于资源受限的设备和低带宽、高延迟网络。而Kafka是一个分布式流处理平台,提供高吞吐量、可扩展性和持久性。将两者结合,可以创建一个既能处理大量IoT设备连接,又能保证消息可靠传递和处理的系统。

2. 架构设计

我们的高可用架构设计如下:
在这里插入图片描述

主要组件:

  • MQTT集群:使用EMQ X等MQTT代理实现集群
  • Kafka集群:作为中央消息总线和持久化层
  • 桥接组件:将MQTT消息转发到Kafka
  • Java应用服务:处理和分析消息
  • 监控系统:确保整个系统的健康运行

3. Java集成MQTT实现

3.1 Maven依赖

<dependencies><!-- MQTT客户端 --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- Spring Integration MQTT --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.15</version></dependency><!-- Spring Boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.7.8</version></dependency>
</dependencies>

3.2 MQTT配置类

@Configuration
public class MqttConfig {@Value("${mqtt.broker.urls}")private String[] brokerUrls;  // 多个MQTT代理地址,用于故障转移@Value("${mqtt.client.id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.topics}")private String[] topics;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();// 设置多个服务器地址,实现故障转移options.setServerURIs(brokerUrls);// 设置自动重连options.setAutomaticReconnect(true);options.setKeepAliveInterval(30);options.setConnectionTimeout(30);// 设置遗嘱消息,当客户端异常断开时发送options.setWill("clients/status", (clientId + ": disconnected").getBytes(), 1, true);if (username != null && !username.isEmpty()) {options.setUserName(username);options.setPassword(password.toCharArray());}// 设置清除会话,false表示客户端断开连接后,服务器保留其订阅信息options.setCleanSession(false);factory.setConnectionOptions(options);return factory;}// 出站通道(用于发送消息)@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}// 出站消息处理器@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-pub", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultQos(1);return messageHandler;}// 入站通道(用于接收消息)@Beanpublic MessageChannel mqttInboundChannel() {return new DirectChannel();}// 入站消息适配器@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-sub", mqttClientFactory(), topics);adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInboundChannel());return adapter;}
}

3.3 MQTT服务类

@Service
@Slf4j
public class MqttService {private final MessageChannel mqttOutboundChannel;@Autowiredpublic MqttService(MessageChannel mqttOutboundChannel) {this.mqttOutboundChannel = mqttOutboundChannel;}// 发布消息到MQTT主题public void publish(String topic, String payload) {log.info("Publishing message to topic {}: {}", topic, payload);Message<String> message = MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).setHeader(MqttHeaders.QOS, 1).setHeader(MqttHeaders.RETAINED, false).build();mqttOutboundChannel.send(message);}// 处理接收到的MQTT消息@ServiceActivator(inputChannel = "mqttInboundChannel")public void handleMessage(Message<?> message) {String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);String payload = message.getPayload().toString();log.info("Received message from topic {}: {}", topic, payload);// 这里可以添加消息处理逻辑,或者转发到Kafka}
}

4. Java集成Kafka实现

4.1 Maven依赖

<dependencies><!-- Kafka客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.2</version></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.5</version></dependency>
</dependencies>

4.2 Kafka配置类

@Configuration
public class KafkaConfig {@Value("${kafka.bootstrap.servers}")private String bootstrapServers;@Value("${kafka.consumer.group.id}")private String consumerGroupId;// Kafka生产者配置@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();// 设置Kafka集群地址configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 高可用配置// acks=all表示所有副本都确认后才认为消息发送成功configProps.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数configProps.put(ProducerConfig.RETRIES_CONFIG, 10);// 启用幂等性,确保消息不会重复发送configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 批处理大小configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);// 批处理延迟configProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);// 缓冲区大小configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}// Kafka消费者配置@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 高可用配置// 自动提交偏移量configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 从最早的消息开始消费configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 最大拉取记录数configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 心跳间隔configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);// 会话超时configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);// 最大拉取间隔configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);return new DefaultKafkaConsumerFactory<>(configProps);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 设置并发消费者数量factory.setConcurrency(3);// 批量消费factory.setBatchListener(true);// 手动提交偏移量factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}

4.3 Kafka服务类

@Service

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词