欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > SpringBoot整合MQTT最详细版(亲测有效)

SpringBoot整合MQTT最详细版(亲测有效)

2025/3/22 7:09:54 来源:https://blog.csdn.net/m0_74825488/article/details/146431606  浏览:    关键词:SpringBoot整合MQTT最详细版(亲测有效)
一、导入pom.xml依赖
 <!--mqtt依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
二、配置MQTT相关信息到application.yml
spring:mqtt:username: 你的账号                           # 账号password: 你的密码                        # 密码hostUrl: tcp://127.0.0.1:1883           # mqtt连接tcp地址clientid: ${random.value}                # 客户端Id,不能相同,采用随机数 ${random.value}default-topic: /testtopic/#                      # 默认主题timeout: 3000                           # 超时时间keepalive: 600                            # 保持连接subscribeFlag: true                     #是否进行订阅true或者falseenabled: true                 # 是否使用mqtt功能

在这里插入图片描述

三、在项目中创建mqtt文件夹后添加AjaxResult.java、MqttConfig.java、MqttInit.java、MqttPushClient.java、PushCallback.java,如图:在这里插入图片描述

AjaxResult.java代码

public class AjaxResult extends HashMap<String, Object>
{private static final long serialVersionUID = 1L;/** 状态码 */public static final String CODE_TAG = "code";/** 返回内容 */public static final String MSG_TAG = "msg";/** 数据对象 */public static final String DATA_TAG = "data";/*** 初始化一个新创建的 AjaxResult 对象,使其表示一个空消息。*/public AjaxResult(){}/*** 初始化一个新创建的 AjaxResult 对象* * @param code 状态码* @param msg 返回内容*/public AjaxResult(int code, String msg){super.put(CODE_TAG, code);super.put(MSG_TAG, msg);}/*** 初始化一个新创建的 AjaxResult 对象* * @param code 状态码* @param msg 返回内容* @param data 数据对象*/public AjaxResult(int code, String msg, Object data){super.put(CODE_TAG, code);super.put(MSG_TAG, msg);if (StringUtils.isNotNull(data)){super.put(DATA_TAG, data);}}/*** 返回成功消息* * @return 成功消息*/public static AjaxResult success(){return AjaxResult.success("操作成功");}/*** 返回成功数据* * @return 成功消息*/public static AjaxResult success(Object data){return AjaxResult.success("操作成功", data);}/*** 返回成功消息* * @param msg 返回内容* @return 成功消息*/public static AjaxResult success(String msg){return AjaxResult.success(msg, null);}/*** 返回成功消息* * @param msg 返回内容* @param data 数据对象* @return 成功消息*/public static AjaxResult success(String msg, Object data){return new AjaxResult(HttpStatus.SUCCESS, msg, data);}/*** 返回警告消息** @param msg 返回内容* @return 警告消息*/public static AjaxResult warn(String msg){return AjaxResult.warn(msg, null);}/*** 返回警告消息** @param msg 返回内容* @param data 数据对象* @return 警告消息*/public static AjaxResult warn(String msg, Object data){return new AjaxResult(HttpStatus.WARN, msg, data);}/*** 返回错误消息* * @return 错误消息*/public static AjaxResult error(){return AjaxResult.error("操作失败");}/*** 返回错误消息* * @param msg 返回内容* @return 错误消息*/public static AjaxResult error(String msg){return AjaxResult.error(msg, null);}/*** 返回错误消息* * @param msg 返回内容* @param data 数据对象* @return 错误消息*/public static AjaxResult error(String msg, Object data){return new AjaxResult(HttpStatus.ERROR, msg, data);}/*** 返回错误消息* * @param code 状态码* @param msg 返回内容* @return 错误消息*/public static AjaxResult error(int code, String msg){return new AjaxResult(code, msg, null);}/*** 是否为成功消息** @return 结果*/public boolean isSuccess(){return Objects.equals(HttpStatus.SUCCESS, this.get(CODE_TAG));}/*** 是否为警告消息** @return 结果*/public boolean isWarn(){return Objects.equals(HttpStatus.WARN, this.get(CODE_TAG));}/*** 是否为错误消息** @return 结果*/public boolean isError(){return Objects.equals(HttpStatus.ERROR, this.get(CODE_TAG));}/*** 方便链式调用** @param key 键* @param value 值* @return 数据对象*/@Overridepublic AjaxResult put(String key, Object value){super.put(key, value);return this;}
}

MqttConfig.java代码:

@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;/*** mqtt功能使能*/private boolean enabled;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}public boolean isEnabled() {return enabled;}public void setEnabled(boolean enabled) {this.enabled = enabled;}//创建MQTT客户端public MqttPushClient getMqttPushClient() {if(enabled == true){String mqtt_topic[] = StringUtils.split(defaultTopic, ",");System.out.println("开始连接======================="+clientId);//连接mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);System.out.println("开始订阅=======================");for(int i=0; i<mqtt_topic.length; i++){//订阅主题mqttPushClient.subscribe(mqtt_topic[i], 1);}}return mqttPushClient;}}

MqttInit.java代码

@Component
public class MqttInit implements ApplicationRunner {@Autowiredprivate MqttConfig mqttConfig;/*** 初始化客户端用于接收生产者发过来的消息,项目运行就会创建好* @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception{mqttConfig.getMqttPushClient();}
}

MqttPushClient.java代码

@Component
@Order(value = 2)
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host      ip+端口* @param clientID  客户端Id* @param username  用户名* @param password  密码* @param timeout   超时时间* @param keepalive 保留数* @param callback  是否回调*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {System.out.println("进入连接----------------"+clientID);MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {//设置回调  接收消息时需要回调client.setCallback(pushCallback);client.connect(options);System.out.println("连接成功==================="+clientID);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布** @param qos         连接方式* QoS 0(最多一次):消息发送后不进行确认,也不重试,是最低的服务质量等级。这种方式可能会导致消息丢失,但传输效率最高。* QoS 1(至少一次):确保消息至少被送达一次。如果发送方没有收到确认,它可能会重试发送消息,这可能导致消息重复。* QoS 2(恰好一次):保证消息准确无误地送达一次,不丢失也不重复,是最高的服务质量等级,但相应地增加了通信的开销。* @param retained    是否保留* @param topic       主题* @param pushMessage 消息体*/public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);if (null == mTopic) {logger.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();return AjaxResult.success();} catch (MqttPersistenceException e)  {e.printStackTrace();return AjaxResult.error();} catch (MqttException e) {e.printStackTrace();return AjaxResult.error();}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("开始订阅主题" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}}

PushCallback.java代码

@Component
@Order(value = 1)
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;private static String _topic;private static String _qos;private static String _msg;@Overridepublic void connectionLost(Throwable throwable) {System.out.println("重连======================");// 连接丢失后,一般在这里面进行重连logger.info("连接断开,可以做重连");boolean flag=true;while(flag) {try {Thread.sleep(5000);if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}flag=false;} catch (Exception e) {e.printStackTrace();}}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息会执行到这里面logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));System.out.println("接收消息主题 : " + topic);System.out.println("接收消息Qos : " + mqttMessage.getQos());System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));_topic = topic;_qos = mqttMessage.getQos()+"";_msg = new String(mqttMessage.getPayload());}/*** 参数回调的方法* @param iMqttDeliveryToken*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}//别的Controller层会调用这个方法来  获取  接收到的硬件数据public String receive() {JSONObject jsonObject = new JSONObject();jsonObject.put("topic", _topic);jsonObject.put("qos", _qos);jsonObject.put("msg", _msg);return jsonObject.toString();}}

最后整合结束

四、测试

创建mttqController控制器发送消息,如图
在这里插入图片描述
mttqController代码

@RestController
@RequestMapping("/mttpTest")
public class mttqController extends BaseController {@Autowiredprivate MqttConfig mqttConfig;@Autowiredprivate MqttPushClient mqttPushClient;@GetMapping(value = "/a")public AjaxResult a(){JSONObject jsonObject = new JSONObject();jsonObject.put("hello", "你好");jsonObject.put("msg", "成功");AjaxResult publish = mqttPushClient.publish(2,false,"/testtopic/5",jsonObject.toString());return success(null);}}

浏览器输入接口地址调试发送成功
在这里插入图片描述
使用MQTTX客户端工具查看消息成功被发送
在这里插入图片描述
springboot控制台(也就是PushCallback.java里的回调方法messageArrived)也成功打印出了客户端接收到的消息,如图

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词