ESP32与MQTT通信
- 1 项目概览
- 2 硬件组成
- 3 MQTT协议解析
- MQTT协议简介
- MQTT核心概念
- 本项目中的MQTT应用
- 4 MQTT Broker选择
- EMQX Broker
- 其他常用MQTT Broker
- 5 代码解析
- 初始化与配置
- MQTT消息处理
- 发布传感器数据
- 6 MQTT话题TOPIC设计
- 7 EMQX的优势在IoT项目中的体现
- 8 MQTT通信流程
- 9 应用场景
- 10 代码原文
1 项目概览
本项目实现了以下功能:
- 通过ESP32读取光敏传感器的光照强度和倾斜传感器的状态
- 使用MQTT协议将传感器数据发布到服务器
- 接收服务器的控制命令,远程控制ESP32上的LED
2 硬件组成
- ESP32开发板:具备WiFi功能的微控制器
- 光敏传感器:连接到ESP32的模拟引脚(GPIO34)
- 倾斜传感器:连接到ESP32的数字引脚(GPIO5)
- LED指示灯:使用ESP32的内置LED(GPIO2)
3 MQTT协议解析
MQTT协议简介
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,专为资源受限设备和低带宽、高延迟网络设计。它具有以下特点:
- 轻量级:最小化网络带宽占用,适合IoT设备
- 发布/订阅模式:解耦消息发送者和接收者
- 可靠性:提供三种服务质量等级(QoS)
- 简单实现:易于集成到各种设备和平台
MQTT核心概念
- 发布者(Publisher):发送消息的客户端
- 订阅者(Subscriber):接收消息的客户端
- 代理(Broker):中心服务器,负责接收、过滤和分发消息
- 主题(Topic):消息的路由路径,采用层级结构
- 服务质量(QoS):消息传递的可靠性保证
本项目中的MQTT应用
在代码中,ESP32作为MQTT客户端既是发布者也是订阅者:
发布者角色:
- 将光敏传感器值发布到
aovalue
主题 - 将倾斜传感器状态发布到
dovalue
主题
订阅者角色:
- 订阅
switch
主题接收LED控制命令
4 MQTT Broker选择
EMQX Broker
在本项目中,我们使用了安装在本地服务器(192.168.3.32)上的EMQX Broker。
特点:
- 开源、高性能的分布式MQTT消息服务器
- 支持百万级并发连接
- 完全支持MQTT 5.0和3.1.1协议
- 提供丰富的插件系统和扩展能力
- 内置WebSocket支持,便于Web应用集成
- 提供详细的监控指标和可视化管理界面
安装方法:
# Docker安装(推荐)
docker pull emqx/emqx:latest
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:latest# Debian/Ubuntu
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
sudo apt-get install emqx# CentOS/RHEL
curl -s https://assets.emqx.com/scripts/install-emqx-rpm.sh | sudo bash
sudo yum install emqx
基本配置:
通过访问http://localhost:18083(默认用户名和密码:admin/public)进行管理界面配置。
其他常用MQTT Broker
Mosquitto/HiveMQ/AWS IoT Cor
5 代码解析
初始化与配置
const char* mqtt_server = "192.168.*.32"; // MQTT服务器IP地址// 定义引脚
const int ledPin = 2; // LED引脚
const int lightSensorPin = 34; // 光敏传感器的模拟引脚
const int tiltSensorPin = 5; // 倾斜传感器的数字引脚WiFiClient espClient; // 创建WiFi客户端
PubSubClient client(espClient); // 创建MQTT客户端
MQTT消息处理
void callback(char* topic, byte* payload, unsigned int length) {// 将接收到的消息转换为字符串String message;for (int i = 0; i < length; i++) {message += (char)payload[i];}// 判断主题是否为"switch"if (String(topic) == "switch") {// 根据消息内容控制LEDif (message == "ON") {digitalWrite(ledPin, HIGH);} else if (message == "OFF") {digitalWrite(ledPin, LOW);}}
}
发布传感器数据
// 每隔指定时间发布一次数据
if (now - lastMsg > publishInterval) {lastMsg = now;// 读取光敏传感器模拟值int lightValue = analogRead(lightSensorPin);// 读取倾斜传感器数字状态int tiltState = digitalRead(tiltSensorPin);// 发布光敏数据char lightStr[10];sprintf(lightStr, "%d", lightValue);client.publish("aovalue", lightStr);// 发布倾斜状态char tiltStr[2];sprintf(tiltStr, "%d", tiltState);client.publish("dovalue", tiltStr);
}
6 MQTT话题TOPIC设计
本项目使用了三个TOPIC:
-
aovalue:模拟输出值(Analog Output Value)
- 用于发布光敏传感器的模拟值
- 数值范围:0-4095
-
dovalue:数字输出值(Digital Output Value)
- 用于发布倾斜传感器的状态
- 数值:0或1
-
switch:控制指令
- 用于接收LED控制命令
- 有效值:“ON"或"OFF”
这种主题设计简单明确,适合小型项目。在大型项目中,EMQX支持更复杂的主题结构和通配符订阅,如:
device/{deviceID}/{sensorType}/value
device/{deviceID}/control/{action}
7 EMQX的优势在IoT项目中的体现
- 高性能:EMQX可处理大量并发连接,适合规模化IoT部署
- 规则引擎:内置数据处理、转发和持久化能力
- 认证与授权:提供多种认证方式,增强安全性
- 丰富的监控:详细的实时监控指标,便于系统管理
- 集群能力:支持水平扩展,满足业务增长需求
- 多协议支持:除MQTT外,还支持CoAP、LwM2M等IoT协议
- 数据集成:可轻松集成Kafka、数据库等后端系统
8 MQTT通信流程
-
连接阶段:
- ESP32连接到WiFi网络
- 连接到EMQX Broker
- 订阅"switch"主题
-
发布阶段:
- 定期读取传感器数据
- 将数据发布到相应主题
-
订阅处理:
- 接收"switch"主题的消息
- 根据消息内容控制LED状态
-
断线重连:
- 监测MQTT连接状态
- 断线时自动重连并恢复订阅
9 应用场景
这个简单项目的基础技术可扩展到多种应用场景:
- 环境监测:监测光照变化,可用于智能农业
- 安全监控:检测设备是否被移动或倾斜
- 家居自动化:基于环境变化自动控制设备
- 数据收集:将传感器数据集中存储分析
10 代码原文
#include <WiFi.h> // ESP32的WiFi库
#include <PubSubClient.h> // MQTT客户端库const char* ssid = "601B"; // WIFI名称
const char* password = "12345678"; // WIFI密码
const char* mqtt_server = "192.168.3.32"; // MQTT服务器IP地址// 定义引脚
const int ledPin = 2; // LED引脚,ESP32内置LED通常在引脚2
const int lightSensorPin = 34; // 光敏传感器的模拟输入引脚,根据实际连接调整
const int tiltSensorPin = 5; // 倾斜传感器的数字输入引脚,根据实际连接调整// 设置上次发布的时间
unsigned long lastMsg = 0;
const int publishInterval = 1000; // 发布频率(毫秒)WiFiClient espClient; // 创建WiFi客户端对象
PubSubClient client(espClient); // 创建MQTT客户端对象// 函数用于连接WiFi网络
void setup_wifi() {delay(10);Serial.println();Serial.print("WIFI连接到: ");Serial.println(ssid);WiFi.begin(ssid, password);while (WiFi.status() != WL_CONNECTED) {delay(500);Serial.print(".");}randomSeed(micros());Serial.println("");Serial.println("WIFI连接成功");Serial.println("IP地址: ");Serial.println(WiFi.localIP());
}// 回调函数,当接收到订阅主题的消息时调用
void callback(char* topic, byte* payload, unsigned int length) {Serial.print("收到消息 [");Serial.print(topic);Serial.print("] ");// 将接收到的消息转换为字符串String message;for (int i = 0; i < length; i++) {message += (char)payload[i];}Serial.println(message);// 判断主题是否为"switch"if (String(topic) == "switch") {// 如果消息内容为"ON",打开LEDif (message == "ON") {digitalWrite(ledPin, HIGH);Serial.println("LED 已打开");} // 如果消息内容为"OFF",关闭LEDelse if (message == "OFF") {digitalWrite(ledPin, LOW);Serial.println("LED 已关闭");}}
}// 函数用于连接或重新连接到MQTT服务器
void reconnect() {while (!client.connected()) {Serial.println("正在尝试MQTT连接...");// 创建随机客户端IDString clientId = "ESP32Client-";clientId += String(random(0xffff), HEX);// 尝试连接if (client.connect(clientId.c_str())) {Serial.println("已连接到MQTT服务器");// 成功连接后,订阅"switch"主题以控制LEDclient.subscribe("switch");Serial.println("已订阅主题: switch");} else {Serial.print("连接失败, 错误码=");Serial.print(client.state());Serial.println(",5秒后重试");delay(5000);}}
}void setup() { // 初始化串口通信Serial.begin(115200);// 设置引脚模式pinMode(ledPin, OUTPUT);pinMode(tiltSensorPin, INPUT);// 初始状态:LED关闭digitalWrite(ledPin, LOW);// 连接WiFisetup_wifi();// 设置MQTT服务器和回调函数client.setServer(mqtt_server, 1883);client.setCallback(callback);
}void loop() {// 如果与MQTT服务器断开连接,重新连接if (!client.connected()) {reconnect();}// 处理MQTT消息client.loop();// 当前时间unsigned long now = millis();// 每隔指定时间发布一次数据if (now - lastMsg > publishInterval) {lastMsg = now;// 读取光敏传感器数值(模拟值)int lightValue = analogRead(lightSensorPin);// 读取倾斜传感器状态(数字值)int tiltState = digitalRead(tiltSensorPin);// 将光敏传感器数值转换为字符串并发布char lightStr[10];sprintf(lightStr, "%d", lightValue);client.publish("aovalue", lightStr);Serial.print("发布光敏数据: ");Serial.println(lightValue);// 将倾斜传感器状态转换为字符串并发布char tiltStr[2];sprintf(tiltStr, "%d", tiltState);client.publish("dovalue", tiltStr);Serial.print("发布倾斜状态: ");Serial.println(tiltState);}
}