消息重复消费的原因
- 生产者重试:网络波动导致生产者未收到 Broker 确认,重复发送消息。
- 消费者失败:消费者处理消息后未发送 ACK,消息重新入队。
- 集群故障转移:主节点宕机,未确认消息被重新投递。
解决方案
1. 消费者幂等性设计
原理:确保同一消息多次处理的结果与一次处理相同。
实现方式
-
数据库唯一约束
利用业务字段(如订单号)的唯一性约束,避免重复插入数据。CREATE TABLE orders (id VARCHAR(64) PRIMARY KEY, -- 唯一订单号amount DECIMAL(10,2) );
// Java 示例(使用 MyBatis) public void processOrder(Order order) {try {orderMapper.insert(order); // 唯一约束冲突时会抛出异常// 业务逻辑...} catch (DuplicateKeyException e) {// 已处理过该订单,直接跳过log.warn("订单已存在: {}", order.getId());} }
-
Redis 原子操作
使用 Redis 记录已处理消息的 ID,通过SETNX
命令实现原子性检查。// Java 示例(使用 Spring Data Redis) public boolean isMessageProcessed(String messageId) {Boolean result = redisTemplate.opsForValue().setIfAbsent("msg:" + messageId, "1", Duration.ofMinutes(30));return Boolean.TRUE.equals(result); }public void consumeMessage(Message message) {String messageId = message.getMessageId();if (!isMessageProcessed(messageId)) {// 已处理过,直接返回return;}// 业务逻辑... }
2. 消息全局唯一 ID
原理:为每条消息分配唯一 ID,消费者记录已处理 ID。
实现步骤
-
生产者端:发送消息时附加唯一 ID。
// Java 示例(使用 RabbitTemplate) public void sendOrder(Order order) {String messageId = UUID.randomUUID().toString();Message message = MessageBuilder.withBody(order.toJson().getBytes()).setHeader("messageId", messageId).build();rabbitTemplate.send("order.exchange", "order.key", message); }
-
消费者端:处理前检查 ID 是否已存在。
// Java 示例(使用 @RabbitListener) @RabbitListener(queues = "order.queue") public void handleOrder(Message message, Channel channel) throws IOException {String messageId = message.getMessageProperties().getHeader("messageId");if (redisTemplate.hasKey("processed:" + messageId)) {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}// 业务逻辑...redisTemplate.opsForValue().set("processed:" + messageId, "1", Duration.ofDays(1));channel.basicAck(deliveryTag, false); }
3. 手动确认模式(Manual ACK)
原理:消费者处理完消息后手动发送 ACK,避免消息因异常重新入队。
配置与代码
-
配置手动 ACK(Spring Boot):
spring:rabbitmq:listener:simple:acknowledge-mode: manual
-
消费者逻辑:
@RabbitListener(queues = "order.queue") public void handleOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {// 业务逻辑...channel.basicAck(deliveryTag, false); // 确认消息} catch (Exception e) {channel.basicNack(deliveryTag, false, true); // 重入队列} }
4. 消息去重表
原理:在数据库中维护一张去重表,记录已处理的消息 ID。
表结构
CREATE TABLE message_dedup (message_id VARCHAR(128) PRIMARY KEY,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
消费者逻辑
public void consumeMessage(Message message) {String messageId = extractMessageId(message);try {jdbcTemplate.update("INSERT INTO message_dedup (message_id) VALUES (?)", messageId);// 业务逻辑...} catch (DuplicateKeyException e) {// 消息已处理,直接ACKchannel.basicAck(deliveryTag, false);}
}
5. 消息过期与死信队列
原理:设置消息 TTL,超时未处理则转入死信队列,避免无限重试。
配置队列 TTL 和死信交换
// Java 配置示例
@Bean
public Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 60000); // 消息60秒过期args.put("x-dead-letter-exchange", "dlx.exchange");args.put("x-dead-letter-routing-key", "dlx.key");return new Queue("order.queue", true, false, false, args);
}@Bean
public DirectExchange dlxExchange() {return new DirectExchange("dlx.exchange");
}@Bean
public Queue dlxQueue() {return new Queue("dlx.queue");
}@Bean
public Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.key");
}
方案对比与选型
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
数据库唯一约束 | 无需额外组件 | 高并发下数据库压力大 | 低频业务(如订单创建) |
Redis 原子操作 | 高性能 | 需维护 Redis 高可用 | 高频业务(如支付回调) |
手动ACK | 避免消息丢失 | 需处理ACK异常 | 所有需要可靠消费的场景 |
消息去重表 | 数据持久化 | 增加数据库写入压力 | 数据一致性要求高的场景 |
死信队列 | 避免消息堆积 | 需额外处理死信消息 | 需要异常消息兜底的场景 |
总结
- 幂等性设计是核心:无论消息重复多少次,业务结果保持一致。
- 组合使用多种方案:例如“手动ACK + Redis去重”兼顾可靠性与性能。
- 监控与告警:通过 RabbitMQ 管理界面监控消息积压情况,设置阈值告警。