目录
MQTT是什么?
为什么用MQTT?
MQTT的5个概念
MQTT一般工作流程
paho mqtt 注意事项
发布消息和订阅消息 Java代码示例
MQTT是什么?
MQTT是一种轻量级的,基于发布-订阅模式的消息传输协议,广泛用于物联网领域
为什么用MQTT?
轻量级:开销低、报文小
可靠:MQTT支持QoS等级,保证消息传递的可靠性
双向通信:MQTT是基于发布-订阅模式的,MQTT客户端、MQTT broker 可以双向通信
对大规模物联网设备的支持
MQTT的5个概念
MQTT客户端:用了MQTT客户端库的设备或应用都是MQTT客户端
MQTT Broker:负责接收客户端发布的消息并转发给订阅者,同时处理客户端的连接、订阅等请求
发布-订阅模式:发布者和订阅者之间解耦,发布者和订阅者之间不需要建立直接连接
主题:MQTT使用主题来接收消息和传递消息
QoS:MQTT提供了3种服务质量等级:
QoS 0:最多交付一次,可能丢失数据
QoS 1:至少交付一次,保证收到消息,但消息可能重复
QoS 2:只交付一次,保证消息既不丢失也不重复
MQTT一般工作流程
客户端通过TCP/IP协议与Broker建立连接,客户端向指定主题发布消息,也可以订阅主题接收消息,Broker接收消息并传递消息
paho mqtt 注意事项
paho mqtt:初次连接失败时不会自动重连(automaticReconnect 只在连接曾经成功建立、之后又断开的情况下生效),需要自行重试连接
paho mqtt:断开之后重连,通过设置 automaticReconnect 为 true 实现
paho mqtt: client id 保证唯一性(重复会踢掉前面连接)
paho mqtt: 若希望断开连接后自动重连成功时订阅者仍能正常接收消息,需要设置 cleanSession 为 false,并且 qos 为 1(保证不丢数据,但是可能接收到重复数据,需要在业务层保证幂等)
发布消息和订阅消息 Java代码示例
/*** 南昌接口mqtt发布消息* @author zhaohualei* @date 2019/6/10*/
public class MqttPublisher {private static final String HOST = Consts.NANCHANG_MQTT_HOST;private static final String USER_NAME = Consts.NANCHANG_MQTT_USER_NAME;private static final String PASSWORD = Consts.NANCHANG_MQTT_PASSWORD;private static MqttClient mqttClient;static {init();}private static void init() {connect();}private static void connect() {try {mqttClient = new MqttClient(HOST, MqttClient.generateClientId(), new MemoryPersistence());mqttClient.connect(getConnectionOptions());mqttClient.setCallback(new PublisherCallback());if (mqttClient.isConnected()) {LogUtils.PUSH.info("mqtt client first connect to proxy server success!");} else {LogUtils.PUSH.info("mqtt client first connect to proxy server failed!");}} catch (MqttException e) {LogUtils.ERROR.error("Nanchang mqtt client first connect to proxy server failed due to exception:", e);}}private static MqttConnectOptions getConnectionOptions() {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName(USER_NAME);mqttConnectOptions.setPassword(PASSWORD.toCharArray());mqttConnectOptions.setCleanSession(true);mqttConnectOptions.setMaxInflight(1000);mqttConnectOptions.setAutomaticReconnect(true);return mqttConnectOptions;}public static void publish(String data, String topic, String bikeNo) {if (mqttClient.isConnected()) {MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(1);mqttMessage.setRetained(false);mqttMessage.setPayload(data.getBytes());MqttTopic topicObject = mqttClient.getTopic(topic);try {topicObject.publish(mqttMessage);StatisticLogUtil.incr(StatisticAreaNameAndKeyEnum.NANCHANG_BIKE_ORDER.getTotalCountKey());StatisticLogUtil.addBikeNumberToRedisSet(StatisticAreaNameAndKeyEnum.NANCHANG_BIKE_ORDER.getSuccessCountKey(), bikeNo);} catch (Exception e) {LogUtils.ERROR.error("Nanchang mqtt client publish message to topic failed due to exception:", e);}}}static class PublisherCallback implements MqttCallbackExtended {@Overridepublic void connectionLost(Throwable 
cause) {LogUtils.ERROR.error("Nanchang mqtt client connectionLost, cause message:{}", cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {LogUtils.PUSH.info("Nanchang mqtt client messageArrived,topic:{}, message:{}", topic, new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {LogUtils.PUSH.info("Nanchang mqtt client deliveryComplete, isComplete:{}, getException:{}", token.isComplete(), token.getException());if (token.isComplete()) {if (token.getException() == null) {StatisticLogUtil.incr(StatisticAreaNameAndKeyEnum.NANCHANG_BIKE_ORDER.getSuccessCountKey());} else {StatisticLogUtil.incr(StatisticAreaNameAndKeyEnum.NANCHANG_BIKE_ORDER.getFailureCountKey());}}}@Overridepublic void connectComplete(boolean reconnect, String serverURI) {LogUtils.PUSH.info("Nanchang mqtt client connectComplete, reconnect:{}, serverURI:{}", reconnect, serverURI);}}}
/*** subscribe message.* Created by zhaohualei on 2019/01/02.*/
@Component
public class MqttSubscriber {private static final String HOST = Consts.NANCHANG_MQTT_HOST;private static final String USER_NAME = Consts.NANCHANG_MQTT_USER_NAME;private static final String PASSWORD = Consts.NANCHANG_MQTT_PASSWORD;private static final String CLIENT_ID = "client_id";private static final String TOPIC_AREA_NOTICE = "monitor/area/notice";private static final String TOPIC_STATION_NOTICE = "monitor/station/notice";private static final String TOPIC_STAGE_BIKE_INFO = "monitor/stagebike/bikeinfo";private static final String TOPIC_AREA_INFO = "monitor/area/information";private static final String TOPIC_STATION_INFO = "monitor/station/information";@Autowiredprivate NanchangSupervisionService nanchangSupervisionService;public void subscribe() {try {MqttClient mqttClient = new MqttClient(HOST, CLIENT_ID, new MemoryPersistence());MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName(USER_NAME);mqttConnectOptions.setPassword(PASSWORD.toCharArray());mqttConnectOptions.setCleanSession(false);mqttConnectOptions.setAutomaticReconnect(true);mqttClient.connect(mqttConnectOptions);if (mqttClient.isConnected()) {LogUtils.PUSH.info("mqtt subscribe client first connect to proxy server success!");} else {LogUtils.PUSH.info("mqtt subscribe client first connect to proxy server failed!");}mqttClient.subscribe(new String[]{TOPIC_AREA_NOTICE, TOPIC_STATION_NOTICE, TOPIC_STAGE_BIKE_INFO, TOPIC_AREA_INFO, TOPIC_STATION_INFO}, new int[]{1, 1, 1, 1, 1});mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectionLost(Throwable cause) {LogUtils.ERROR.error("Nanchang mqtt subscribe client connectionLost, cause message:{}", cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String payLoad = new String(message.getPayload());LogUtils.PUSH.info("Nanchang mqtt subscribe client messageArrived,topic:{}, message:{}", topic, payLoad);if (StringUtils.isNotBlank(payLoad)) {if 
(TOPIC_AREA_INFO.equals(topic)) {nanchangSupervisionService.updateAreaInfo(payLoad);} else if (TOPIC_STATION_INFO.equals(topic)) {nanchangSupervisionService.updateStationInfo(payLoad);} else if (TOPIC_AREA_NOTICE.equals(topic)) {} else if (TOPIC_STATION_NOTICE.equals(topic)) {} else if (TOPIC_STAGE_BIKE_INFO.equals(topic)) {}}}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {LogUtils.PUSH.info("Nanchang mqtt subscribe client deliveryComplete, isComplete:{}, getException:{}", token.isComplete(), token.getException());}@Overridepublic void connectComplete(boolean reconnect, String serverURI) {LogUtils.PUSH.info("Nanchang mqtt subscribe client connectComplete, reconnect:{}, serverURI:{}", reconnect, serverURI);}});} catch (MqttException e) {LogUtils.ERROR.error("Nanchang mqtt subscribe client first connect to proxy server failed due to exception:", e);}}}