欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 金融 > 阿里云IOT消息处理

阿里云IOT消息处理

2025/2/23 17:03:53 来源:https://blog.csdn.net/m0_64972409/article/details/145606929  浏览:    关键词:阿里云IOT消息处理

        文章主要讲述了阿里云IOT平台如何处理设备上报的消息、如何将消息路由到不同的处理逻辑、如何进行消息转发与转换等操作。

 

一、接收IOT消息

1.创建订阅

2.案列代码

官网案例代码:如何将AMQP JMS客户端接入物联网平台接收消息_物联网平台(IoT)-阿里云帮助中心

代码详情:

package com.aliyun.iotx.demo;import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class AmqpClient {private final static Logger logger = LoggerFactory.getLogger(AmqpClient.class);/*** 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考*/private static String accessKey = "LTAI5tDQKg";private static String accessSecret = "LYUKZ";;private static String consumerGroupId = "eraic";//iotInstanceId:实例ID。若是2021年07月30日之前(不含当日)开通的公共实例,请填空字符串。private static String iotInstanceId = "iot-00000frq8umvkx2";//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。private static String clientId;static {try {clientId = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {e.printStackTrace();}}//${YourHost}为接入域名,请参见AMQP客户端接入说明文档。private static String host = "iot-00000frq8umvkx2.amqp.iothub.aliyuncs.com";// 指定单个进程启动的连接数// 单个连接消费速率有限,请参考使用限制,最大64个连接// 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接private static int connectionCount = 4;//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue(50000));public static void main(String[] args) throws Exception {List<Connection> connections = new ArrayList<>();//参数说明,请参见AMQP客户端接入说明文档。for (int i = 0; i < connectionCount; i++) {long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5、hmacsha1和hmacsha256。String signMethod = "hmacsha1";//userName组装方法,请参见AMQP客户端接入说明文档。String userName = clientId + "-" + i + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKey+ ",iotInstanceId=" + iotInstanceId+ ",consumerGroupId=" + consumerGroupId+ "|";//计算签名,password组装方法,请参见AMQP客户端接入说明文档。String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;String password = doSign(signContent, accessSecret, signMethod);String connectionUrl = "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)"+ "?failover.reconnectDelay=30";Hashtable<String, String> hashtable = new Hashtable<>();hashtable.put("connectionfactory.SBCF", connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");Destination queue = (Destination)context.lookup("QUEUE");// 创建连接。Connection connection = cf.createConnection(userName, password);connections.add(connection);((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);// 创建会话。// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// 创建Receiver连接。MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);}logger.info("amqp demo is started successfully, and will exit after 60s ");// 结束程序运行 Thread.sleep(6000 * 1000);logger.info("run shutdown");connections.forEach(c-> {try {c.close();} catch (JMSException e) {logger.error("failed to close connection", e);}});executorService.shutdown();if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {logger.info("shutdown success");} else {logger.info("failed to handle messages");}}private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(final Message message) {try {//1.收到消息之后一定要ACK。// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(new Runnable() {@Overridepublic void run() {processMessage(message);}});} catch (Exception e) {logger.error("submit task occurs exception ", e);}}};/*** 在这里处理您收到消息后的具体业务逻辑。*/private static void processMessage(Message message) {try {byte[] body = message.getBody(byte[].class);String content = new String(body);String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");logger.info("receive message"+ ",\n topic = " + topic+ ",\n messageId = " + messageId+ ",\n content = " + content);} catch (Exception e) {logger.error("processMessage occurs error ", e);}}private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立。*/@Overridepublic void onConnectionEstablished(URI remoteURI) {logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {logger.error("onConnectionFailure, {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {logger.info("onConnectionRestored, remoteUri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}
}

3.接收数据

二、SDK改造业务

如果想在项目启动时就连接阿里平台接受物联网数据,需要去实现ApplicationRunner接口,并重写run方法

1.线程池定义

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;@Configuration
public class ThreadPoolConfig {/*** 核心线程池大小*/private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();/*** 最大可创建的线程数*/private static final int MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;/*** 队列最大长度*/private static final int QUEUE_CAPACITY = 50000;/*** 线程池维护线程所允许的空闲时间*/private static final int KEEP_ALIVE_SECONDS = 60;@Beanpublic ExecutorService executorService(){AtomicInteger c = new AtomicInteger(1);LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY);return new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_SECONDS,TimeUnit.MILLISECONDS,queue,r -> new Thread(r, "sdk-pool-" + c.getAndIncrement()),new ThreadPoolExecutor.DiscardPolicy());}
}

2.AmqpClient

package com.zzyl.job;import com.zzyl.properties.AliIoTConfigProperties;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ExecutorService;@Component
public class AmqpClient implements ApplicationRunner {private final static Logger logger = LoggerFactory.getLogger(AmqpClient.class);@Autowiredprivate AliIoTConfigProperties aliIoTConfigProperties;//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。private static String clientId;static {try {clientId = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {e.printStackTrace();}}// 指定单个进程启动的连接数// 单个连接消费速率有限,请参考使用限制,最大64个连接// 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接private static int connectionCount = 64;//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。@Autowiredprivate ExecutorService executorService;public void start() throws Exception {List<Connection> connections = new ArrayList<>();//参数说明,请参见AMQP客户端接入说明文档。for (int i = 0; i < connectionCount; i++) {long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5、hmacsha1和hmacsha256。String signMethod = "hmacsha1";//userName组装方法,请参见AMQP客户端接入说明文档。String userName = clientId + "-" + i + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + aliIoTConfigProperties.getAccessKeyId()+ ",iotInstanceId=" + aliIoTConfigProperties.getIotInstanceId()+ ",consumerGroupId=" + aliIoTConfigProperties.getConsumerGroupId()+ "|";//计算签名,password组装方法,请参见AMQP客户端接入说明文档。String signContent = "authId=" + aliIoTConfigProperties.getAccessKeyId() + "&timestamp=" + timeStamp;String password = doSign(signContent, aliIoTConfigProperties.getAccessKeySecret(), signMethod);String connectionUrl = "failover:(amqps://" + aliIoTConfigProperties.getHost() + ":5671?amqp.idleTimeout=80000)"+ "?failover.reconnectDelay=30";Hashtable<String, String> hashtable = new Hashtable<>();hashtable.put("connectionfactory.SBCF", connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");Destination queue = (Destination) context.lookup("QUEUE");// 创建连接。Connection connection = cf.createConnection(userName, password);connections.add(connection);((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);// 创建会话。// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// 创建Receiver连接。MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);}logger.info("amqp  is started successfully, and will exit after server shutdown ");}private MessageListener messageListener = message -> {try {//异步处理收到的消息,确保onMessage函数里没有耗时逻辑executorService.submit(() -> processMessage(message));} catch (Exception e) {logger.error("submit task occurs exception ", e);}};/*** 在这里处理您收到消息后的具体业务逻辑。*/private void processMessage(Message message) {try {byte[] body = message.getBody(byte[].class);String contentStr = new String(body);String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");logger.info("receive message"+ ",\n topic = " + topic+ ",\n messageId = " + messageId+ ",\n content = " + contentStr);} catch (Exception e) {logger.error("processMessage occurs error ", e);}}private JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立。*/@Overridepublic void onConnectionEstablished(URI remoteURI) {logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {logger.error("onConnectionFailure, {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {logger.info("onConnectionRestored, remoteUri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}@Overridepublic void run(ApplicationArguments args) throws Exception {start();}
}

3.数据存储

@Data
public class Content {/*** 设备类型*/private String deviceType;/*** 设备ID*/private String iotId;/*** 请求ID*/private long requestId;private Map<String, Object> checkFailedData;/*** 产品key*/private String productKey;private long gmtCreate;private String deviceName;private Map<String, Item> items;public class Item {private int value;private long time;public int getValue() {return value;}public void setValue(int value) {this.value = value;}public long getTime() {return time;}public void setTime(long time) {this.time = time;}}
}
private void processMessage(Message message) {try {byte[] body = message.getBody(byte[].class);String content = new String(body);String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");logger.info("receive message"+ ",\n topic = " + topic+ ",\n messageId = " + messageId+ ",\n content = " + content);//***数据持久化Content contentObject = JSON.parseObject(content, Content.class);//依据设备id去数据库查询设备信息List<DeviceVo> deviceVoList = deviceMapper.selectByDeviceIds(Collections.singletonList(contentObject.getIotId()));if (ObjectUtil.isEmpty(deviceVoList)) {logger.error("设备不存在" + contentObject.getIotId());return;}//遍历存储设备的所有不同数据contentObject.getItems().forEach((key, value) -> {DeviceVo device = deviceVoList.get(0);DeviceData build = DeviceData.builder().alarmTime(LocalDateTimeUtil.of(value.getTime())).deviceName(contentObject.getDeviceName()).iotId(contentObject.getIotId()).productId(contentObject.getProductKey()).functionName(key).dataValue(value.getValue() + "").noteName(device.getNickname()).productName(device.getProductName()).accessLocation(device.getRemark())
//                        .status(status + "").build();int insert = deviceDataMapper.insert(build);if (insert > 0) {logger.info(contentObject.getDeviceName() + "设备" + key + "数据存储成功");}});} catch (Exception e) {logger.error("processMessage occurs error ", e);}}


版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词