欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > 12--RabbitMQ消息队列

12--RabbitMQ消息队列

2024/11/30 6:45:39 来源:https://blog.csdn.net/qq_43387908/article/details/140201172  浏览:    关键词:12--RabbitMQ消息队列

前言:前面一章内容太多,写了kafka,这里就写一下同类产品rabbitmq,rabbitmq内容较少,正好用来过度一下,概念还是会用一些例子来说明,实际部署的内容会放在概念之后。

1、基础概念

1.1、MQ消息队列简介

MQ 全称为Message Queue, 消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间,通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。队列的使用除去了接收和发送应用程序,同时执行的要求。 在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式,大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量

想象你在一家快餐店里点餐。通常情况下,你会排队等待点餐和取餐,这可能会花费很长时间,特别是在高峰时段。现在,快餐店引入了一个新的系统:你可以在点餐时拿到一个号码牌,然后继续坐下或者做其他事情。当你的餐点准备好时,店员会按照号码牌号叫你去取。这样一来,你不必在柜台前等待,可以自由地做其他事情,而店员也能够更高效地处理点餐和准备食物,避免了大家在柜台前的拥堵和等待。

在这个比喻中:

  • 号码牌就像消息队列中的消息,每个点餐的顾客(应用程序)都有一个唯一的号码(消息),用来标识自己的订单(任务)。
  • 店员就像消息队列的处理程序,负责处理顾客(应用程序)的订单(任务),并通知顾客(应用程序)何时可以取餐(任务完成)。
  • 等待时间减少,就像消息队列能够减少应用程序之间直接等待的时间,提高系统的效率和吞吐量

1.2、消息队列核心功能

解耦(将不同的系统分离开):避免操作等待时长不同的俩个系统互相浪费资源

冗余(存储):主要是用于存储请求

扩展性:扩充消息队列应对可能的数据流

削峰:核心功能,即时流量,闲时处理

可恢复性:数据丢失可以恢复

顺序保证:保证访问顺序

缓冲:作为大量数据(消息)的暂存点

异步通信:减少应用程序之间直接等待的时间

1.3、消息队列分类

1)P2P模式(安全)

Point-to-Point(P2P)点到点,P2P模式包含三个角色:

1.消息队列(Queue)

2.发送者(Sender)

3.接收者(Receiver)。

每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。 P2P的特点如下:

• 每个消息只有一个消费者(Consumer),即一旦被消费,消息就不再在消息队列中

• 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行它不会影响到消息被发送到队列 • 接收者在成功接收消息之后,需向队列应答成功。

• 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式

2)Pub/Sub模式(并发)

Pub/Sub模式包含三个角色:

主题(Topic)

发布者(Publisher)

订阅者(Subscriber) 。

多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。Pub/Sub的特点如下:

• 每个消息可以有多个消费者 • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息

• 为了消费消息,订阅者必须保持运行的状态 • 如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型

主要是应对超高并发,成本远高于第一种。

1.4、同类产品

Kafka:属于Apache,上一章内容

RabbitMQ:RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现,RabbitMQ比Kafka可靠。

RocketMQ:RocketMQ是阿里开源的消息中间件,它是纯Java开发,阿里用的多。

1.5、RabbitMQ简介

RabbitMQ是一个在AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统,各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个Broker构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。

1.6、RabbitMQ架构图示

RabbitMQ从整体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息

1.7、基础术语

这部分内容在工作中基本用不上,只是辅助理解一下

Message (消息):消息由消息头和消息体组成。消息体是不透明的,包含一些可选属性,如路由键 (routing-key)、优先级 (priority)、持久性存储 (delivery-mode) 等。

Publisher (消息生产者):发布者是向交换器发送消息的客户端程序,例如一个Java或PHP程序。

Exchange (交换器):交换器接收发布者发送的消息,并根据规则将消息路由到一个或多个队列中去。消息必须经过交换器才能进入队列。

Queue (消息队列):队列用于存储消息,直到消费者连接并将其取走。队列是消息的容器和终点,可以由一个或多个发布者投递消息。

Binding (绑定):绑定定义了交换器和队列之间的关联规则。基于路由键,绑定将消息从交换器路由到相应的队列中。

Virtual Host (虚拟主机):虚拟主机是独立的消息服务器区域,包含交换器、队列及相关对象。每个虚拟主机有自己的身份认证和加密环境,可以看作是RabbitMQ的一个小型实例。

Broker (代理):代理是RabbitMQ的服务器实体,类似于在Linux上创建的虚拟机。

Connection (连接):连接是RabbitMQ服务器和应用程序服务之间的TCP连接。

Channel (信道):信道是在TCP连接内创建的虚拟连接,用于在服务器和客户端之间传递消息。一条TCP连接可以包含多个信道,用于并行地发送、接收和订阅消息。

Consumer (消息消费者):消费者是从消息队列中获取消息的客户端应用程序,与生产者类似,是处理消息的程序。

消息是像是装在信封里的邮件,它由消息头和消息体组成。消息体是不透明的,包含了一些可选属性,如邮件的送达地址、优先级别、是否需要特殊保护等。发布者就像是寄信人,负责将信封(消息)投递到邮箱(交换器)里。你可以把发布者想象成是发送电子邮件的Java或PHP程序。交换器是一个智能的邮局,它接收发布者寄来的信件(消息),然后根据地址(路由键)将这些信件发送到正确的邮箱(队列)里。消息必须先经过交换器,才能最终进入队列中等待被取走。队列就像是一个装满信件的邮箱,它保存信件直到有消费者来取走。队列不仅是存储消息的地方,也是消息的目的地。一个消息可以被寄到一个或多个邮箱里,这取决于发布者的设定。消息会一直留在队列里,直到有消费者连接到队列并把它取走为止。绑定就像是一条明确的送达规则,连接了邮箱(队列)和寄信人(交换器)。通过绑定,可以根据地址(路由键)将交换器和队列联系起来,使交换器像是一个根据地址送信的邮递员。虚拟主机就像是一片独立的邮政区域,其中包含了若干个交换器、队列和相关的物件。每个虚拟主机都有自己独特的身份认证和安全加密环境。在RabbitMQ中,虚拟主机可以看作是一个完整的消息服务器,拥有自己的邮箱、邮递员和规则。代理可以理解为RabbitMQ的实际运行服务器,就像是在计算机系统中创建的一个虚拟邮局。连接是建立在RabbitMQ服务器和应用程序服务之间的通道,就像是打开了一条电线,让信件可以在两者之间传递。信道是连接中的虚拟通道,可以理解为一条独立的光纤。在一条连接上可以创建多个信道,每个信道独立处理消息的发送、接收和订阅。消费者就像是一个专门去邮箱里取信的人,它表示一个从队列中取走消息的客户端程序。消费者和发布者类似,只不过它们的工作是接收和处理消息。

1.8、RabbitMQ的通信过程

这是关于RabbitMQ通信过程的整理:

  1. 生产者 P1 发送消息: P1生产消息并将其发送到RabbitMQ服务器的 Exchange。

  2. Exchange 接收消息: Exchange接收到消息后,根据消息的 ROUTING KEY 将消息路由到匹配的 Queue1。

  3. Queue1 接收消息: Queue1收到消息并将其存储,准备发送给订阅者。

  4. 消费者 C1 订阅消息: 订阅者 C1 已经订阅了 Queue1 中的消息。

  5. C1 接收消息: Queue1将消息发送给订阅者 C1,C1接收到消息并进行处理。

  6. C1 发送 ACK 确认: C1处理完消息后,向 Queue1 发送 ACK 确认消息,表示已成功接收并处理了消息。

  7. Queue1 收到 ACK: Queue1收到 ACK 后,确认消息已成功传递给订阅者,并可以安全地删除队列中缓存的此条消息。

这个流程展示了RabbitMQ中消息从生产者到消费者的完整传递过程,包括消息的路由、存储和确认处理。

2、单机部署RabbitMQ

2.1、基础环境

环境已更换国内源与epel源,三台之间域名解析已配置,防火墙和selinux已关闭。

主机名需配置完毕重新加载,否则后续无法设置节点!!!

IP角色/主机名/域名
192.168.188.128rabbitmq1
192.168.188.129rabbitmq2
192.168.188.130rabbitmq3

2.2、安装Erlang

这里官网推荐的方式为配置erlang源,然后使用yum下载,这种方式适用于外网环境较稳定的操作环境

[root@rabbitmq1 ~]# wget https://objects.githubusercontent.com/github-production-release-asset-2e65be/47679505/a5e9b597-d9aa-44a2-960f-670142bae779?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20240705%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240705T132027Z&X-Amz-Expires=300&X-Amz-Signature=4b4e673b94952cf24ce0086e6a957ea2e584553fda51bacae46a8031b502b189&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=47679505&response-content-disposition=attachment%3B%20filename%3Derlang-23.3.4.18-1.el7.x86_64.rpm&response-content-type=application%2Foctet-stream
[root@rabbitmq1 ~]# yum install -y  erlang-23.3.4.18-1.el7.x86_64.rpm

2.3、安装RabbitMQ

这里有时候会断连,多试几次

[root@rabbitmq1 ~]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.13/rabbitmq-server-3.7.13-1.el7.noarch.rpm
[root@rabbitmq1 ~]# yum install -y rabbitmq-server-3.7.13-1.el7.noarch.rpm

2.4、修改配置文件

[root@rabbitmq1 ~]# cp /usr/share/doc/rabbitmq-server-3.7.13/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
[root@rabbitmq1 ~]# vim /etc/rabbitmq/rabbitmq.config#    去掉注释%%和逗号,如下图所示,此处为开启默认账号

2.5、安装插件并启动服务

 安装启动rabbitmq管理插件,启动rabbitmq,查看节点状态

[root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@rabbitmq1:
rabbitmq_management
The following plugins have been configured:rabbitmq_managementrabbitmq_management_agentrabbitmq_web_dispatch
Applying plugin configuration to rabbit@rabbitmq1...
The following plugins have been enabled:rabbitmq_managementrabbitmq_management_agentrabbitmq_web_dispatchset 3 plugins.
Offline change; changes will take effect at broker restart.
[root@rabbitmq1 ~]# systemctl restart rabbitmq-server
[root@rabbitmq1 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rabbitmq1 ...
[{nodes,[{disc,[rabbit@rabbitmq1]}]},{running_nodes,[rabbit@rabbitmq1]},{cluster_name,<<"rabbit@rabbitmq1">>},{partitions,[]},{alarms,[{rabbit@rabbitmq1,[]}]}]

2.6、使用浏览器访问

访问rabbitmq服务器的15672端口,默认用户名/密码:guest/guest

2.7、web页面管理

这里为开发人员使用,运维尽量不要去这个页面,这里需要开发人员根据项目需要配置,运维只负责搭建环境!!!

添加用户

2.8、 使用命令行进行管理

执行以下操作

[root@rabbitmq1 ~]# rabbitmqctl list_queues#    查看所有的队列Timeout: 60.0 seconds ...
Listing queues for vhost / ...
[root@rabbitmq1 ~]# rabbitmqctl reset#    清除所有的队列Error: this command requires the 'rabbit' app to be stopped on the target node. Stop it with 'rabbitmqctl stop_app'.
Arguments given:resetUsage:
rabbitmqctl [-n <node>] [-l] [-q] reset
[root@rabbitmq1 ~]# rabbitmqctl add_user yonghuming zheshimima#    添加用户,语法看命令拼音部分Adding user "yonghuming" ...
[root@rabbitmq1 ~]# rabbitmqctl set_user_tags yonghuming administrator#    分配角色为管理员Setting tags for user "yonghuming" to [administrator] ...

返回web页面查看新增账号

授权用户操作步骤如下

[root@rabbitmq1 ~]# rabbitmqctl add_vhost xunizhujimingzi#    新增虚拟主机Adding vhost "xunizhujimingzi" ...[root@rabbitmq1 ~]# rabbitmqctl set_permissions -p xunizhujimingzi yonghuming ".*" ".*" ".*"
Setting permissions for user "yonghuming" in vhost "xunizhujimingzi" ...#    将新虚拟主机授权给新用户,后面三个”*”代表用户拥有配置、写、读全部权限

返回web页面查看 

查看虚拟主机选项卡

3、RabbitMQ集群部署

3.1、简介

消息中间件RabbitMQ,一般以集群方式部署, 主要提供消息的接受和发送,实现各微服务之间的消息异步。可以与负载均衡结合形成RabbitMQ+HA架构

3.2、原理介绍

1)cookie

RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证来实现分布式,所以部署Rabbitmq分布式集群时要先安装Erlang,并把其中一个服务的cookie复制到另外的节点

2)内存节点和磁盘节点

RabbitMQ集群中,各个RabbitMQ为对等节点,即每个节点均提供给客户端连接,进行消息的接收和发送。节点分为内存节点和磁盘节点,区别是消息存储的位置不同,一般都建立为磁盘节点,为了防止机器重启后的消息消失

3)普通模式和镜像模式

RabbitMQ的传统集群模式一般分为两种,普通模式和镜像模式。消息队列通过RabbitMQ HA镜像队列进行消息队列实体复制。

普通模式

普通模式下,以两个节点(rabbit01、rabbit02)为例来进行说明。对于消息队列来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈

镜像模式

镜像模式下,将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。

3.3、环境要求

1、所有节点需要再同一个局域网内;

2、所有节点需要有相同的 erlang cookie,否则不能正常通信,为了实现cookie内容一致,采用scp的方式进行。

3、准备三台虚拟机,配置相同

4、集群中所有节点都需要hosts文件解析

使用上一个示例的环境继续操作

三台全部安装安装Erlang,安装rabbitmq,修改配置文件同上一示例

3.4、安装插件并启动服务

[root@rabbitmq2 ~]# rabbitmq-plugins enable rabbitmq_management
[root@rabbitmq3 ~]# rabbitmq-plugins enable rabbitmq_management

 此处不要启动!!!一定要保证三台机器的cookie内容一致

找到erlang cookie文件的位置,源码包部署一般会存在.erlang.cookie文件;rpm包部署一般是在/var/lib/rabbitmq/.erlang.cookie。将 node1 的该文件使用rsync或者是scp复制到 node2、node3,文件权限需要是400。

[root@rabbitmq1 ~]# scp -r /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq/.erlang.cookie
[root@rabbitmq1 ~]# scp -r /var/lib/rabbitmq/.erlang.cookie rabbitmq3:/var/lib/rabbitmq/.erlang.cookie
[root@rabbitmq1 ~]# cat /var/lib/rabbitmq/.erlang.cookie#    此处必须检查是否完全一致[root@rabbitmq2 ~]# cat /var/lib/rabbitmq/.erlang.cookie[root@rabbitmq3 ~]# cat /var/lib/rabbitmq/.erlang.cookie

启动rabbitmq

[root@rabbitmq1 ~]# systemctl restart rabbitmq-server
[root@rabbitmq2 ~]# systemctl restart rabbitmq-server
[root@rabbitmq3 ~]# systemctl restart rabbitmq-server

仅关闭2,3节点的对外连接(集群功能),不关闭rabbitmq服务

[root@rabbitmq2 ~]# rabbitmqctl stop[root@rabbitmq3 ~]# rabbitmqctl stop

将2,3暂时设置为独立节点运行

[root@rabbitmq2 ~]# rabbitmq-server -detached[root@rabbitmq3 ~]# rabbitmq-server -detached#    忽略此处pid文件不可写的警告,这个警告不影响后续操作

查看各个节点状态

[root@rabbitmq1 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rabbitmq1 ...
[{nodes,[{disc,[rabbit@rabbitmq1]}]},{running_nodes,[rabbit@rabbitmq1]},{cluster_name,<<"rabbit@rabbitmq1">>},{partitions,[]},{alarms,[{rabbit@rabbitmq1,[]}]}]#    2,3此处省略。每台主机看到的只有一个的server信息。目前尚未组合成集群

添加用户并为其授权,此时每个节点是单独的一台RabbitMQ,下面来将他们组成集群。

1
[root@rabbitmq1 ~]# rabbitmqctl add_user  admin admin
Adding user "admin" ...
User "admin" already exists
[root@rabbitmq1 ~]# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
[root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
[root@rabbitmq1 ~]# 2
[root@rabbitmq2 ~]# rabbitmqctl add_user  admin admin
Adding user "admin" ...
[root@rabbitmq2 ~]# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
[root@rabbitmq2 ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
[root@rabbitmq2 ~]# 3
[root@rabbitmq3 ~]# rabbitmqctl add_user  admin admin
Adding user "admin" ...
[root@rabbitmq3 ~]# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
[root@rabbitmq3 ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
[root@rabbitmq3 ~]# 

最后一步组成集群

rabbitmq-server 启动时,会一起启动:节点和应用,它预先设置RabbitMQ应用为standalone(脱机)模式。要将一个节点加入到现有的集群中,你需要停止这个应用,并将节点设置为原始状态。如果使用rabbitmqctl stop,应用和节点都将被关闭。所以使用rabbitmqctl stop_app仅仅关闭应用。(停应用,不停止节点)

此处以磁盘节点示例

2
[root@rabbitmq2 ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rabbitmq2 ...
[root@rabbitmq2 ~]# rabbitmqctl join_cluster rabbit@rabbitmq1
Clustering node rabbit@rabbitmq2 with rabbit@rabbitmq1
[root@rabbitmq2 ~]# rabbitmqctl start_app
Starting node rabbit@rabbitmq2 ...completed with 3 plugins.3
[root@rabbitmq3 ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rabbitmq3 ...
[root@rabbitmq3 ~]# rabbitmqctl join_cluster rabbit@rabbitmq1
Clustering node rabbit@rabbitmq3 with rabbit@rabbitmq1
[root@rabbitmq3 ~]# rabbitmqctl start_app
Starting node rabbit@rabbitmq3 ...completed with 3 plugins.

此时集群状态

[root@rabbitmq1 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rabbitmq1 ...
[{nodes,[{disc,[rabbit@rabbitmq1,rabbit@rabbitmq2,rabbit@rabbitmq3]}]},{running_nodes,[rabbit@rabbitmq3,rabbit@rabbitmq2,rabbit@rabbitmq1]},{cluster_name,<<"rabbit@rabbitmq1">>},{partitions,[]},{alarms,[{rabbit@rabbitmq3,[]},{rabbit@rabbitmq2,[]},{rabbit@rabbitmq1,[]}]}]

正常工作中此时移交admin/admin账号给开发组即可

4、Rabbitmq+HAproxy

此处保留haproxy配置文件模板供复制粘贴使用

global log         127.0.0.1 local2chroot      /var/lib/haproxypidfile     /var/run/haproxy.pidmaxconn     4000user        haproxygroup       haproxydaemonstats socket /var/lib/haproxy/statsdefaults log        global mode       tcp option     tcplog option     dontlognull retries    3 option redispatch maxconn 2000 contimeout      5s clitimeout      120s srvtimeout      120s listen rabbitmq_cluster 0.0.0.0:80#作为代理的服务器的IP和端口mode      tcp balance roundrobin server rabbit1  192.168.188.128:15672 check inter 5000 rise 2 fall 2 server rabbit2  192.168.188.129:15672 check inter 5000 rise 2 fall 2        server rabbit3  192.168.188.130:15672 check inter 2000 rise 2 fall 3listen monitorbind 0.0.0.0:8100#监控页面的访问端口mode httpoption httplogstats enablestats uri /rabbitmqstatsstats refresh 30sstats auth admin:admin       

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com