使用版本
"mqtt": "^5.8.0",
安装指令
npm install mqtt --save
------
yarn add mqtt
介绍mqtt
参考使用文档
配置
connection: {protocol: "ws",host: "broker.emqx.io",port: 8083,endpoint: "/mqtt",clean: true,connectTimeout: 30 * 1000, // msreconnectPeriod: 4000, // msclientId: "emqx_vue_" + Math.random().toString(16).substring(2, 8),// 随机数 每次不能重复username: "emqx_test",password: "emqx_test",
},
连接
import mqtt from "mqtt";
// Module-level client handle shared by the snippets below.
let client = {}

// Open the connection; `url` and `options` are supplied by the caller.
client = mqtt.connect(url, options)

client.on('connect', (e) => {
  // Subscribe to topics here, once the connection has been established.
})
订阅主题
// Subscribe to `topic` with QoS 1 and report the outcome on the console.
client.subscribe(topic, { qos: 1 }, (err) => {
  if (err) {
    console.log('消息订阅失败!')
    return
  }
  console.log('订阅成功')
})
消息发布
发送给后端的消息格式需要与后端事先约定,一般使用 JSON 字符串传输。
// Publish a JSON payload (format agreed with the backend) with QoS 0, then
// subscribe to `topic` once the publish has succeeded.
client.publish(
  publishTopic,
  JSON.stringify({ messageType: 1, messageContent: "" }),
  { qos: 0 },
  (publishErr) => {
    if (publishErr) {
      console.log('消息发送失败!')
      return
    }
    console.log('发送成功')

    // Distinct error name so the inner callback does not shadow the outer one.
    client.subscribe(topic, { qos: 1 }, (subscribeErr) => {
      if (subscribeErr) {
        console.log('消息订阅失败!')
        return
      }
      console.log('订阅成功')
    })
  }
)
取消订阅
// Unsubscribe from every topic in `topicList` and log the real outcome:
// success is only reported when the callback receives no error.
client.unsubscribe(topicList, (error) => {
  if (error) {
    console.log('主题为' + topicList + '取消订阅失败', error)
    return
  }
  console.log('主题为' + topicList + '取消订阅成功')
})
断开连接
/**
 * Close the MQTT connection and drop the module-level client handle.
 *
 * Safe to call when no connection was ever opened: `end()` is only invoked
 * when the current handle actually provides it.
 */
export function unconnect() {
  if (client && typeof client.end === 'function') {
    client.end()
  }
  client = null
  // Optional UI notification, left disabled:
  // Message.warning('服务器已断开连接!')
  console.log('服务器已断开连接!')
}
mqtt封装使用(ts版)
import type { IClientOptions, IClientSubscribeOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';

/** Connection options with a mandatory client id (it appears in every log line). */
interface ClientOptions extends IClientOptions {
  clientId: string;
}

/** Handler invoked with the topic and the payload decoded to a string. */
type MessageCallback = (topic: string, message: string) => void;

interface SubscribeOptions {
  topic: string;
  callback: MessageCallback;
  /** Per-subscription options; defaults to `{ qos: 1 }` when omitted. */
  subscribeOption?: IClientSubscribeOptions;
}

interface PublishOptions {
  topic: string;
  message: string;
}

// NOTE(review): credentials are hard-coded and `rejectUnauthorized: false`
// disables TLS certificate verification — confirm both are acceptable outside
// of a development environment.
const clientOptions: ClientOptions = {
  clean: true,
  connectTimeout: 500,
  protocolVersion: 5,
  rejectUnauthorized: false,
  username: 'admin',
  password: 'Anjian-emqx',
  clientId: `client-${Date.now()}`,
};

/**
 * Singleton wrapper around an MQTT.js client.
 *
 * Subscriptions and publications requested while the client is not connected
 * are queued and flushed, in order, from the `connect` event handler.
 * Incoming messages are dispatched to the callback registered for the exact
 * topic string they arrive on.
 */
class Mqtt {
  private static instance: Mqtt;

  private client: MqttClient | undefined;
  // Map instead of a plain object so that topic names can never resolve to
  // inherited Object.prototype members.
  private readonly subscribeMembers = new Map<string, MessageCallback>();
  private pendingSubscriptions: SubscribeOptions[] = [];
  private pendingPublications: PublishOptions[] = [];
  private isConnected = false;

  private constructor(url?: string) {
    if (url) {
      this.connect(url);
    }
  }

  /**
   * Return the shared instance, creating it on first use.
   *
   * @param url Broker URL. Used to connect when the instance is created, or
   *   when the existing instance currently has no client.
   */
  public static getInstance(url?: string): Mqtt {
    if (!Mqtt.instance) {
      Mqtt.instance = new Mqtt(url);
    } else if (url && !Mqtt.instance.client) {
      Mqtt.instance.connect(url);
    }
    return Mqtt.instance;
  }

  /** Create the underlying client and attach event handlers; no-op if a client exists. */
  private connect(url: string): void {
    // Log only non-sensitive data: the full options object contains the password.
    console.log(url, clientOptions.clientId);
    if (this.client) {
      return;
    }
    this.client = mqtt.connect(url, clientOptions);
    this.client.on('connect', this.onConnect);
    this.client.on('reconnect', this.onReconnect);
    this.client.on('error', this.onError);
    this.client.on('message', this.onMessage);
  }

  /** End the connection and forget all registered topic callbacks. */
  public disconnect(): void {
    if (!this.client) {
      return;
    }
    this.client.end();
    this.client = undefined;
    this.subscribeMembers.clear();
    this.isConnected = false;
    console.log(`服务器已断开连接!`);
  }

  /**
   * Subscribe to `topic` and register `callback` for its messages.
   * While disconnected the request is queued until the next `connect` event.
   */
  public subscribe({ topic, callback, subscribeOption }: SubscribeOptions): void {
    if (!this.isConnected || !this.client) {
      // Keep subscribeOption so it is honoured when the queue is flushed.
      this.pendingSubscriptions.push({ topic, callback, subscribeOption });
      return;
    }
    this.client.subscribe(topic, subscribeOption ?? { qos: 1 }, error => {
      if (error) {
        console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败: `, error);
      } else {
        console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`);
      }
    });
    this.subscribeMembers.set(topic, callback);
  }

  /**
   * Unsubscribe from `topic` and remove its callback. A subscription for the
   * same topic that is still queued is dropped as well, so it is not sent
   * once the connection comes up.
   */
  public unsubscribe(topic: string): void {
    this.pendingSubscriptions = this.pendingSubscriptions.filter(pending => pending.topic !== topic);
    if (!this.client) {
      return;
    }
    this.client.unsubscribe(topic, error => {
      if (error) {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `, error);
      } else {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`);
      }
    });
    this.subscribeMembers.delete(topic);
  }

  /**
   * Publish `message` to `topic` with QoS 1.
   * While disconnected the message is queued until the next `connect` event.
   */
  public publish({ topic, message }: PublishOptions): void {
    if (!this.isConnected || !this.client) {
      this.pendingPublications.push({ topic, message });
      return;
    }
    this.client.publish(topic, message, { qos: 1 }, e => {
      if (e) {
        console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e);
      }
    });
  }

  // Event handlers are arrow-function properties so `this` stays bound when
  // they are passed to `client.on(...)`.

  private onConnect = (e: unknown): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接服务器成功:`, e);
    this.isConnected = true;
    this.processPendingSubscriptions();
    this.processPendingPublications();
  };

  private onReconnect = (): void => {
    console.log(`客户端: ${clientOptions.clientId}, 正在重连:`);
    this.isConnected = false;
  };

  private onError = (error: Error): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error);
    this.isConnected = false;
  };

  private onMessage = (topic: string, message: Buffer): void => {
    // Payload logging is intentionally left out here.
    const callback = this.subscribeMembers.get(topic);
    callback?.(topic, message.toString());
  };

  /** Replay queued subscriptions in the order they were requested. */
  private processPendingSubscriptions(): void {
    // Swap the queue out first so replayed entries cannot be re-queued into
    // the list being iterated.
    const queued = this.pendingSubscriptions;
    this.pendingSubscriptions = [];
    for (const pending of queued) {
      this.subscribe(pending);
    }
  }

  /** Replay queued publications in the order they were requested. */
  private processPendingPublications(): void {
    const queued = this.pendingPublications;
    this.pendingPublications = [];
    for (const pending of queued) {
      this.publish(pending);
    }
  }
}

// Alternative ways to obtain the broker URL, kept for reference:
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);

// Derive the broker URL from the page origin: http(s) becomes ws(s), and
// `localhost` is rewritten to 127.0.0.1.
const { protocol, host } = window.location;
export default Mqtt.getInstance(`${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`);
注意:
- 环境配置
.env.test
VITE_OTHER_SERVICE_BASE_URL='{"mqtt": "ws://192.168.11.14:8083/mqtt"}'
- qos 设置
前后端的 qos 需保持一致,统一设置为 1