在Android开发中,封装MQTT工具可以帮助简化与MQTT服务器的通信。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,常用于物联网(IoT)设备之间的通信。
以下是一个简单的MQTT封装工具示例,使用Eclipse Paho库来实现MQTT客户端功能。
- 添加依赖
首先,在build.gradle文件中添加Paho MQTT库的依赖:
dependencies {implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
- 权限配置
在AndroidManifest.xml中添加必要的权限:
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
- 服务配置
在AndroidManifest.xml中注册Paho的MqttService:
<service android:name="org.eclipse.paho.android.service.MqttService" />
4.MQTT封装工具类支持:
自定义客户端ID:支持用户自定义客户端ID。
遗言消息:支持设置遗言消息,用于客户端异常断开时通知其他客户端。
线程池管理:使用线程池处理MQTT操作,避免阻塞主线程。
灵活的回调机制:支持为每个操作单独设置回调。
连接重试机制:增加连接失败后的重试机制,提升连接稳定性。
日志级别:支持动态调整日志级别,便于调试和生产环境切换。
支持多服务器连接:允许同时连接多个MQTT服务器。
消息缓存策略:增加消息缓存的最大数量和时间限制,避免内存泄漏。
更灵活的重试机制:支持自定义重试次数和重试间隔。
连接状态管理:增加更详细的连接状态(如连接中、已连接、断开中等)。
支持QoS级别配置:允许动态配置发布和订阅的QoS级别。
MQTT工具类
import android.content.Context;
import android.util.Log;import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class MqttManager {private static final String TAG = "MqttManager";private static MqttManager instance;private Map<String, MqttAndroidClient> mqttClients = new HashMap<>(); // 支持多服务器连接private Map<String, MqttConnectOptions> mqttConnectOptionsMap = new HashMap<>();private Map<String, Boolean> connectionStatusMap = new HashMap<>(); // 连接状态private Map<String, List<MqttMessage>> messageQueueMap = new HashMap<>(); // 消息队列private Map<String, List<String>> subscribedTopicsMap = new HashMap<>(); // 已订阅的主题private ExecutorService executorService = Executors.newCachedThreadPool(); // 线程池private ConnectionStateListener connectionStateListener;private static final int DEFAULT_QOS = 1;private static final boolean DEFAULT_RETAINED = false;private static final int MAX_MESSAGE_QUEUE_SIZE = 100; // 消息队列最大数量private static final long MESSAGE_QUEUE_TIMEOUT = 60000; // 消息队列超时时间(毫秒)private static final int MAX_RETRY_COUNT = 3; // 最大重试次数private static final int RETRY_INTERVAL = 5000; // 重试间隔(毫秒)// 单例模式public static synchronized MqttManager getInstance() {if (instance == null) {instance = new MqttManager();}return instance;}private MqttManager() {}/*** 初始化MQTT客户端*/public void initClient(Context context, String serverUri, String clientId) {if (mqttClients.containsKey(serverUri)) {Log.w(TAG, "Client already initialized for server: " + serverUri);return;}MqttAndroidClient client = new MqttAndroidClient(context, serverUri, clientId);mqttClients.put(serverUri, client);mqttConnectOptionsMap.put(serverUri, new MqttConnectOptions());connectionStatusMap.put(serverUri, false);messageQueueMap.put(serverUri, new ArrayList<>());subscribedTopicsMap.put(serverUri, new ArrayList<>());// 设置回调client.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {connectionStatusMap.put(serverURI, true);Log.d(TAG, "MQTT connected: " + serverURI);if (connectionStateListener != null) {connectionStateListener.onConnected(serverURI, reconnect);}// 连接成功后发送缓存的消息processMessageQueue(serverURI);// 重新订阅主题resubscribeTopics(serverURI);}@Overridepublic void connectionLost(Throwable cause) {for (Map.Entry<String, MqttAndroidClient> entry : mqttClients.entrySet()) {if (entry.getValue().equals(client)) {String serverUri = entry.getKey();connectionStatusMap.put(serverUri, false);Log.e(TAG, "MQTT connection lost: " + serverUri, cause);if (connectionStateListener != null) {connectionStateListener.onDisconnected(serverUri, cause);}// 尝试重新连接attemptReconnect(serverUri, 0);break;}}}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {Log.d(TAG, "Message arrived: " + new String(message.getPayload()));if (connectionStateListener != null) {connectionStateListener.onMessageReceived(topic, message);}}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {Log.d(TAG, "Message delivered");}});}/*** 设置连接参数*/public void setConnectionOptions(String serverUri, String username, String password) {MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);if (options != null) {options.setUserName(username);options.setPassword(password.toCharArray());}}/*** 设置遗言消息*/public void setWillMessage(String serverUri, String topic, String message, int qos, boolean retained) {MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);if (options != null) {options.setWill(topic, message.getBytes(), qos, retained);}}/*** 设置SSL/TLS加密连接*/public void setSSL(String serverUri, boolean useSSL) {MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);if (options != null && useSSL) {options.setSocketFactory(new DefaultSSLSocketFactory());}}/*** 连接到MQTT服务器*/public void connect(String serverUri) {if (connectionStatusMap.get(serverUri)) {Log.w(TAG, "Already connected to MQTT server: " + serverUri);return;}MqttAndroidClient client = mqttClients.get(serverUri);MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);if (client == null || options == null) {Log.e(TAG, "Client or options not initialized for server: " + serverUri);return;}executorService.execute(() -> {try {client.connect(options, null, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {Log.d(TAG, "Connected to MQTT server: " + serverUri);}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {Log.e(TAG, "Failed to connect to MQTT server: " + serverUri, exception);attemptReconnect(serverUri, 0);}});} catch (MqttException e) {Log.e(TAG, "MQTT connection error: " + serverUri, e);attemptReconnect(serverUri, 0);}});}/*** 断开连接*/public void disconnect(String serverUri) {if (!connectionStatusMap.get(serverUri)) {Log.w(TAG, "Already disconnected from MQTT server: " + serverUri);return;}MqttAndroidClient client = mqttClients.get(serverUri);if (client == null) {Log.e(TAG, "Client not initialized for server: " + serverUri);return;}executorService.execute(() -> {try {client.disconnect(null, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {connectionStatusMap.put(serverUri, false);Log.d(TAG, "Disconnected from MQTT server: " + serverUri);}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {Log.e(TAG, "Failed to disconnect from MQTT server: " + serverUri, exception);}});} catch (MqttException e) {Log.e(TAG, "MQTT disconnection error: " + serverUri, e);}});}/*** 订阅主题*/public void subscribe(String serverUri, String topic, int qos, IMqttActionListener listener) {if (!connectionStatusMap.get(serverUri)) {Log.w(TAG, "Not connected, subscription will be attempted after connection: " + serverUri);subscribedTopicsMap.get(serverUri).add(topic); // 缓存主题return;}MqttAndroidClient client = mqttClients.get(serverUri);if (client == null) {Log.e(TAG, "Client not initialized for server: " + serverUri);return;}executorService.execute(() -> {try {client.subscribe(topic, qos, null, listener);} catch (MqttException e) {Log.e(TAG, "MQTT subscription error: " + serverUri, e);}});}/*** 发布消息*/public void publish(String serverUri, String topic, String message, int qos, boolean retained, IMqttActionListener listener) {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(qos);mqttMessage.setRetained(retained);if (!connectionStatusMap.get(serverUri)) {Log.w(TAG, "Not connected, message will be queued: " + serverUri);List<MqttMessage> queue = messageQueueMap.get(serverUri);if (queue.size() < MAX_MESSAGE_QUEUE_SIZE) {queue.add(mqttMessage); // 缓存消息} else {Log.w(TAG, "Message queue is full, discarding message: " + serverUri);}return;}MqttAndroidClient client = mqttClients.get(serverUri);if (client == null) {Log.e(TAG, "Client not initialized for server: " + serverUri);return;}executorService.execute(() -> {try {client.publish(topic, mqttMessage, null, listener);} catch (MqttException e) {Log.e(TAG, "MQTT publish error: " + serverUri, e);}});}/*** 处理消息队列*/private void processMessageQueue(String serverUri) {List<MqttMessage> queue = messageQueueMap.get(serverUri);for (MqttMessage message : queue) {publish(serverUri, "default/topic", new String(message.getPayload()), message.getQos(), message.isRetained(), null);}queue.clear();}/*** 重新订阅主题*/private void resubscribeTopics(String serverUri) {List<String> topics = subscribedTopicsMap.get(serverUri);for (String topic : topics) {subscribe(serverUri, topic, DEFAULT_QOS, null);}}/*** 尝试重新连接*/private void attemptReconnect(String serverUri, int retryCount) {if (retryCount >= MAX_RETRY_COUNT) {Log.w(TAG, "Max retry count reached for server: " + serverUri);return;}executorService.execute(() -> {try {Thread.sleep(RETRY_INTERVAL);connect(serverUri);} catch (InterruptedException e) {Log.e(TAG, "Reconnect attempt interrupted: " + serverUri, e);}});}/*** 设置连接状态监听器*/public void setConnectionStateListener(ConnectionStateListener listener) {this.connectionStateListener = listener;}/*** 连接状态监听器接口*/public interface ConnectionStateListener {void onConnected(String serverUri, boolean isReconnect);void onDisconnected(String serverUri, Throwable cause);void onMessageReceived(String topic, MqttMessage message);}
}
以下是一个使用 MqttManager 的完整示例。这个示例展示了如何初始化MQTT客户端、连接服务器、订阅主题、发布消息以及处理连接状态和消息回调。
import android.os.Bundle;
import androidx.appcompat.app.AppCompatActivity;import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MainActivity extends AppCompatActivity {private static final String TAG = "MainActivity";private static final String SERVER_URI = "tcp://your.mqtt.broker:1883"; // MQTT服务器地址private static final String CLIENT_ID = "android_client_" + System.currentTimeMillis(); // 客户端IDprivate static final String TOPIC = "your/topic"; // 订阅的主题private static final String WILL_TOPIC = "your/lwt/topic"; // 遗言主题private static final String WILL_MESSAGE = "Client disconnected"; // 遗言消息private MqttManager mqttManager;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);// 初始化MQTT管理器mqttManager = MqttManager.getInstance();mqttManager.initClient(this, SERVER_URI, CLIENT_ID);// 设置连接参数mqttManager.setConnectionOptions(SERVER_URI, "username", "password");// 设置遗言消息mqttManager.setWillMessage(SERVER_URI, WILL_TOPIC, WILL_MESSAGE, 1, true);// 设置连接状态监听器mqttManager.setConnectionStateListener(new MqttManager.ConnectionStateListener() {@Overridepublic void onConnected(String serverUri, boolean isReconnect) {Log.d(TAG, "Connected to MQTT server: " + serverUri + ", isReconnect: " + isReconnect);// 订阅主题mqttManager.subscribe(SERVER_URI, TOPIC, 1, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {Log.d(TAG, "Subscribed to topic: " + TOPIC);}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {Log.e(TAG, "Failed to subscribe to topic: " + TOPIC, exception);}});}@Overridepublic void onDisconnected(String serverUri, Throwable cause) {Log.e(TAG, "Disconnected from MQTT server: " + serverUri, cause);}@Overridepublic void onMessageReceived(String topic, MqttMessage message) {Log.d(TAG, "Received message from topic: " + topic + ", payload: " + new String(message.getPayload()));}});// 连接MQTT服务器mqttManager.connect(SERVER_URI);// 发布消息publishMessage("Hello, MQTT!");}/*** 发布消息*/private void publishMessage(String message) {mqttManager.publish(SERVER_URI, TOPIC, message, 1, false, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {Log.d(TAG, "Message published to topic: " + TOPIC);}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {Log.e(TAG, "Failed to publish message to topic: " + TOPIC, exception);}});}@Overrideprotected void onDestroy() {super.onDestroy();// 断开连接mqttManager.disconnect(SERVER_URI);}
}
示例说明
初始化MQTT客户端:
使用 MqttManager.getInstance() 获取单例实例。
调用 initClient() 初始化MQTT客户端,传入服务器地址和客户端ID。
设置连接参数:
使用 setConnectionOptions() 设置用户名和密码。
设置遗言消息:
使用 setWillMessage() 设置遗言消息,当客户端异常断开时,服务器会发布该消息。
设置连接状态监听器:
实现 ConnectionStateListener 接口,监听连接状态变化和消息到达事件。
连接MQTT服务器:
调用 connect() 连接到MQTT服务器。
订阅主题:
在 onConnected() 回调中订阅主题。
发布消息:
使用 publish() 方法发布消息到指定主题。
断开连接:
在 onDestroy() 中调用 disconnect() 断开连接。
注意事项
服务器地址和主题:
替换 SERVER_URI 和 TOPIC 为实际的MQTT服务器地址和主题。
用户名和密码:
如果MQTT服务器需要认证,请设置正确的用户名和密码。
遗言消息:
遗言消息用于在客户端异常断开时通知其他客户端,确保设置合适的主题和消息内容。
线程安全:
MQTT操作在后台线程中执行,避免阻塞主线程。