问题
今天遇到一个问题,发送MQ
消息的时候需要保证不会重复发送,注意不是可靠到达(可靠到达可以通过消息确认机制和回调接口保证),这里保证的是不会生产多条一样的消息。
方法
综合讨论下来决定使用Redis
缓存来解决,因为相比于将记录插入数据库Redis
更为高效和便捷。
检验是否已经发送
在发送消息之前根据相关信息组合成key
去Redis
中查找,找到后检测值是否为存在并且是否为设定的值,若存在且与设定的值一样,则返回false
,说明该消息已经发送过了。
public boolean isSend(String messageType, Long bizId, int hashCode) {// 根据消息类型、业务id和哈希值组合成keyString key = this.genKey(messageType, bizId, hashCode);Long value = super.get(key);if (value != null && value.equals(DEFAULT_VALUE)) {return false;}return true;}/**get方法*/public V get(K key) {if (key == null) {return null;} else {try {// 在key前添加前缀和名字,并将原来的key进行json序列化String realKey = this.genRealKey(key);String content = (String)this.redisTemplate.opsForValue().get(realKey);// 若get到的值不为null则进行json反序列化return content == null ? null : this.valueSerializer.deserialize(content);} catch (Exception e) {CACHE.error("", key.toString(), "", "0", e);return null;}}}
以上就是检验消息是否重复的方法,需要注意的是JSON
序列化,因为Redis默认使用的是JDK
序列化,这种序列化后的内容不仅多而且不易于阅读,因此将其改为Json序列化。
发送后添加缓存
在发送消息的时候会先在Redis
中put
一个以相关信息组合为key
,value
为默认值的记录,过期时间为5min
。
public void sendMessage(String messageType, Long bizId, int hashCode) {super.put(genKey(messageType, bizId, hashCode), DEFAULT_VALUE);}/**put方法*/public void put(K key, V value) {try {if (key != null && null != value) {// 进行json序列化String content = this.valueSerializer.serialize(value);this.redisTemplate.opsForValue().set(this.genRealKey(key), content, this.expires, this.timeUnit);}} catch (Throwable e) {e.printStackTrace();}}
发送消息方法
最后的发送消息方法大致代码如下:
public void sendMQMessage(Long bizId, String messageTemplateCode, String msg, int msgHashCode, String exchange, String routingKey) {//加入缓存boolean send = true;//String messageType = MessageTypeUtil.getMessageType(messageTemplateCode);if (bizId != null) {// 检测是否已经发送send = sendMessageCache.isSend(messageTemplateCode, bizId, msgHashCode);}//发送mq消息if (send) {if (bizId != null) {// 加入缓存sendMessageCache.sendMessage(messageTemplateCode, bizId, msgHashCode);}// 发送消息messageSender.send(exchange, routingKey, msg);}}