目录
如何保证消息不丢失
如何保证消息顺序消费
如何实现延时消息
如何保证消息不丢失
-
消息存储:
- RocketMQ 将消息存储在物理磁盘上,而不是仅仅缓存在内存中。每个消息都被序列化并存储在指定的存储路径中。
-
主从复制(Master-Slave Replication):
- 消息在被发送到 Master 节点后,会被异步复制到 Slave 节点。这样即使 Master 节点发生故障,Slave 节点仍然可以保证消息的完整性。
-
刷盘机制:
- 为了确保消息的持久性,RocketMQ 支持同步和异步两种刷盘方式。同步刷盘会等待消息写入磁盘后才返回确认,而异步刷盘则可以提高性能。
-
消息确认:
- 消费者在成功处理消息后,需要发送一个确认回执给消息队列。如果消息队列没有收到确认,它会认为消息未被成功处理,并可能重新投递该消息。
RocketMQ没办法100%保证消息不丢失,只能最大限度保证消息称不丢失!
如何保证消息顺序消费
答:三次加锁!
- 使用同步发送消息的方式,把消息发送到同一队列中(发送时可以通过参数指定)。
- 需要同一个消费者消费(加锁实现-给消费者客户端申请分布式锁<首次加锁>)
- 因为消费者消费时是把消息丢到消费线程池多线程消费的,为了保证顺序消费,这里就要再次加锁,确保同事时间内一个队列里只能有一个线程处理该消息<二次加锁>。
- 对存储消息的processQueue加锁(解锁时再次加锁),确保重平衡过程中不会出现重复消费。
- 当集群新增了消费客户端进来的时候,某个队列可能原来输入A客户端,现在却要经过重平衡交由B客户端消费,这时候假如A客户端正在消费,此时消费结果还没提交,在交接的过程中就需要确保锁不被移除,否则可能其他客户端获取锁后会重复消费(所以就需要在A客户端解锁的时候再次加锁,如果加锁成功才允许解锁)。
顺序消费因为加了很多锁,会降低吞吐量,所以慎重选择。
如何实现延时消息
Rocket5.0版本之前,使用Timer定时器实现,先将消息存储在内存中,达到时间后再存储到磁盘中并且投递给消费者(Timer有缺陷,定时器中任务很多时会导致性能下降)。
Rocket5.0中增加了基于时间轮实现的定时消息,该算法很高效,可在O(1)时间内找到将执行的任务:
- 当Broker接收到定时消息时,根据消息的过期时间计算出需要投递的槽位,并将消息放置到对应的槽位中。
- 时间轮滴答到该槽位时便会触发事件,将消息投递给消费者