欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > RabbitMQ 消息队列

RabbitMQ 消息队列

2025/2/22 13:28:33 来源:https://blog.csdn.net/IT_iosers/article/details/145752313  浏览:    关键词:RabbitMQ 消息队列

1. 消息队列是什么?

当用户注册成功后,就发送邮件。当邮件发送成功了,接口才会提示注册成功信息。但由于发送邮件,依赖于其他厂商的服务,有可能他们的接口会非常耗时。那么用户就一直要等着邮件发送成功了,才提示注册成功。因为耗时很长,这样就会造成很糟糕的体验。

然后大家想一想,我们到底有没有必要等着邮件发送完成呢?就好比大家去邮局寄信,当你把信丢入邮筒里,你需要一直等着信送到别人手里,然后你再离开吗?

当然不需要,只要将信丢入邮筒,我们人就可以离开了。因为你知道,在将来的某个时候,你的信就会被寄到收件人手里。

消息队列,就好比是这里寄信的过程!消息队列由三部分构成:

在这里插入图片描述

  • 生产者(Producer):创建并发送消息,相当于去邮局寄信的人。
  • 队列(Queue):用来存放消息的,将消息从生产者传递到消费者,就相当于邮局了。邮局收到你的任务后,它去处理是需要排队的。按照先进先出的原则,需要等其他先来的任务处理完了,才会处理你的任务。
  • 消费者(Consumer):接收并处理消息的人。当邮局把信送到他手里了,收到消息了,最终处理任务的人。

还是以我们注册功能为例。当用户注册成功了,我们就通知RabbitMQ,你去发个邮件,然后就直接提示用户注册成功。我们完全没有必要等待邮件发送完成。

接着再写一个程序去接收消息,当它收到消息了,就去执行发邮件任务了。

所以,生产者、消费者,都是我们要写的程序。一个负责提交任务,另一个负责处理任务。互相之间通信,就靠中间的RabbitMQ。

2. 安装

2.1. 使用 Docker

当然,我最推荐的还是用Docker,因为太方便了,大家打开docker-compose.yml。增加RabbitMQ的配置,注意缩进非常重要,一定要和之前的MySQL和Redis的配置对齐

services:mysql:# ....redis:# ....rabbitmq:image: rabbitmq:4.0-managementports:- "5672:5672"- "15672:15672"volumes:- ./data/rabbitmq:/var/lib/rabbitmq # 持久化数据environment:RABBITMQ_DEFAULT_USER: adminRABBITMQ_DEFAULT_PASS: xw
  • 我们这里使用的RabbitMQ版本号是:4.0
  • 将RabbitMQ运行的默认端口567215672,映射到本机上。
  • 其中15672RabbitMQ自带的管理后台端口号
  • 接着将RabbitMQ的用来持久化数据的备份数据映射到项目根目录的data/rabbitmq目录中。
  • 最下面的是RabbitMQ用来连接的账号密码,大家可以根据需要自己设置。
    完成后,命令行,再次运行
docker-compose up -d

这样Docker,就会自动下载,并启动好RabbitMQ。
在这里插入图片描述

2.2. 直接安装

对于部分Windows上用不了Docker的同学,那就有点麻烦了。因为既要安装Erlang,又要安装RabbitMQ。这里附上下载地址:

  • Erlang下载地址:https://www.erlang.org/downloads
    -在这里插入图片描述

  • RabbitMQ下载地址:https://www.rabbitmq.com/docs/install-windows#downloads

  • 在这里插入图片描述
    对于这种安装方式,如果在安装过程出现了各种问题,请来讨论群中和我讨论。总而言之,我还是推荐大家尽可能,想办法将Docker跑通。

2.3.1 安装完ErlangRabbitMQ后,添加环境变量

打开环境变量设置窗口: 右键点击 “此电脑”,选择 “属性”。 在弹出的窗口中,点击 “高级系统设置”。 在 “系统属性” 窗口中,点击 “环境变量” 按钮。 添加 RabbitMQ 命令路径到系统的 Path 变量: 在 “系统变量” 列表中找到 “Path” 变量,选中它并点击 “编辑” 按钮。 在弹出的 “编辑环境变量” 窗口中,点击 “新建”,然后将 RabbitMQ 的 sbin目录路径添加进去,
例如 C:\Program Files\RabbitMQServer\rabbitmq_server-<version>\sbin。注意要将 替换为你实际安装的 RabbitMQ 版本号。 点击 “确定” 保存设置,依次关闭所有窗口 具体选择你自己安装的路径>。

2.3.2 通过命令行(管理员)安装rabbitmq_management 插件

RabbitMQ 的管理界面依赖于rabbitmq_management 插件

rabbitmq-plugins enable rabbitmq_management
2.3.3 通过命令行(管理员)运行
rabbitmq-service start
2.3. RabbitMQ 管理后台

启动好后,稍等个几秒钟,用浏览器访问:http://127.0.0.1:15672/,就能进入RabbitMQ的管理界面了

以游客模式登录 账号:guest 密码:guest

2.3.2.1 新增账号密码

在这里插入图片描述

在这里插入图片描述
输入刚才docker-compose.yml里的设置账号密码,就能进去了。里面的东西很多,先不用管它,一会儿再来看。

在这里插入图片描述

3. 官方案例解析

学习任何东西之前,最好的办法,无疑就是看官方文档了。
直接看JavaScript章节的Hello World部分。很明显,用之前先要装包

npm i amqplib

amqplib官方文档

3.1. 生产者:send.js

public目录里,新建一个send.js,代码大家从讲义文档直接复制过来。

amqplib.connect(‘amqp://admin:xw@localhost’) 参数详解

字段注解
amqp://表示使用 AMQP 协议进行连接
admin连接到 RabbitMQ 服务器时使用的用户名,用于身份验证
xw是与用户名对应的密码,用于验证用户身份。用户名和密码通过冒号 : 分隔
localhost指定了 RabbitMQ 服务器所在的主机地址,这里表示服务器运行在本地机器上。如果 RabbitMQ 服务器部署在其他机器上,需要将 localhost 替换为实际的 IP 地址或域名
const amqplib = require('amqplib');(async () => {try {const connection = await amqplib.connect('amqp://admin:xw@localhost');// 创建一个通道。通道是进行通信的基本单位,通过通道可以发送和接收消息const channel = await connection.createChannel();// 队列的名字是:helloconst queue = 'hello';// 要发送的消息内容是:你好,xw!const msg = '你好,xw!';// 创建一个队列。如果队列不存在,则创建一个队列。如果队列已经存在,则不会创建新的队列// durable: 表示队列是否持久化。如果设置为true,即重启后队列不会消失await channel.assertQueue(queue, { durable: true });// 发送消息到队列// queue: 要发送消息的队列的名字// content: 要发送的消息内容// persistent: true,消息持久化,确保消息在 RabbitMQ 重启后仍然存在。channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });// 打印发送提示信息console.log('[x] 发送了:%s', msg);// 500ms 后关闭连接setTimeout(() => {connection.close();process.exit(0);}, 500);} catch (error) {console.log(error);}
})();

我们一点点分析下,它这里都写了些什么东西。
1、连接 const connection = await amqplib.connect(‘amqp://admin:clwy1234@localhost’)
2、创建一个通道。通道是进行通信的基本单位,只有通过通道,才可以发送和接收消息
3、我们定义了队列的名字是:hello。并定义了要发送的消息内容是:你好,xw!
4、创建了一个队列。但如果队列已经存在,那就直接使用,不会创建新的了。这里要把队列的名字放进去,也就是hello
这里还有个参数:durable: false,它表示队列是否持久化 false:不持久化,true:队列的信息会被写入磁盘,也就是docker-compose里设置的./data/rabbitmq,当RabbitMQ服务重启后,队列会被自动恢复
5、channel.sendToQueue,很明显,这就是将消息发送到队列了。
接着做了个console.log提示,可以显示在命令行里,便于我们观察。
最后面,等500ms后,将连接断开

看完后,我们就先将它跑起来,看看到底有个什么效果。用cd命令,进入public目录中。然后直接用node运行这个文件

cd public
node send.js

打开RabbitMQ的管理后台:http://127.0.0.1:15672/

里面的菜单,依次是:

  • 概览:Overview
  • 连接:Connections
  • 通道:Channels
  • 交换机:Exchanges
  • 队列和流:Queues and Streams
  • 管理用户:Admin

由于代码里,在500ms之后,就将连接关闭了。所以连接和通道里,都看不到东西。大家点击队列这里,里面可以看到hello队列了。这正是我们刚才创建的队列名字。
在这里插入图片描述
点进去看,里面可以看到队列的一些详细内容。

  • 上面有一些统计信息。一共收到了一条信息,准备好的有几条,在内存中的又几条,占有了多少空间、多少内存什么的。
    在这里插入图片描述

  • 再往下看,注意这里有个发送消息(Publish message)获取消息(Get message)

  • 我们先点获取消息,这里就能看到刚才发送的:你好,xw

在这里插入图片描述

4.2. receive.js

刚才是生产者的代码,我们接着继续看消费者的代码。在public目录中新建receive.js

const amqplib = require('amqplib');(async () => {try {// 连接到 RabbitMQconst connection = await amqplib.connect('amqp://admin:clwy1234@localhost');// 创建一个通道。通道是进行通信的基本单位,通过通道可以发送和接收消息const channel = await connection.createChannel();// 队列的名字是:helloconst queue = 'hello';// 创建一个队列。如果队列不存在,则创建一个队列。如果队列已经存在,则不会创建新的队列// durable: 表示队列是否持久化。如果设置为true,即重启后队列不会消失await channel.assertQueue(queue, { durable: true });// 打印等待接收消息的提示信息console.log(' [*] 等待接收消息在 %s 队列中. 按 CTRL+C 退出', queue);// 当接收到消息channel.consume(queue, (msg) => {// 打印接收到的消息内容console.log('[x] 接收到了:%s', msg.content.toString());// 如果不是自动确认,需要手动确认消息// channel.ack(msg);}, {// noAck: 表示是否自动确认消息,设置为true表示自动确认,设置为false表示手动确认// 如果设置为false,需要手动确认消息,否则消息会被重复消费。例如:channel.ack(msg)noAck: true});} catch (error) {console.log(error);}
})();

我们来运行一下,

node receive.js

在这里插入图片描述
再回去刷新页面,也多刷新个几次。因为刚才的两条消息都已经处理完成了,所以现在的准备好的(Ready)和在内存中的(In memory)都是0了。
在这里插入图片描述
点击连接(Connections),因为我们这次的消费者代码,并没有关闭连接,所以一直连在RabbitMQ上
在这里插入图片描述
点击通道(Channels),里面也能看到创建的通道。
在这里插入图片描述

4. noAck 自动、手动确认消息

关于自动确认消息,大家可以改为

noAck: false
ack(确认)当消费者成功处理完一条消息后,会向 RabbitMQ 服务器发送一个确认信号(ack),告知服务器该消息已经被成功消费,服务器可以将该消息从队列中移除
nack(否定确认)当消费者处理消息失败或者遇到某些异常情况时,会向 RabbitMQ 服务器发送一个否定确认信号(nack),表示消息处理未成功。根据不同的配置,服务器会对该消息进行不同的处理,比如重新入队、丢弃等。

channel.nack(message, multiple, requeue); 参数

参数注释
message该参数代表需要进行否定确认的消息对象,也就是消费者从队列中接收到的消息。这个消息对象包含了消息的内容、属性等信息
multiple这是一个布尔类型的参数,用于指定是否批量否定确认消息。当 multiple 为 true 时,表示会将当前消费者已经接收到但还未确认(包括 ack 和 nack)的所有消息进行否定确认。这在需要一次性处理多个未确认消息时很有用,可以提高处理效率。当 multiple 为 false 时,仅对当前传入的这一条消息进行否定确认。
requeue同样是布尔类型的参数,用于控制消息在被否定确认后的去向。当 requeue 为 true 时,消息会被重新放回原队列中,通常会被放置在队列的尾部,之后可能会再次被分发给其他消费者或者当前消费者(取决于 RabbitMQ 的分发策略)。这种方式适用于消息处理失败是由于临时原因导致的情况,例如网络抖动、数据库短暂不可用等,让消息有机会被重新处理。当 requeue 为 false 时,消息不会被重新放回原队列,而是会被丢弃或者根据死信队列(Dead Letter Queue,DLQ)的配置进行处理。死信队列用于存储那些无法被正常消费的消息,方便后续分析和处理

这样就需要手动确认消息了,我们先不确认。将接收消息的终端重启一下,重新发送一次,可以看到收到信息了。
CTRL+C停止后,再次接收消息,发现还能接收到这个消息。这就错了,按道理消息被处理完成后,就不应该重复收到了。

这就是因为,由于我们没有手动确认,所以RabbitMQ,认为消息一直没有被处理成功,就会不断的发过来,让你处理。
在这里插入图片描述
加上

// 当接收到消息
channel.consume(queue, function (msg) {console.log('[x] 接收到了:%s', msg.content.toString());// 如果不是自动确认,需要手动确认消息channel.ack(msg);}, {noAck: false
});

CTRL+C停止后,重新连接,再接收一次。现在的代码里,已经将消息确认了。再次按CTRL+C停止,再连接上去,就不会收到重复的消息了。
在这里插入图片描述
那么ack设计的目的,是让你有更高的灵活度。因为有些任务的执行,有可能失败,需要重试。我这里有个代码示例,大家可以参考一下。

// 模拟处理逻辑
channel.consume(queue, function (msg) {try {// 假设处理成功,手动确认消息channel.ack(msg);} catch (error) {// 处理失败,可以选择拒绝消息,将消息从队列中删除// nack:否定确认):当消费者处理消息失败或者遇到某些异常情况时,会向 RabbitMQ 服务器发送一个否定确认信号(nack),表示消息处理未成功。根据不同的配置,服务器会对该消息进行不同的处理,比如重新入队、丢弃等。channel.nack(msg, false, false);console.error(error);}
}, {noAck: false
});

总结

1、消息队列的组成

基础的消息队列,由三部分组成:生产者、队列和消费者。

名称说明
生产者将消息发送到队列
队列临时存储消息,按照先进先出的原则,在生产者和消费者之间传递消息
消费者监听队列,收到消息后进行处理。
2、RabbitMQ 的方法
方法说明
const connection = amqplib.connect连接到 RabbitMQ
connection.close关闭连接
const channel = connection.createChannel创建通道
channel.assertQueue创建队列
channel.sendToQueue发送消息
channel.consume接收消息
channel.ack确认消息,消息被成功处理。
channel.nack拒绝消息,消息处理失败。
3、nack的参数
参数注释
message该参数代表需要进行否定确认的消息对象,也就是消费者从队列中接收到的消息。这个消息对象包含了消息的内容、属性等信息
multiple这是一个布尔类型的参数,用于指定是否批量否定确认消息。当 multiple 为 true 时,表示会将当前消费者已经接收到但还未确认(包括 ack 和 nack)的所有消息进行否定确认。这在需要一次性处理多个未确认消息时很有用,可以提高处理效率。当 multiple 为 false 时,仅对当前传入的这一条消息进行否定确认。
requeue同样是布尔类型的参数,用于控制消息在被否定确认后的去向。当 requeue 为 true 时,消息会被重新放回原队列中,通常会被放置在队列的尾部,之后可能会再次被分发给其他消费者或者当前消费者(取决于 RabbitMQ 的分发策略)。这种方式适用于消息处理失败是由于临时原因导致的情况,例如网络抖动、数据库短暂不可用等,让消息有机会被重新处理。当 requeue 为 false 时,消息不会被重新放回原队列,而是会被丢弃或者根据死信队列(Dead Letter Queue,DLQ)的配置进行处理。死信队列用于存储那些无法被正常消费的消息,方便后续分析和处理
4、 RabbitMQ 的参数
字段说明
durable: true队列持久化,队列信息将被写入磁盘,在RabbitMQ服务重启后仍然存在。但是队列已经创建过后,这个参数就没法改了,可以删掉队列,重新建一个新的。
persistent: true消息持久化,消息将会被写入磁盘,在RabbitMQ服务重启后仍然存在。
noAck: true自动确认消息,只要收到了就会自动确认。如果设置成false,则需要手动确认。

使用RabbitMQ发送邮件

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词