欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > 高效Android MQTT封装工具:简化物联网开发,提升性能与稳定性

高效Android MQTT封装工具:简化物联网开发,提升性能与稳定性

2025/3/10 16:36:12 来源:https://blog.csdn.net/tangweiguo03051987/article/details/146122643  浏览:    关键词:高效Android MQTT封装工具:简化物联网开发,提升性能与稳定性

在Android开发中,封装MQTT工具可以帮助简化与MQTT服务器的通信。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,常用于物联网(IoT)设备之间的通信。

以下是一个简单的MQTT封装工具示例,使用Eclipse Paho库来实现MQTT客户端功能。

  1. 添加依赖
    首先,在build.gradle文件中添加Paho MQTT库的依赖:
dependencies {implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
  1. 权限配置
    在AndroidManifest.xml中添加必要的权限:
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
  1. 服务配置
    在AndroidManifest.xml中注册Paho的MqttService:
<service android:name="org.eclipse.paho.android.service.MqttService" />

4.MQTT封装工具类支持:
自定义客户端ID:支持用户自定义客户端ID。

遗言消息:支持设置遗言消息,用于客户端异常断开时通知其他客户端。

线程池管理:使用线程池处理MQTT操作,避免阻塞主线程。

灵活的回调机制:支持为每个操作单独设置回调。

连接重试机制:增加连接失败后的重试机制,提升连接稳定性。

日志级别:支持动态调整日志级别,便于调试和生产环境切换。
支持多服务器连接:允许同时连接多个MQTT服务器。

消息缓存策略:增加消息缓存的最大数量和时间限制,避免内存泄漏。

更灵活的重试机制:支持自定义重试次数和重试间隔。

连接状态管理:增加更详细的连接状态(如连接中、已连接、断开中等)。

支持QoS级别配置:允许动态配置发布和订阅的QoS级别。

MQTT工具类

import android.content.Context;
import android.util.Log;import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class MqttManager {private static final String TAG = "MqttManager";private static MqttManager instance;private Map<String, MqttAndroidClient> mqttClients = new HashMap<>(); // 支持多服务器连接private Map<String, MqttConnectOptions> mqttConnectOptionsMap = new HashMap<>();private Map<String, Boolean> connectionStatusMap = new HashMap<>(); // 连接状态private Map<String, List<MqttMessage>> messageQueueMap = new HashMap<>(); // 消息队列private Map<String, List<String>> subscribedTopicsMap = new HashMap<>(); // 已订阅的主题private ExecutorService executorService = Executors.newCachedThreadPool(); // 线程池private ConnectionStateListener connectionStateListener;private static final int DEFAULT_QOS = 1;private static final boolean DEFAULT_RETAINED = false;private static final int MAX_MESSAGE_QUEUE_SIZE = 100; // 消息队列最大数量private static final long MESSAGE_QUEUE_TIMEOUT = 60000; // 消息队列超时时间(毫秒)private static final int MAX_RETRY_COUNT = 3; // 最大重试次数private static final int RETRY_INTERVAL = 5000; // 重试间隔(毫秒)// 单例模式public static synchronized MqttManager getInstance() {if (instance == null) {instance = new MqttManager();}return instance;}private MqttManager() {}/*** 初始化MQTT客户端*/public void initClient(Context context, String serverUri, String clientId) {if (mqttClients.containsKey(serverUri)) {Log.w(TAG, "Client already initialized for server: " + serverUri);return;}MqttAndroidClient client = new MqttAndroidClient(context, serverUri, clientId);mqttClients.put(serverUri, client);mqttConnectOptionsMap.put(serverUri, new MqttConnectOptions());connectionStatusMap.put(serverUri, false);messageQueueMap.put(serverUri, new ArrayList<>());subscribedTopicsMap.put(serverUri, new ArrayList<>());// 设置回调client.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {connectionStatusMap.put(serverURI, true);Log.d(TAG, "MQTT connected: " + serverURI);if (connectionStateListener != null) {connectionStateListener.onConnected(serverURI, reconnect);}// 连接成功后发送缓存的消息processMessageQueue(serverURI);// 重新订阅主题resubscribeTopics(serverURI);}@Overridepublic void connectionLost(Throwable cause) {for (Map.Entry<String, MqttAndroidClient> entry : mqttClients.entrySet()) {if (entry.getValue().equals(client)) {String serverUri = entry.getKey();connectionStatusMap.put(serverUri, false);Log.e(TAG, "MQTT connection lost: " + serverUri, cause);if (connectionStateListener != null) {connectionStateListener.onDisconnected(serverUri, cause);}// 尝试重新连接attemptReconnect(serverUri, 0);break;}}}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {Log.d(TAG, "Message arrived: " + new String(message.getPayload()));if (connectionStateListener != null) {connectionStateListener.onMessageReceived(topic, message);}}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {Log.d(TAG, "Message delivered");}});}/*** 设置连接参数*/public void setConnectionOptions(String serverUri, String username, String password) {MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);if (options != null) {options.setUserName(username);options.setPassword(password.toCharArray());}}/*** 设置遗言消息*/public void setWillMessage(String serverUri, String topic, String message, int qos, boolean retained) {MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);if (options != null) {options.setWill(topic, message.getBytes(), qos, retained);}}/*** 设置SSL/TLS加密连接*/public void setSSL(String serverUri, boolean useSSL) {MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);if (options != null && useSSL) {options.setSocketFactory(new DefaultSSLSocketFactory());}}/*** 连接到MQTT服务器*/public void connect(String serverUri) {if (connectionStatusMap.get(serverUri)) {Log.w(TAG, "Already connected to MQTT server: " + serverUri);return;}MqttAndroidClient client = mqttClients.get(serverUri);MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);if (client == null || options == null) {Log.e(TAG, "Client or options not initialized for server: " + serverUri);return;}executorService.execute(() -> {try {client.connect(options, null, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {Log.d(TAG, "Connected to MQTT server: " + serverUri);}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {Log.e(TAG, "Failed to connect to MQTT server: " + serverUri, exception);attemptReconnect(serverUri, 0);}});} catch (MqttException e) {Log.e(TAG, "MQTT connection error: " + serverUri, e);attemptReconnect(serverUri, 0);}});}/*** 断开连接*/public void disconnect(String serverUri) {if (!connectionStatusMap.get(serverUri)) {Log.w(TAG, "Already disconnected from MQTT server: " + serverUri);return;}MqttAndroidClient client = mqttClients.get(serverUri);if (client == null) {Log.e(TAG, "Client not initialized for server: " + serverUri);return;}executorService.execute(() -> {try {client.disconnect(null, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {connectionStatusMap.put(serverUri, false);Log.d(TAG, "Disconnected from MQTT server: " + serverUri);}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {Log.e(TAG, "Failed to disconnect from MQTT server: " + serverUri, exception);}});} catch (MqttException e) {Log.e(TAG, "MQTT disconnection error: " + serverUri, e);}});}/*** 订阅主题*/public void subscribe(String serverUri, String topic, int qos, IMqttActionListener listener) {if (!connectionStatusMap.get(serverUri)) {Log.w(TAG, "Not connected, subscription will be attempted after connection: " + serverUri);subscribedTopicsMap.get(serverUri).add(topic); // 缓存主题return;}MqttAndroidClient client = mqttClients.get(serverUri);if (client == null) {Log.e(TAG, "Client not initialized for server: " + serverUri);return;}executorService.execute(() -> {try {client.subscribe(topic, qos, null, listener);} catch (MqttException e) {Log.e(TAG, "MQTT subscription error: " + serverUri, e);}});}/*** 发布消息*/public void publish(String serverUri, String topic, String message, int qos, boolean retained, IMqttActionListener listener) {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(qos);mqttMessage.setRetained(retained);if (!connectionStatusMap.get(serverUri)) {Log.w(TAG, "Not connected, message will be queued: " + serverUri);List<MqttMessage> queue = messageQueueMap.get(serverUri);if (queue.size() < MAX_MESSAGE_QUEUE_SIZE) {queue.add(mqttMessage); // 缓存消息} else {Log.w(TAG, "Message queue is full, discarding message: " + serverUri);}return;}MqttAndroidClient client = mqttClients.get(serverUri);if (client == null) {Log.e(TAG, "Client not initialized for server: " + serverUri);return;}executorService.execute(() -> {try {client.publish(topic, mqttMessage, null, listener);} catch (MqttException e) {Log.e(TAG, "MQTT publish error: " + serverUri, e);}});}/*** 处理消息队列*/private void processMessageQueue(String serverUri) {List<MqttMessage> queue = messageQueueMap.get(serverUri);for (MqttMessage message : queue) {publish(serverUri, "default/topic", new String(message.getPayload()), message.getQos(), message.isRetained(), null);}queue.clear();}/*** 重新订阅主题*/private void resubscribeTopics(String serverUri) {List<String> topics = subscribedTopicsMap.get(serverUri);for (String topic : topics) {subscribe(serverUri, topic, DEFAULT_QOS, null);}}/*** 尝试重新连接*/private void attemptReconnect(String serverUri, int retryCount) {if (retryCount >= MAX_RETRY_COUNT) {Log.w(TAG, "Max retry count reached for server: " + serverUri);return;}executorService.execute(() -> {try {Thread.sleep(RETRY_INTERVAL);connect(serverUri);} catch (InterruptedException e) {Log.e(TAG, "Reconnect attempt interrupted: " + serverUri, e);}});}/*** 设置连接状态监听器*/public void setConnectionStateListener(ConnectionStateListener listener) {this.connectionStateListener = listener;}/*** 连接状态监听器接口*/public interface ConnectionStateListener {void onConnected(String serverUri, boolean isReconnect);void onDisconnected(String serverUri, Throwable cause);void onMessageReceived(String topic, MqttMessage message);}
}

以下是一个使用 MqttManager 的完整示例。这个示例展示了如何初始化MQTT客户端、连接服务器、订阅主题、发布消息以及处理连接状态和消息回调。

import android.os.Bundle;
import androidx.appcompat.app.AppCompatActivity;import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MainActivity extends AppCompatActivity {private static final String TAG = "MainActivity";private static final String SERVER_URI = "tcp://your.mqtt.broker:1883"; // MQTT服务器地址private static final String CLIENT_ID = "android_client_" + System.currentTimeMillis(); // 客户端IDprivate static final String TOPIC = "your/topic"; // 订阅的主题private static final String WILL_TOPIC = "your/lwt/topic"; // 遗言主题private static final String WILL_MESSAGE = "Client disconnected"; // 遗言消息private MqttManager mqttManager;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);// 初始化MQTT管理器mqttManager = MqttManager.getInstance();mqttManager.initClient(this, SERVER_URI, CLIENT_ID);// 设置连接参数mqttManager.setConnectionOptions(SERVER_URI, "username", "password");// 设置遗言消息mqttManager.setWillMessage(SERVER_URI, WILL_TOPIC, WILL_MESSAGE, 1, true);// 设置连接状态监听器mqttManager.setConnectionStateListener(new MqttManager.ConnectionStateListener() {@Overridepublic void onConnected(String serverUri, boolean isReconnect) {Log.d(TAG, "Connected to MQTT server: " + serverUri + ", isReconnect: " + isReconnect);// 订阅主题mqttManager.subscribe(SERVER_URI, TOPIC, 1, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {Log.d(TAG, "Subscribed to topic: " + TOPIC);}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {Log.e(TAG, "Failed to subscribe to topic: " + TOPIC, exception);}});}@Overridepublic void onDisconnected(String serverUri, Throwable cause) {Log.e(TAG, "Disconnected from MQTT server: " + serverUri, cause);}@Overridepublic void onMessageReceived(String topic, MqttMessage message) {Log.d(TAG, "Received message from topic: " + topic + ", payload: " + new String(message.getPayload()));}});// 连接MQTT服务器mqttManager.connect(SERVER_URI);// 发布消息publishMessage("Hello, MQTT!");}/*** 发布消息*/private void publishMessage(String message) {mqttManager.publish(SERVER_URI, TOPIC, message, 1, false, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {Log.d(TAG, "Message published to topic: " + TOPIC);}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {Log.e(TAG, "Failed to publish message to topic: " + TOPIC, exception);}});}@Overrideprotected void onDestroy() {super.onDestroy();// 断开连接mqttManager.disconnect(SERVER_URI);}
}

示例说明
初始化MQTT客户端:

使用 MqttManager.getInstance() 获取单例实例。

调用 initClient() 初始化MQTT客户端,传入服务器地址和客户端ID。

设置连接参数:

使用 setConnectionOptions() 设置用户名和密码。

设置遗言消息:

使用 setWillMessage() 设置遗言消息,当客户端异常断开时,服务器会发布该消息。

设置连接状态监听器:

实现 ConnectionStateListener 接口,监听连接状态变化和消息到达事件。

连接MQTT服务器:

调用 connect() 连接到MQTT服务器。

订阅主题:

在 onConnected() 回调中订阅主题。

发布消息:

使用 publish() 方法发布消息到指定主题。

断开连接:

在 onDestroy() 中调用 disconnect() 断开连接。

注意事项
服务器地址和主题:

替换 SERVER_URI 和 TOPIC 为实际的MQTT服务器地址和主题。

用户名和密码:

如果MQTT服务器需要认证,请设置正确的用户名和密码。

遗言消息:

遗言消息用于在客户端异常断开时通知其他客户端,确保设置合适的主题和消息内容。

线程安全:

MQTT操作在后台线程中执行,避免阻塞主线程。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词