欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Java模拟Mqtt客户端连接Mqtt Broker

Java模拟Mqtt客户端连接Mqtt Broker

2025/1/21 11:01:37 来源:https://blog.csdn.net/m0_37978198/article/details/144630711  浏览:    关键词:Java模拟Mqtt客户端连接Mqtt Broker

Java模拟Mqtt客户端基本流程

引入Paho MQTT客户端库

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version>
</dependency>

设置mqtt配置数据

在application.yml中添加如下配置

mqtt:broker-url: tcp://42.194.132.44:1883client-id: mqtt_receive_serverusername: mqtt_serverpassword: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0

MqttClient配置

将MqttClient加入到IoC容器,并连接客户端

package com.angel.ocean.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);return client;}
}

MqttService

mqtt客户端,一些基本操作:连接、订阅、发消息,断开连接

package com.angel.ocean.mqtt;import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;@Slf4j
@Service
public class MqttService {@Resourceprivate MqttClient client;@Resourceprivate KafkaService kafkaService;@PostConstructpublic void init() throws MqttException {client.setCallback(new MqttCallbackHandler(kafkaService));subscribe(MqttTopicConstant.ACTIVATE);subscribe(MqttTopicConstant.RESET);subscribe(MqttTopicConstant.ONLINE);subscribe(MqttTopicConstant.OFFLINE);subscribe(MqttTopicConstant.REPORT);}/*** 连接*/public void connect(String username, String password) throws MqttException {if(!client.isConnected()) {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);}}/*** 发送消息*/public void publish(String topic, String data) {if(client.isConnected()) {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(0);try {client.publish(topic, message);log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, data);} catch (MqttException e) {log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message publish failed, client:{} not online.", client.getClientId());}/*** 订阅*/public void subscribe(String topic) {if(client.isConnected()) {try {client.subscribe(topic);log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);} catch (MqttException e) {log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message subscribe failed, client:{} not online.", client.getClientId());}/*** 断开连接*/public void disconnect() {try {client.disconnect();client.close();log.info("Disconnected:{}", client.getClientId());} catch (MqttException e) {log.error("Message disconnect failed:{}", client.getClientId(), e);}}
}

自定义MqttCallback

对客户端连接丢失,收到消息做一些模拟处理

package com.angel.ocean.mqtt;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.domain.UpData;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;@Slf4j
public class MqttCallbackHandler implements MqttCallback {private KafkaService kafkaService;public MqttCallbackHandler(KafkaService kafkaService) {this.kafkaService = kafkaService;}@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开...", cause);}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String data = new String(message.getPayload());log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);UpData upData = JSONObject.parseObject(data, UpData.class);UpKafKaData upKafKaData = new UpKafKaData(topic, data);log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------:{}", token.isComplete());}
}

MqttController

用于模拟客户端行为

package com.angel.ocean.controller;import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.mqtt.MqttService;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/***  前端控制器** @author Jaime.yu* @time 2024-12-01*/
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {@Resourceprivate MqttService mqttService;@GetMapping("/subscribe")public ApiResult<?> subscribe(String topic) {mqttService.subscribe(topic);return ApiResult.success();}@GetMapping("/publish")public ApiResult<?> publish(String topic, String message) {mqttService.publish(topic, message);return ApiResult.success();}@GetMapping("/disconnect")public ApiResult<?> disconnect() {mqttService.disconnect();return ApiResult.success();}
}

代码验证

启动mqtt客户端

如下图客户端已上线:
在这里插入图片描述

发送消息

在这里插入图片描述如下图mqtt broker该客户端的日志,接收到了我们发送的数据:hello world
在这里插入图片描述

接收数据

首先我们先订阅个主题:mqtt/0/0

在这里插入图片描述

使用MQTTX客户端向该主题发消息

在这里插入图片描述

Java mqtt客户端接收数据

查询本地Java mqtt客户收到的消息,如下图收到该消息
在这里插入图片描述mqtt broker 也可以看到该日志:
在这里插入图片描述

断开连接

在这里插入图片描述如下图本地客户端862024121819020已断开连接:
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com