一、MQTT的概念
MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ
1.MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。
2.订阅与发布规则
二、MQTT 代理服务器
三、Mosquitto 代理服务器
Eclipse Mosquitto 也是一款开源的 MQTT Broker,兼容 MQTT 协议的 5.0、3.1.1 和 3.1 版本。Mosquitto 体积小巧,既可以运行在低功耗的单板计算机上,也可以部署在企业级服务器上。它采用 C 语言编写,可以用 C 库实现 MQTT 客户端。它支持 Windows、Mac、Linux 和 Raspberry Pi 等多种平台,为每个平台提供了方便安装的二进制文件。最新版本还增加了一个认证和授权插件 “mosquitto-go-auth”,以及一个用于管理 Mosquitto 实例的 Web 用户界面。此外,它还提供了一个 PHP 包装器 “Mosquitto-PHP”,可以方便地在 PHP 中开发 MQTT 客户端。
Eclipse Mosquitto 官方网站
GitHub - eclipse-mosquitto/mosquitto: Eclipse Mosquitto - An open source MQTT broker github开源网站
Mosquitto 服务器安装
1.下载 openssl 加密库源码
Old 1.1.1 Releases | OpenSSL Library
2.下载cjson 源码
GitHub - DaveGamble/cJSON: Ultralightweight JSON parser in ANSI C
3.下载mosquitto 服务器
Index of /files/source/
4.配置&安装⭐⭐
将下载好文件放到一个自己知道的路径,然后用wsl打开解压到家目录。不要解压到共享文件里。
------- openssl加密库安装---------
tar -xvf openssl-1.1.1q.tar.gz -C ~/ #1.解压源码
cd ~/openssl-1.1.1q/ #2.进入源码目录
./config #3.默认配置 make test -j12 #4.编译测试代码与库文件sudo make install #5.安装 ------cjson库安装-------cp cJSON-master.zip ~/ #1.拷贝到家目录cd ~/ #2.进入家目录sudo apt install unzip #3.安装解压工具 unzip cJSON-master.zip #4.解压json源码 cd cJSON-master/ #5.进入json源码make #6.编译源码sudo make install #7.安装 -----安装mosquitto代理服务器-----
sudo apt-get install g++ #1.安装g++编译器
tar -xvf mosquitto-2.0.9.tar.gz -C ~/ #2.解压到家目录
cd ~/mosquitto-2.0.9/ #3.进入源码目录
make #4.编译
sudo make install #5.安装 tip💡如果cjson库或ssl 库安装失败
cJSON - for client JSON output support. Disable with make WITH_CJSON=no Auto detected with CMake.make WITH_CJSON=no #去掉cjson openssl (libssl-dev on Debian based systems) - disable with make WITH_TLS=nomake WITH_TLS=no
测试成功如下
四、mosquitto 代理服务器使用
1、订阅与发布命令
#拷贝库文件到系统库中 sudo cp /usr/local/lib/lib* /lib#拷贝系统的配置文件 cp /etc/mosquitto/mosquitto.conf.example ./ //当前文件为自己写代码的文件里
2、修改端口
3、关闭防火墙
#开启代理服务器 尝试能不能行mosquitto -c ./mosquitto.conf.example#订阅主题 'test/topic' mosquitto_sub -t 'test/topic' -v#发布消息 mosquitto_pub -t 'test/topic' -m 'hello world'
4、发布端函数接口
将官方大的代码拷贝到自己写代码的文件里边看官方代码边写自己的代码。
//1初始化MQTT库,在调用任何函数之前必须要调用该函数
int mosquitto_lib_init(void);
返回值:MOSQ_ERR_SUCCESS - on success.//2创建一个新的客户端
struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);
id:客户端ID,如何为NULL ,系统自动分配一个ID,且clean_session参数必须为 true
clean_session: true 代理服务器在客户端断开后会清除数据 false 代理服务器在客户端断开后会保留数据
obj:传递给回调函数的参数
返回值:成功 客户端对象地址 失败 NULL //3连接MQTT服务器
int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive);
mosq:客户端对象
host:服务器IP地址
port:服务器端口号 👉1883
keepalive:保持连续ping 包,设置一段时间后发送一个ping给服务器
返回值: MOSQ_ERR_SUCCESS - on success.//4启动网络线程,不断处理网络数据
int mosquitto_loop_start(struct mosquitto *mosq);
mosq:客户端对象
返回值: MOSQ_ERR_SUCCESS - on success.//5⭐重点,难点 : 发布消息
int mosquitto_publish(struct mosquitto *mosq, //客户端对象 int *mid, //消息ID ,设置为 NULL 即可const char *topic, //👍发布的消息主题int payloadlen, //发布消息长度 between 0 and 268,435,455.const void *payload, //发布的消息int qos, //👍消息质量 0 ,1 , 2bool retain); //消息保留标记为 true 保留消息
qos 消息质量
MQTT(Message Queuing Telemetry Transport)协议定义了三种消息质量等级(QoS),以确保在不同场景下消息的可靠传输。
以下是三种QoS级别的详细说明:
1. QoS 0(At Most Once,最多一次)
特点:消息最多被传递一次,没有确认机制,消息可能会丢失或重复。
适用场景:适用于对消息完整性要求不高的场景,如天气更新、实时数据流、传感器数据等。在这些场景中,即使消息丢失或重复,也不会造成重大影响。
传输过程:发送方发送消息后,不等待接收方的确认,也不存储消息以进行重传。
2. QoS 1(At Least Once,至少一次)
特点:消息至少被传递一次,但可能会出现重复的消息。
适用场景:适用于对消息丢失敏感,但对重复不敏感的场景,如传感器数据、开关状态同步等。在这些场景中,确保消息到达比避免重复更重要。
传输过程:发送方发送消息后,会等待接收方的确认报文(PUBACK)。如果未收到确认,发送方会重传消息。接收方收到消息后,会发送PUBACK确认报文。
3. QoS 2(Exactly Once,仅一次)
特点:消息确保只传递一次,没有重复。这是最高级别的QoS,适用于对消息的完整性和顺序性要求非常高的场景。
适用场景:适用于金融交易、关键命令等场景,在这些场景中,消息的丢失或重复都是不可接受的。
传输过程:发送方和接收方通过四次握手过程(PUBREC、PUBREL、PUBCOMP)来确保消息只被接收一次。发送方发送消息后,等待接收方的PUBREC确认,然后发送PUBREL,最后等待接收方的PUBCOMP确认。
QoS级别的选择
QoS 0:适用于实时性要求高,但对数据丢失和重复容忍度较高的场景。
QoS 1:适用于需要确保消息至少被接收一次,但允许重复的场景。
QoS 2:适用于需要确保消息仅被接收一次,且对实时性要求不高的场景。
注意事项
网络条件:在网络条件较差的情况下,建议选择较低的QoS级别,以减少消息丢失的风险。
系统资源:QoS级别越高,传输过程的复杂程度和系统资源消耗也越大。
综上所述,根据具体的应用场景和需求选择合适的QoS级别,可以在确保消息可靠传输的同时,优化系统性能和资源利用率。
⭐⭐发布端代码例子:
#include <mosquitto.h> //⚠️声明MQTT库的接口
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>int main(int argc, char *argv[])
{// 1.初始化MQTT库mosquitto_lib_init();// 2.创建一个客户端struct mosquitto *mosq = mosquitto_new(NULL, true, NULL);if (mosq == NULL){perror("创建客户端失败\n");return 1;}else{printf("创建客户成功\n");}// 3.连接服务器,超时设置为60秒int ret = mosquitto_connect(mosq, "127.0.0.1", 1883, 60);if (ret != MOSQ_ERR_SUCCESS){perror("连接服务器失败\n");return 1;}else{printf("连接服务器成功\n");}// 4.启动网络线程mosquitto_loop_start(mosq);while (1){printf("请输入发布的主题和消息\n");char topic[50] = {0};char payload[268] = {0};scanf("%s %s", topic, payload);int ret = mosquitto_publish(mosq, NULL, topic, strlen(payload), payload, 0, false);if (ret != MOSQ_ERR_SUCCESS){perror("发布消息失败\n");return 1;}else{printf("发布消息成功\n");}}mosquitto_destroy(mosq); //销毁对象mosquitto_lib_cleanup(); //清空函数库
}
5、订阅端函数接口
//1.初始化MQTT库,在调用任何函数之前必须要调用该函数
int mosquitto_lib_init(void);
返回值:MOSQ_ERR_SUCCESS - on success.//2。创建一个新的客户端
struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);
id:客户端ID,如何为NULL ,系统自动分配一个ID,且clean_session参数必须为 true
clean_session: true 代理服务器在客户端断开后会清除数据 false 代理服务器在客户端断开后会保留数据
obj:传递给回调函数的参数
返回值:成功 客户端对象地址 失败 NULL //3.连接MQTT服务器
int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive);
mosq:客户端对象
host:服务器IP地址
port:服务器端口号 👉1883
keepalive:保持连续ping 包,设置一段时间后发送一个ping给服务器 (超时时间)
返回值: MOSQ_ERR_SUCCESS - on success.//4.订阅主题
int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos);
mosq:客户端对象
mid:消息ID,设置为NULL不关心
sub:👍需要订阅的主题
qos:消息质量 0,1,2
返回值: MOSQ_ERR_SUCCESS - on success.//5.⭐重点,难点:设置消息回调函数,当订阅的主题有消息时,会调用该函数
void mosquitto_message_callback_set(struct mosquitto *mosq,
void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *));
mosq:客户端对象
on_message:函数指针,指向一个回调函数如下👇
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{/* This blindly prints the payload, but the payload can be anything so take care. */printf("%s %d %s\n", msg->topic, msg->qos, (char *)msg->payload);
}
mosq:客户端对象
obj:创建客户端时传递的参数
msg:👍消息💡消息结构体
struct mosquitto_message{int mid; //发布消息id char *topic; //主题void *payload; //消息 int payloadlen; //消息长度 int qos; //通信质量 bool retain; //保留消息标记
};6.循环接收消息 int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets);mosq:客户端对象 timeout:超时检测 ,设置 -1 永久等待
订阅端代码例子:
#include <mosquitto.h> //⚠️声明MQTT库的接口
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{/* This blindly prints the payload, but the payload can be anything so take care. */printf("%s %d %s\n", msg->topic, msg->qos, (char *)msg->payload);
}int main(int argc, char *argv[])
{// 1.初始化MQTT库mosquitto_lib_init();// 2.创建一个客户端struct mosquitto *mosq = mosquitto_new(NULL, true, NULL);if (mosq == NULL){perror("创建客户端失败\n");return 1;}else{printf("创建客户成功\n");}// 3.连接服务器int ret = mosquitto_connect(mosq, "127.0.0.1", 1883, 60);if (ret != MOSQ_ERR_SUCCESS){perror("连接服务器失败\n");return 1;}else{printf("连接服务器成功\n");}// 4.订阅一个主题ret = mosquitto_subscribe(mosq, NULL, "test", 0);if (ret != MOSQ_ERR_SUCCESS){perror("订阅失败\n");return 1;}else{printf("订阅成功\n");}// 5.设置消息回调函数mosquitto_message_callback_set(mosq, on_message); // 设置接收消息回调函数// 6.循环接收消息mosquitto_loop_forever(mosq, -1, 1); // 一直循环接收数据mosquitto_destroy(mosq);mosquitto_lib_cleanup();return 0;
}
五、MQTTX 调试助手
公共测试服务器:免费的公共 MQTT 服务器 | EMQ
MQTTX 下载
安装
设置为简体中文
主题的订阅与发布调试
1.新建链接:服务器为你写代码的地址。
2.主题订阅:主题名字随便
3.主题发布,要跟代码的订阅一样。
4.实现MQTT远程通信
即在调试助手中(发送端)发送消息到阿里云(部署MQTT代理服务器)中,然后MQTT代理服务器转换到Ubuntu的终端中。