欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 焦点 > vue3中使用mqtt数据传输(封装)

vue3中使用mqtt数据传输(封装)

2024/11/6 14:33:25 来源:https://blog.csdn.net/weixin_41056807/article/details/143425938  浏览:    关键词:vue3中使用mqtt数据传输(封装)

使用版本

"mqtt": "^5.8.0",

安装指令

npm install mqtt --save
------
yarn add mqtt

介绍mqtt

参考使用文档

配置

connection: {protocol: "ws",host: "broker.emqx.io",port: 8083,endpoint: "/mqtt",clean: true,connectTimeout: 30 * 1000, // msreconnectPeriod: 4000, // msclientId: "emqx_vue_" + Math.random().toString(16).substring(2, 8),// 随机数 每次不能重复username: "emqx_test",password: "emqx_test",
},

连接

import mqtt from "mqtt";
let client = {}
client = mqtt.connect(url, options)client.on('connect', (e) => {// 订阅主题})

订阅主题

client.subscribe(topic, { qos: 1 }, (err) => {if (!err) {console.log('订阅成功')} else {console.log('消息订阅失败!')}
})

消息发布

给后端发送格式,是和后端约定好的数据格式,一般为JSON传输。

client.publish(publishTopic, `{"messageType":1,"messageContent":""}`, { qos: 0 }, (err) => {if (!err) {console.log('发送成功')client.subscribe(topic, { qos: 1 }, (err) => {if (!err) {console.log('订阅成功')} else {console.log('消息订阅失败!')}})} else {console.log('消息发送失败!')}
})

取消订阅

client.unsubscribe(topicList, (error) => {console.log('主题为' + topicList + '取消订阅成功', error)
})

断开连接

export function unconnect() {client.end()client = null// Message.warning('服务器已断开连接!')console.log('服务器已断开连接!')
}

mqtt封装使用(ts版)

import type { IClientOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';interface ClientOptions extends IClientOptions {clientId: string;
}interface SubscribeOptions {topic: string;callback: (topic: string, message: string) => void;subscribeOption?: mqtt.IClientSubscribeOptions;
}interface PublishOptions {topic: string;message: string;
}class Mqtt {private static instance: Mqtt;private client: MqttClient | undefined;private subscribeMembers: Record<string, ((topic: string, message: string) => void) | undefined> = {};private pendingSubscriptions: SubscribeOptions[] = [];private pendingPublications: PublishOptions[] = [];private isConnected: boolean = false;private constructor(url?: string) {if (url) {this.connect(url);}}public static getInstance(url?: string): Mqtt {if (!Mqtt.instance) {Mqtt.instance = new Mqtt(url);} else if (url && !Mqtt.instance.client) {Mqtt.instance.connect(url);}return Mqtt.instance;}private connect(url: string): void {console.log(url, clientOptions);if (!this.client) {this.client = mqtt.connect(url, clientOptions);this.client.on('connect', this.onConnect);this.client.on('reconnect', this.onReconnect);this.client.on('error', this.onError);this.client.on('message', this.onMessage);}}public disconnect(): void {if (this.client) {this.client.end();this.client = undefined;this.subscribeMembers = {};this.isConnected = false;console.log(`服务器已断开连接!`);}}public subscribe({ topic, callback }: SubscribeOptions): void {if (this.isConnected) {this.client?.subscribe(topic, { qos: 1 }, error => {if (error) {console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败: `, error);} else {console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`);}});this.subscribeMembers[topic] = callback;} else {this.pendingSubscriptions.push({ topic, callback });}}public unsubscribe(topic: string): void {if (!this.client) {return;}this.client.unsubscribe(topic, error => {if (error) {console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `, error);} else {console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`);}});this.subscribeMembers[topic] = undefined;}public publish({ topic, message }: PublishOptions): void {if (this.isConnected) {this.client?.publish(topic, message, { qos: 1 }, e => {if (e) {console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e);}});} else {this.pendingPublications.push({ topic, message });}}private onConnect = (e: any): void => {console.log(`客户端: ${clientOptions.clientId}, 连接服务器成功:`, e);this.isConnected = true;this.processPendingSubscriptions();this.processPendingPublications();};private onReconnect = (): void => {console.log(`客户端: ${clientOptions.clientId}, 正在重连:`);this.isConnected = false;};private onError = (error: Error): void => {console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error);this.isConnected = false;};private onMessage = (topic: string, message: Buffer): void => {// console.log(//   `客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `,//   message.toString(),// );const callback = this.subscribeMembers?.[topic];callback?.(topic, message.toString());};private processPendingSubscriptions(): void {while (this.pendingSubscriptions.length > 0) {const { topic, callback, subscribeOption } = this.pendingSubscriptions.shift()!;this.subscribe({ topic, callback, subscribeOption });}}private processPendingPublications(): void {while (this.pendingPublications.length > 0) {const { topic, message } = this.pendingPublications.shift()!;this.publish({ topic, message });}}
}const clientOptions: ClientOptions = {clean: true,connectTimeout: 500,protocolVersion: 5,rejectUnauthorized: false,username: 'admin',password: 'Anjian-emqx',clientId: `client-${Date.now()}`
};// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);
const { protocol, host } = window.location;
export default Mqtt.getInstance(`${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`);

注意:

  1. 环境配置
    .env.test
VITE_OTHER_SERVICE_BASE_URL= `{"mqtt": "ws://192.168.11.14:8083/mqtt"
}`
  1. qos设置 前后端统一为1
    在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com