欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > RabbitMQ

RabbitMQ

2025/2/24 23:30:12 来源:https://blog.csdn.net/tulingtuling/article/details/145609091  浏览:    关键词:RabbitMQ

在消息队列(MQ)中,确保消息成功传递是一个关键问题。消息传递的过程通常包括以下几个阶段:publisher(生产者) -> exchange(交换机) -> queue(队列) -> consumer(消费者)。为了确保消息在每个阶段都能成功传递,我们需要采取一系列措施来保证消息的可靠性。

生产者的可靠性

重试机制

当生产者与交换机(或队列,如果没有交换机)之间的连接不稳定时,生产者发送的消息可能会在传输过程中丢失。在这种情况下,生产者需要等待一段时间以获取响应。如果未收到响应,生产者应尝试重新发送消息。重试次数应有限制,以防止因持续重试而占用过多资源。此外,重试之间应有一定的间隔时间,以避免频繁重试导致资源浪费。

由于发送消息时会占用通道,其他业务操作可能会被阻塞,直到消息发送完成(无论成功或失败)。因此,对发送消息的重试机制进行限制是必要的,以防止因连接问题导致资源被长时间占用。

以下是一个在Spring Boot中配置生产者重试机制的示例:

spring:rabbitmq:connection-timeout: 1s  # 连接超时时间template:retry:enabled: true  # 开启重试机制initial-interval: 1000ms  # 初始重试间隔时间multiplier: 1  # 重试间隔时间的倍数max-attempts: 5  # 最大重试次数

在这个配置中:

  • connection-timeout 设置了连接超时的时间。

  • enabled: true 开启了重试机制。

  • initial-interval: 1000ms 设置了在连接失败后,首次重试前的等待时间。

  • multiplier: 1 设置了每次重试后等待时间的倍数(在此例中,等待时间保持不变)。

  • max-attempts: 5 设置了最大重试次数,超过该次数后将不再重试。

通过这种配置,生产者在发送消息失败后会自动进行重试,直到达到最大重试次数或消息成功发送。这种机制有效地提高了消息传递的可靠性,同时避免了因持续重试而导致的资源浪费

根据您提供的信息,我们可以分析MQ连接失败时的重试行为。以下是详细的分析:

  1. 连接超时时间:设置为1秒。这意味着如果MQ在1秒内未能成功连接,连接尝试将被视为失败。

  2. 初始重试间隔:设置为1秒。在第一次连接失败后,系统会等待1秒再进行下一次连接尝试。

  3. 等待时间倍数:设置为1。这意味着每次重试的等待时间保持不变。因此,每次重试的间隔时间为1秒(等待时间) + 1秒(连接超时时间) = 2秒。

  4. 最大重试次数:设置为5次。系统会在达到最大重试次数后停止尝试连接。

根据这些设置,系统在连接失败后的行为如下:

  • 第一次连接失败后,等待1秒,然后进行第二次连接尝试。

  • 第二次连接失败后,再次等待1秒,然后进行第三次连接尝试。

  • 第三次连接失败后,系统将停止尝试连接。

如果multiplier设置为2,重试行为将有所不同:

  • 第一次连接失败后,等待1秒,然后进行第二次连接尝试。

  • 第二次连接失败后,等待时间翻倍为2秒,然后进行第三次连接尝试。

  • 第三次连接失败后,等待时间再次翻倍为4秒,然后系统将停止尝试连接。

这种配置确保了系统在连接失败时能够进行有限次数的重试,同时通过调整等待时间倍数来控制重试的频率,以避免过度占用资源。

确认机制

在正常情况下,消息传递到MQ后不会发生丢失,但我们仍需对消息丢失有所防备。为了及时发现消息丢失,MQ通常使用Publisher ConfirmPublisher Return两种机制来进行预警。


消息丢失的可能情况
  1. MQ内部故障:MQ服务器内部出现问题,导致消息丢失。

  2. 交换机或队列不存在:消息无法找到目标交换机或队列。

  3. RoutingKey不匹配:消息的路由键(RoutingKey)没有匹配的队列。

  4. 其他异常情况:例如网络故障、消息过期(TTL)等。


消息传递的几种状态
  1. 路由失败,但投递成功

    • 消息传递到MQ后,MQ服务器成功接收并存储了消息(投递成功)。

    • 但由于路由失败(例如RoutingKey不匹配或目标队列不存在),消息无法被正确路由到下一个节点(例如队列)。

    • MQ服务器会返回ACK确认投递成功,同时通过Publisher Return机制返回路由失败的异常信息。

  2. 临时消息投递成功

    • 临时消息传递到MQ后,被存储在内存中。

    • MQ服务器返回ACK,表示消息已成功投递。

  3. 持久化消息投递成功

    • 持久化消息传递到MQ后,被持久化存储到磁盘。

    • MQ服务器返回ACK,表示消息已成功投递。

  4. 投递失败

    • 如果消息未能成功传递到MQ(例如网络故障或MQ服务器不可用),MQ服务器会返回NACK,表示投递失败。


Publisher Confirm 和 Publisher Return 的作用
  1. Publisher Confirm

    • 通过返回ACKNACK,告知生产者消息是否成功投递到MQ服务器。

    • ACK表示消息已成功投递(无论是临时消息还是持久化消息)。

    • NACK表示消息投递失败。

  2. Publisher Return

    • 当消息成功投递到MQ服务器但路由失败时,通过Publisher Return返回异常信息。

    • 帮助生产者及时发现消息无法被正确路由的问题。


总结
  • 投递成功:消息成功到达MQ服务器并被存储(临时消息存储在内存中,持久化消息存储在磁盘中),MQ返回ACK

  • 路由失败:消息成功投递到MQ服务器,但无法被正确路由到目标队列,MQ通过Publisher Return返回异常信息。

  • 投递失败:消息未能成功传递到MQ服务器,MQ返回NACK

通过Publisher ConfirmPublisher Return机制,生产者可以及时了解消息的投递状态,从而有效预防和发现消息丢失问题。

spring:
  rabbitmq:
    publisher-confirm-type: correlated 
    publisher-returns: true 

默认情况下, Publisher Confirm和Publisher Return两个机制是关闭的. publisher-confirm-type有三钟状态

none: 默认关闭, 就是这个状态

simple: 同步进行, 需要等到回复状态之后才会继续业务

correlated: 异步进行, 在等待回复状态的同时, 业务可以继续进行处理

MQ的可靠性

数据持久化

确保消息队列(MQ)可靠性的关键措施

在分布式系统中,消息队列(MQ)是异步通信的核心组件。然而,即使在正常情况下,消息在到达MQ后仍有可能丢失。因此,确保MQ的可靠性至关重要。本文将介绍几种常见的MQ可靠性方案。

1. 消息丢失的原因

MQ通常将消息存储在内存中进行处理和传递,这种方式虽然高效,但在MQ服务重启或崩溃时,内存中的消息会丢失。为了解决这一问题,我们需要采取以下措施来增强MQ的可靠性。

2. MQ可靠性方案

2.1 交换机的持久化

交换机(Exchange)是MQ中路由消息的关键组件。通过将交换机设置为持久化,可以确保在MQ重启后,交换机的配置和元数据不会丢失,从而保证消息能够继续被正确路由。

2.2 队列的持久化

队列(Queue)是消息的存储载体。将队列设置为持久化后,即使MQ服务重启,队列中的消息也不会丢失。持久化队列会将消息存储到磁盘中,而不是仅仅依赖内存。

2.3 消息的持久化

除了交换机和队列的持久化,消息本身也需要进行持久化处理。持久化的消息在未被消费前会一直存储在磁盘中,只有在被成功消费后才会被删除。这种方式可以有效避免因MQ重启或崩溃导致的消息丢失。

3. 生产者确认机制

为了进一步增强MQ的可靠性,可以启用生产者确认机制(Publisher Confirms)。当消息被持久化存储到磁盘后,MQ会向生产者发送一个确认(ACK),告知消息已安全存储。这种机制可以确保消息不会在传输过程中丢失。

4. 批量持久化与异步处理

为了提高性能,MQ通常不会逐条持久化消息,而是采用批量持久化的方式。这种方式可以显著减少磁盘I/O操作,提升系统的整体效率。同时,推荐使用异步方式进行持久化,以避免阻塞消息的处理流程。

5. 总结

通过交换机的持久化、队列的持久化、消息的持久化以及生产者确认机制,可以显著提升MQ的可靠性。此外,批量持久化和异步处理能够在不牺牲可靠性的前提下,进一步提高系统的性能。在实际应用中,建议根据业务需求合理配置这些机制,以确保消息的可靠传递。

LazyQueue

消息存储在内存中的优势与挑战

RabbitMQ 默认将消息存储在内存中,因为内存的读写速度远高于硬盘,这可以显著提高消息处理的效率。然而,这种设计也带来了一些潜在的问题,尤其是在消息量激增的情况下。

1. 内存存储的优势

  • 高效读写: 内存的访问速度比硬盘快得多,因此将消息存储在内存中可以大幅降低消息收发的延迟,提升系统性能。

  • 低延迟: 对于实时性要求较高的场景,内存存储能够确保消息快速传递。

2. 内存存储的挑战

尽管内存存储有诸多优势,但在某些情况下,可能会面临以下问题:

2.1 生产者消息激增

当生产者的消息发送速率突然增加时,可能会导致消息在内存中大量堆积。

2.2 消费者处理能力不足

如果消费者的处理速度跟不上生产者的发送速度,消息会在内存中积压,占用大量内存资源。

2.3 内存限制与 PageOut

内存的容量是有限的,当消息积压超过内存的极限时,RabbitMQ 会将部分消息从内存转移到硬盘中,这个过程称为 PageOut。在 PageOut 过程中:

  • 生产者可能会被拒绝发送消息(流控机制生效)。

  • 消费者也无法消费消息,因为部分消息正在从内存转移到硬盘。

  • 内存资源会被占用,影响系统的整体性能。

3. Lazy Queue 的引入

为了解决上述问题,RabbitMQ 在 3.12 版本 之后引入了 Lazy Queue 机制。Lazy Queue 的核心设计思想是:

  • 消息优先存储到硬盘: 消息不会直接存储在内存中,而是批量写入硬盘。

  • 懒加载机制: 只有当消费者需要消费消息时,才会将消息从硬盘加载到内存中。

  • 支持海量消息存储: Lazy Queue 可以轻松处理百万级甚至更多的消息量,而不会对内存造成过大压力。

4. Lazy Queue 的优势

  • 降低内存压力: 消息主要存储在硬盘中,内存占用大幅减少。

  • 提高系统稳定性: 即使消息量激增,也不会因为内存不足而导致消息丢失或系统崩溃。

  • 适合高吞吐量场景: 对于消息量大但实时性要求不高的场景,Lazy Queue 是一个理想的选择。

消费者的可靠性

当RabbitMq将消息传递给消费者, 依旧会存在之前的消息丢失, 比如消费者处理异常, 发送失败, 消费者宕机等情况

消费者确认机制

当RabbitMQ将消息传递给消费者时,仍然可能存在消息丢失的情况,例如消费者处理异常、发送失败、消费者宕机等。为了确保消息的可靠传递,RabbitMQ提供了消费者确认机制,允许消费者在处理消息后向RabbitMQ反馈消息的处理状态。

消费者确认机制

RabbitMQ的消费者确认机制主要分为以下三种状态:

  • ack(确认):消息处理成功,RabbitMQ会将该消息从队列中删除。

  • nack(否定确认):消息处理异常,RabbitMQ会将消息重新加载回队列,进行重试。

  • reject(拒绝):消息处理异常,且消息无法被处理,RabbitMQ会直接删除该消息。reject通常用于消息类型不匹配或无法处理的场景。

Spring AMQP中的确认模式

Spring AMQP为开发者提供了三种确认模式,以简化消息确认的处理:

  1. none模式

    • 当消息发送到消费者后,RabbitMQ会立即确认(ack)并删除消息,无论消费者是否成功处理。

    • 适用于对消息丢失不敏感的场景。

  2. manual模式

    • 开发者需要手动调用acknackreject来确认消息的处理状态。

    • 适用于需要精细控制消息确认的场景,但可能会造成业务代码的污染。

  3. auto模式

    • Spring AMQP会自动根据消息处理的结果发送确认。

      • 如果业务正常处理,返回ack

      • 如果发生业务异常,返回nack,消息会重新入队进行重试。

      • 如果发生消息校验或处理异常,返回reject,消息会被直接删除。

    • 适用于大多数常见的业务场景。

配置示例

在Spring Boot中,可以通过以下配置来设置确认模式:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 可选值为 none, manual, auto

异常处理的最佳实践

auto模式下,建议根据异常类型决定是返回nack还是reject。例如,对于可重试的业务异常(如网络抖动),可以返回nack;对于不可恢复的异常(如消息格式错误),可以返回reject

失败重试机制

业务异常处理与重试机制

在消息队列中,如果不对业务异常处理进行合理的限制,每次业务发生异常时,消息会重新入队并进行重试。如果没有适当的控制机制,消息可能会不断重试,导致系统资源被空耗,甚至引发消息堆积、系统崩溃等问题。

在Spring AMQP中,可以通过配置RabbitMQ的监听器重试机制来避免这种情况。以下是一个典型的配置示例:

spring:rabbitmq:listener:simple:retry:enabled: true          # 开启重试机制max-attempts: 5         # 最大重试次数initial-interval: 1000  # 初始重试间隔时间(毫秒)multiplier: 1           # 重试间隔时间的倍数stateless: true         # 是否启用无状态重试
配置项说明:
  1. enabled: 是否启用重试机制。

  2. max-attempts: 最大重试次数。

  3. initial-interval: 初始重试间隔时间。

  4. multiplier: 重试间隔时间的倍数。

  5. stateless: 是否启用无状态重试。设置为true时,表示每次重试都是独立的,不会保留前一次重试的状态。这对于一些涉及上下文代码或变量传递的业务场景非常重要。如果业务逻辑依赖于某些变量的初始状态,启用无状态重试可以确保每次重试时变量都恢复到初始值,避免前一次重试的结果影响后续业务。

无状态重试的重要性

在某些业务场景中,业务逻辑可能会依赖于某些变量的初始状态。例如:

int x = 5;  // 初始值为5
// 业务逻辑处理
x = 8;      // 业务处理后,x的值变为8

如果在下一次重试时,变量x仍然保留上一次处理后的值(即8),而不是恢复到初始值(5),这可能会导致业务逻辑出现错误。通过设置stateless: true,可以确保每次重试时,业务逻辑中的变量都会恢复到初始状态,从而避免这种问题。

失败处理策略

当消息在本地多次重试失败后, 超过重试次数的限制, 会被队列删除, 但是这对于消息可靠性要求高的业务并不友好, 所以提供了一个MessageRecovery接口, 这个接口较好的实现是RepublishMessageRecovery类, 它是把异常信息放到一个单独的队列中, 后续人工介入处理

业务幂等性

在执行某个业务操作时,无论该操作被执行一次还是多次,最终的业务结果都是一致的,这就是幂等性。幂等性在分布式系统和网络通信中尤为重要,因为它可以有效避免重复操作带来的问题。

举个例子:当你在购物时进行扣款操作,如果由于网络延迟或其他原因,扣款请求被重复发送了两次,而你被扣了两次钱,这显然是不可接受的。为了避免这种情况,我们需要确保扣款操作的幂等性。

唯一消息ID

在消息队列系统中,为了确保消息的幂等性(即多次处理同一消息不会产生重复的效果),通常会在发送消息时附带一个唯一ID。这个唯一ID可以是全局唯一标识符(UUID)或根据业务规则生成的唯一值。消费者在接收到消息后,首先会检查该ID是否已经存在于数据库中。如果该ID已经存在,说明该消息已经被处理过,消费者会直接跳过该消息,避免重复处理;如果该ID不存在,消费者则会处理该消息,并将该ID存储到数据库中,以确保后续重复消息不会被重复处理。

业务判断

除了使用唯一消息ID来确保消息的幂等性外,业务判断也是处理重复消息或请求的重要手段。业务判断是通过业务本身的逻辑来进行判断,确保即使消息ID不同,但业务内容相同的请求也不会被重复处理。例如,在订单系统中,可以通过订单号、用户ID等业务字段来判断是否已经处理过相同的请求。

通过结合唯一消息ID和业务判断,可以有效地确保消息队列系统中的消息处理是幂等的,从而避免重复处理带来的业务问题。

兜底方案

尽管MQ消息系统已经尽可能减少了消息丢失的可能性,但在实际应用中,消息丢失的情况仍然可能发生。因此,我们需要主动采取措施来确保业务结果的准确性。具体来说,可以通过定时任务来实现这一目标。定时任务会每隔一段时间对业务结果进行比对和检查,确保数据的完整性和一致性。

这种兜底方案的核心在于通过定期检查来弥补消息丢失可能带来的影响,从而保证业务的最终一致性。

消息幂等与性能优化

在高数据量的场景下,消息幂等性不仅要保证消息处理的正确性,还需要提升消息的存取速度和系统效率。以下是几种常见的优化方案:

  1. 集群部署:通过集群化部署,可以提高系统的稳定性和吞吐量,分散单节点的压力,从而提升整体性能。

  2. 分库分表:对于数据量巨大的场景,可以采用分库分表的方式,将数据分散到多个数据库或表中,减少单表的数据量,提升查询和写入效率。

  3. 数据生命周期管理:通过定时任务对数据进行归档、移动或删除,确保数据的时效性,避免无效数据占用存储资源,从而提升系统性能。

这些方法结合使用,可以在保证消息幂等性的同时,有效提升系统的处理能力和效率

延时消息

在网上售卖货物时,由于商品数量有限,当用户下单后,数据库会自动扣减商品库存。然而,如果用户未在规定时间内完成付款,这些商品就会被该用户占用,导致其他购物者无法购买。为了解决这一问题,我们需要对未付款的用户设置时间限制,要求他们在规定时间内完成付款,否则库存将被释放。

这种需要在一定时间后再执行的任务被称为延时任务。消息队列(MQ)提供了两种处理延时任务的解决方案:死信交换机延时消息插件

死信交换机

死信是指以下几种情况下的消息:

  1. 超时未被处理的消息:消息在队列中等待时间过长,未被消费者处理。

  2. 队列满员:当队列达到最大容量时,新消息无法进入,成为死信。

  3. 消费失败的消息:消费者在处理消息时返回nackreject,并且将requeue设置为false,表示消息消费失败且不再重新入队。

当一个队列中存在死信消息时,可以通过配置dead-letter-exchange参数,将这些死信消息转发到一个特定的交换机,这个交换机被称为死信交换机。死信交换机会与一个或多个队列绑定,用于接收和处理这些死信消息。

延时消息插件

延时消息插件是一种简化消息延时处理的工具,相较于使用死信交换机的方案,它减少了实现延时消息所需的步骤。延时消息插件在基本的消息流程(生产者 -> 交换机 -> 队列 -> 消费者)中,通过将交换机设计为具备延时和暂时存储消息的能力,从而直接实现消息的延时投递。这种方式避免了传统死信交换机方案中需要额外设置死信队列、绑定死信交换机等复杂操作,简化了系统的设计和维护。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词