欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 手游 > springboot 引入mqtt

springboot 引入mqtt

2024/10/24 10:27:39 来源:https://blog.csdn.net/u014007760/article/details/140047278  浏览:    关键词:springboot 引入mqtt

依赖

		<!-- Spring Integration starter plus the stream and MQTT modules. -->
		<!-- NOTE(review): the Java classes in this article import org.eclipse.paho.client.mqttv3 directly; presumably that arrives transitively via spring-integration-mqtt. Confirm. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
		</dependency>

配置文件

# MQTT settings read by MqttConfig through @Value("${mqtt.server.*}") and @Value("${mqtt.topic}").
mqtt:
  server:
    username: xxx
    password: xxx
    serverURI: tcp://127.0.0.1:1883
    # A random suffix is appended to the client id on each start.
    clientId: xxx_${random.int[1000,9999]}
    keepAliveInterval: 120
    connectionTimeout: 30
  # Comma-separated list of topics to subscribe to.
  topic: topic1,topic2

配置类

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.List;@Slf4j
@Configuration
public class MqttConfig {@Value("${mqtt.server.username}")private String username;@Value("${mqtt.server.password}")private String password;@Value("${mqtt.server.serverURI}")private String serverURI;@Value("${mqtt.server.clientId}")private String clientId;@Value("${mqtt.server.keepAliveInterval}")private int keepAliveInterval;@Value("${mqtt.server.connectionTimeout}")private int connectionTimeout;@Value("${mqtt.topic}")private List<String> topics;private MqttClient mqttClient;@Beanpublic MqttClient mqttClient(MqttConsumeService mqttConsumeService) {MqttClientPersistence persistence = new MemoryPersistence();MqttClient client = null;try {client = new MqttClient(serverURI, clientId, persistence);client.setTimeToWait(5000);mqttConsumeService.setMqttClient(client);client.setCallback(mqttConsumeService);this.mqttClient = client;} catch (MqttException e) {log.error("mqtt init error {}", e);}return client;}public void connectAndSubscribe(){try {mqttClient.connect(this.mqttConnectOptions());} catch (MqttException e) {log.error("mqtt connection error", e);}this.subscribe(mqttClient);}private MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);options.setAutomaticReconnect(true);options.setConnectionTimeout(connectionTimeout);options.setKeepAliveInterval(keepAliveInterval);return options;}public void subscribe(MqttClient mqttClient) {topics.forEach(topic -> {try {mqttClient.subscribe(topic, 0);} catch (MqttException e) {log.error("mqtt subscribe error {}", e);}});}}

连接类:在 Spring 启动完成后再连接 MQTT,防止过早触发 messageArrived 方法(messageArrived 方法里可能包含业务处理逻辑,过早触发会导致某些业务 service 尚未注入完成就被调用)。

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class MqttConnector implements ApplicationRunner {@Resourceprivate MqttConfig mqttConfig;@Overridepublic void run(ApplicationArguments args){mqttConfig.connectAndSubscribe();}
}

publish service

import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Slf4j
// Publishing facade over the injected MqttClient.
@Service
public class MqttPublishService {

    /** QoS used by {@link #send(String, String)}. */
    private static final int DEFAULT_QOS = 0;

    @Resource
    private MqttClient mqttClient;

    /**
     * Publishes {@code message} to {@code topic} as a non-retained message.
     * The payload is encoded as UTF-8 so it does not depend on the platform
     * default charset.
     *
     * @param topic   destination topic
     * @param message text payload; must not be {@code null}
     * @param qos     quality-of-service level; must not be {@code null} (it is unboxed)
     * @throws RuntimeException wrapping the {@link MqttException} if publishing fails
     */
    public void send(String topic, String message, Integer qos) {
        try {
            mqttClient.publish(topic, message.getBytes(StandardCharsets.UTF_8), qos, false);
        } catch (MqttException e) {
            throw new RuntimeException("mqtt发布消息异常", e);
        }
    }

    /**
     * Publishes {@code message} to {@code topic} with QoS 0, non-retained.
     *
     * @param topic   destination topic
     * @param message text payload; must not be {@code null}
     * @throws RuntimeException wrapping the {@link MqttException} if publishing fails
     */
    public void send(String topic, String message) {
        send(topic, message, DEFAULT_QOS);
    }
}

consume service

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Service;@Slf4j
@Service
public class MqttConsumeService implements MqttCallbackExtended {private MqttClient mqttClient;public void setMqttClient(MqttClient mqttClient) {this.mqttClient = mqttClient;}@Overridepublic void connectComplete(boolean b, String s) {}@Overridepublic void connectionLost(Throwable throwable) {try {mqttClient.reconnect();} catch (MqttException e) {log.error("mqtt重新连接失败",e);}}/*** topic消费方法*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) {log.info(topic);log.info(mqttMessage.toString());}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com