欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > RabbitMQ重复消费如何解决

RabbitMQ重复消费如何解决

2025/3/13 7:00:48 来源:https://blog.csdn.net/qq_37205597/article/details/146212357  浏览:    关键词:RabbitMQ重复消费如何解决

消息重复消费的原因

  1. 生产者重试:网络波动导致生产者未收到 Broker 确认,重复发送消息。
  2. 消费者失败:消费者处理消息后未发送 ACK,消息重新入队。
  3. 集群故障转移:主节点宕机,未确认消息被重新投递。

解决方案

1. 消费者幂等性设计

原理:确保同一消息多次处理的结果与一次处理相同。

实现方式
  • 数据库唯一约束
    利用业务字段(如订单号)的唯一性约束,避免重复插入数据。

    CREATE TABLE orders (id VARCHAR(64) PRIMARY KEY, -- 唯一订单号amount DECIMAL(10,2)
    );
    
    // Java 示例(使用 MyBatis)
    public void processOrder(Order order) {try {orderMapper.insert(order); // 唯一约束冲突时会抛出异常// 业务逻辑...} catch (DuplicateKeyException e) {// 已处理过该订单,直接跳过log.warn("订单已存在: {}", order.getId());}
    }
    
  • Redis 原子操作
    使用 Redis 记录已处理消息的 ID,通过 SETNX 命令实现原子性检查。

    // Java 示例(使用 Spring Data Redis)
    public boolean isMessageProcessed(String messageId) {Boolean result = redisTemplate.opsForValue().setIfAbsent("msg:" + messageId, "1", Duration.ofMinutes(30));return Boolean.TRUE.equals(result);
    }public void consumeMessage(Message message) {String messageId = message.getMessageId();if (!isMessageProcessed(messageId)) {// 已处理过,直接返回return;}// 业务逻辑...
    }
    

2. 消息全局唯一 ID

原理:为每条消息分配唯一 ID,消费者记录已处理 ID。

实现步骤
  1. 生产者端:发送消息时附加唯一 ID。

    // Java 示例(使用 RabbitTemplate)
    public void sendOrder(Order order) {String messageId = UUID.randomUUID().toString();Message message = MessageBuilder.withBody(order.toJson().getBytes()).setHeader("messageId", messageId).build();rabbitTemplate.send("order.exchange", "order.key", message);
    }
    
  2. 消费者端:处理前检查 ID 是否已存在。

    // Java 示例(使用 @RabbitListener)
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Message message, Channel channel) throws IOException {String messageId = message.getMessageProperties().getHeader("messageId");if (redisTemplate.hasKey("processed:" + messageId)) {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}// 业务逻辑...redisTemplate.opsForValue().set("processed:" + messageId, "1", Duration.ofDays(1));channel.basicAck(deliveryTag, false);
    }
    

3. 手动确认模式(Manual ACK)

原理:消费者处理完消息后手动发送 ACK,避免消息因异常重新入队。

配置与代码
  1. 配置手动 ACK(Spring Boot):

    spring:rabbitmq:listener:simple:acknowledge-mode: manual
    
  2. 消费者逻辑

    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {// 业务逻辑...channel.basicAck(deliveryTag, false); // 确认消息} catch (Exception e) {channel.basicNack(deliveryTag, false, true); // 重入队列}
    }
    

4. 消息去重表

原理:在数据库中维护一张去重表,记录已处理的消息 ID。

表结构
CREATE TABLE message_dedup (message_id VARCHAR(128) PRIMARY KEY,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
消费者逻辑
public void consumeMessage(Message message) {String messageId = extractMessageId(message);try {jdbcTemplate.update("INSERT INTO message_dedup (message_id) VALUES (?)", messageId);// 业务逻辑...} catch (DuplicateKeyException e) {// 消息已处理,直接ACKchannel.basicAck(deliveryTag, false);}
}

5. 消息过期与死信队列

原理:设置消息 TTL,超时未处理则转入死信队列,避免无限重试。

配置队列 TTL 和死信交换
// Java 配置示例
@Bean
public Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 60000); // 消息60秒过期args.put("x-dead-letter-exchange", "dlx.exchange");args.put("x-dead-letter-routing-key", "dlx.key");return new Queue("order.queue", true, false, false, args);
}@Bean
public DirectExchange dlxExchange() {return new DirectExchange("dlx.exchange");
}@Bean
public Queue dlxQueue() {return new Queue("dlx.queue");
}@Bean
public Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.key");
}

方案对比与选型

方案优点缺点适用场景
数据库唯一约束无需额外组件高并发下数据库压力大低频业务(如订单创建)
Redis 原子操作高性能需维护 Redis 高可用高频业务(如支付回调)
手动ACK避免消息丢失需处理ACK异常所有需要可靠消费的场景
消息去重表数据持久化增加数据库写入压力数据一致性要求高的场景
死信队列避免消息堆积需额外处理死信消息需要异常消息兜底的场景

总结

  • 幂等性设计是核心:无论消息重复多少次,业务结果保持一致。
  • 组合使用多种方案:例如“手动ACK + Redis去重”兼顾可靠性与性能。
  • 监控与告警:通过 RabbitMQ 管理界面监控消息积压情况,设置阈值告警。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com