前言
RabbitMQ介绍
⽤⼾注册完成之后,系统会给⽤⼾发送⼀封邮件通知
但是邮件发送成功失败并不影响我们⽤⼾注册成功.我们可以采⽤RabbitMQ来进⾏服务的解耦
RabbitMQ是⼀个开源的消息代理和队列服务器,⼴泛⽤于实现消息队列和事件通知.它⽀持多种消息
协议,并且具有⾼可⽤性、灵活的路由、可靠的消息传递和易于使⽤的管理界⾯
MQ( Message queue ),从字⾯意思上看,本质是个队列,FIFO先⼊先出,只不过队列中存放的内容
是消息(message)⽽已.消息可以⾮常简单,⽐如只包含⽂本字符串,JSON等,也可以很复杂,⽐如内嵌对
象.
MQ多⽤于分布式系统之间进⾏通信.
系统之间的调⽤通常有两种⽅式
- 同步通信
直接调⽤对⽅的服务,数据从⼀端发出后⽴即就可以达到另⼀ - 异步通信
数据从⼀端发出后,先进⼊⼀个容器进⾏临时存储,当达到某种条件后,再由这个容器发送给另⼀端.容器的⼀个具体实现就是MQ( message queue )
RabbitMQ 就是MQ的⼀种实现
MQ作用
- 异步解耦:在业务流程中,⼀些操作可能⾮常耗时,但并不需要即时返回结果.可以借助MQ把这些操
作异步化,⽐如⽤⼾注册后发送注册短信或邮件通知,可以作为异步任务处理,⽽不必等待这些操作
完成后才告知⽤⼾注册成功. - 流量削峰:在访问量剧增的情况下,应⽤仍然需要继续发挥作⽤,但是是这样的突发流量并不常⻅.如
果以能处理这类峰值为标准⽽投⼊资源,⽆疑是巨⼤的浪费.使⽤MQ能够使关键组件⽀撑突发访问压
⼒,不会因为突发流量⽽崩溃.⽐如秒杀或者促销活动,可以使⽤MQ来控制流量,将请求排队,然后系
统根据⾃⼰的处理能⼒逐步处理这些请求. - 异步通信:在很多时候应⽤不需要⽴即处理消息,MQ提供了异步处理机制,允许应⽤把⼀些消息放⼊MQ中,但并不⽴即处理它,在需要的时候再慢慢处理.
- 消息分发:当多个系统需要对同⼀数据做出响应时,可以使⽤MQ进⾏消息分发.⽐如⽀付成功后,⽀
付系统可以向MQ发送消息,其他系统订阅该消息,⽽⽆需轮询数据库. - 延迟通知:在需要在特定时间后发送通知的场景中,可以使⽤MQ的延迟消息功能,⽐如在电⼦商务平台中,如果⽤⼾下单后⼀定时间内未⽀付,可以使⽤延迟队列在超时后⾃动取消订单
- …
为什么选择RabbitMQ
Kafka
Kafka⼀开始的⽬的就是⽤于⽇志收集和传输,追求⾼吞吐量,性能卓越,单机吞吐达到⼗万级,在⽇
志领域⽐较成熟,功能较为简单,主要⽀持简单的MQ功能,如果有⽇志采集需求,肯定是⾸选kafka
了。
RocketMQ
RocketMQ采⽤Java语⾔开发,由阿⾥巴巴开源,后捐赠给了Apache.
它在设计时借鉴了Kafka,并做出了⼀些⾃⼰的改进,⻘出于蓝⽽胜于蓝,经过多年双⼗⼀的洗礼,在
可⽤性、可靠性以及稳定性等⽅⾯都有出⾊的表现.适合对于可靠性⽐较⾼,且并发⽐较⼤的场景,⽐
如互联⽹⾦融.但⽀持的客⼾端语⾔不多,且社区活跃度⼀般
RabbitMQ
采⽤Erlang语⾔开发,MQ功能⽐较完备,且⼏乎⽀持所有主流语⾔,开源提供的界⾯也⾮常友好,性
能较好,吞吐量能达到万级,社区活跃度也⽐较⾼,⽐较适合中⼩型公司,数据量没那么⼤,且并发没
那么⾼的场景
RabbitMQ安装
ubuntu
安装Erlang
RabbitMQ安装可以用ubuntu自带的安装,但是这个版本老,我们也可以在官网安装
官网安装
我们现在用Ubuntu仓库中的版本来安装
先安装Erlang
#更新软件包
sudo apt-get update
#安装erlang
sudo apt-get install erlang
erl
输入erl就可以检查是否安装好了
输入halt().就退出了
安装RabbitMQ
#更新软件包
sudo apt-get update
#安装rabbitmq
sudo apt-get install rabbitmq-server
#确认安装结果
systemctl status rabbitmq-server
这样就成功了
安装RabittMQ管理界面
rabbitmq-plugins enable rabbitmq_management
启动服务
sudo service rabbitmq-server start
默认是启动的
访问
通过 IP:port 访问界⾯
15672为默认端口号
界面端口号是15672
服务端通信的端口号是5672
所以这两个都要开放
默认的账号名,密码
是guest,guest
这个的意思是,这个用户名只能在本地登录,我们这个不是本地登录
所以要创建新的用户
添加用户
rabbitmqctl add_user ${账号} ${密码}
rabbitmqctl add_user admin admin
它告诉我们不要忘记设置权限
添加权限
#rabbitmqctl set_user_tags ${账号} ${⻆⾊名称}
rabbitmqctl set_user_tags admin administrator
这个的意思就是给admin设置为管理员用户
以下⻆⾊可选
RabbitMQ⽤⼾⻆⾊分为Administrator、Monitoring、Policymaker、Management、
Impersonator、None共六种⻆⾊
- Administrator 超级管理员,可登陆管理控制台(启⽤management plugin的情况下),可查看所
有的信息,并且可以对⽤⼾,策略(policy)进⾏操作 - Monitoring 监控者,可登陆管理控制台(启⽤management plugin的情况下),同时可以查看
rabbitmq节点的相关信息(进程数,内存使⽤情况,磁盘使⽤情况等)。 - Policymaker 策略制定者,可登陆管理控制台(启⽤management plugin的情况下),同时可以对
policy进⾏管理。但⽆法查看节点的相关信息. - Management 普通管理者,仅可登陆管理控制台(启⽤management plugin的情况下),⽆法看到
节点信息,也⽆法对策略进⾏管理. - Impersonator 模拟者,⽆法登录管理控制台。
- None 其他⽤⼾,⽆法登陆管理控制台,通常就是普通的⽣产者和消费者
这样就可以登录进来了
docker安装
#查询镜像
docker search rabbitmq:management
#获取镜像
docker pull rabbitmq:management
#运⾏镜像
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
#查看正在运⾏的容器
docker ps
#进⼊容器内部
docker exec -it 容器ID /bin/bash
、#查看正在运⾏的容器
docker ps
#进⼊容器内部
docker exec -it 容器ID /bin/bash
#添加⽤⼾admin
rabbitmqctl add_user admin admin
#给⽤⼾授权
rabbitmqctl set_user_tags admin administrator
其他操作
1. 查找rabbitmq位置
[lucf@VM-8-12-centos ~]$ whereis rabbitmq
rabbitmq: /usr/lib/rabbitmq /etc/rabbitmq2. 新增配置⽂件rabbitmq.conf
(在/etc/rabbitmq路径下创建rabbitmq.conf⽂件, 并添加以下内容)
#修改client端⼝为8942(默认为5672)
listeners.tcp.default=8942
#修改管理界⾯端⼝为8943(默认为15672)
management.tcp.port=89433.修改rabbitmq-defaults⽂件,添加配置⽂件路径
⽂件路径: /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.30/sbin/rabbitmq-defaults
编辑rabbitmq-defaults, 在⽂本最后添加如下代码
#添加配置路径到⽂件中,保存退出
CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf4.重启RabbitMQ
sudo systemctl restart rabbitmq-server#启动服务
sudo systemctl start rabbitmq-server
#停⽌服务
sudo systemctl stop rabbitmq-server
#重启服务
sudo systemctl restart rabbitmq-server
#添加开机启动服务
sudo systemctl enable rabbitmq-server
#检查服务状态
sudo systemctl status rabbitmq-server三. 卸载RabbitMQ
1. Ubuntu
1.1 停⽌RabbitMQ服务
1 sudo systemctl stop rabbitmq-server
1.2 查找RabbitMQ安装情况
1 dpkg -l | grep rabbitmq
1.3 卸载rabbitmq已安装的相关内容
1 sudo apt-get purge --auto-remove rabbitmq-server
1.4 卸载Erlang
1. 查看erlang安装的相关列表
1 dpkg -l | grep erlang
2. 卸载erlang已安装的相关内容
1 sudo apt-get purge --auto-remove erlang
RabbitMQ核心概念
Producer和Consumer
Producer: ⽣产者,是RabbitMQServer的客⼾端,向RabbitMQ发送消息
Consumer:消费者,也是RabbitMQServer的客⼾端,从RabbitMQ接收消息
Broker:其实就是RabbitMQServer,主要是接收和收发消息
⽣产者(Producer)创建消息,然后发布到RabbitMQ中.在实际应⽤中,消息通常是⼀个带有⼀定业务
逻辑结构的数据,⽐如JSON字符串.消息可以带有⼀定的标签,RabbitMQ会根据标签进⾏路由,把消
息发送给感兴趣的消费者(Consumer).
消费者连接到RabbitMQ服务器,就可以消费消息了,消费的过程中,标签会被丢掉.消费者只会收到
消息,并不知道消息的⽣产者是谁,当然消费者也不需要知道.
对于RabbitMQ来说,⼀个RabbitMQBroker可以简单地看作⼀个RabbitMQ服务节点,或者
RabbitMQ服务实例.⼤多数情况下也可以将⼀个RabbitMQBroker看作⼀台RabbitMQ服务器
Connection和Channel
Connection: 连接.是客⼾端和RabbitMQ服务器之间的⼀个TCP连接.端口号是5672,这个连接是建⽴消息传递的基
础,它负责传输客⼾端和服务器之间的所有数据和控制信息.
Channel: 通道,信道.Channel是在Connection之上的⼀个抽象层.在RabbitMQ中,⼀个TCP连接可以
有多个Channel,每个Channel都是独⽴的虚拟连接.消息的发送和接收都是基于Channel的.
通道的主要作⽤是将消息的读写操作复⽤到同⼀个TCP连接上,这样可以减少建⽴和关闭连接的开销,提⾼性能.
生产者和消费者都是客户端
Virtual host
Virtual host: 虚拟主机.这是⼀个虚拟概念.它为消息队列提供了⼀种逻辑上的隔离机制.对于
RabbitMQ⽽⾔,⼀个BrokerServer上可以存在多个VirtualHost.当多个不同的⽤⼾使⽤同⼀个
RabbitMQ Server提供的服务时,可以虚拟划分出多个vhost,每个⽤⼾在⾃⼰的vhost创建
exchange/queue 等类似MySQL的"database",是⼀个逻辑上的集合.⼀个MySQL服务器可以有多个database
Queue
Queue: 队列,是RabbitMQ的内部对象,⽤于存储消息
多个消费者,可以订阅同⼀个队列
Exchange
Exchange: 交换机.message到达broker的第⼀站,它负责接收⽣产者发送的消息,并根据特定的规则
把这些消息路由到⼀个或多个Queue列中.
Exchange起到了消息路由的作⽤,它根据类型和规则来确定如何转发接收到的消息.
类似于发快递之后,物流公司怎么处理呢,根据咱们的地址来分派这个快递到不同的站点,然后再送到
收件⼈⼿⾥.这个分配的⼯作,就是交换机来做的
RabbitMQ⼯作流程
- Producer⽣产了⼀条消息
- Producer连接到RabbitMQBroker,建⽴⼀个连接(Connection),开启⼀个信道(Channel)
- Producer声明⼀个交换机(Exchange),路由消息
- Producer声明⼀个队列(Queue),存放信息
- Producer发送消息⾄RabbitMQBroker
- RabbitMQBroker接收消息,并存⼊相应的队列(Queue)中,如果未找到相应的队列,则根据⽣产者
的配置,选择丢弃或者退回给⽣产者.
AMQP
AMQP(AdvancedMessageQueuingProtocol)是⼀种⾼级消息队列协议,AMQP定义了⼀套确定的
消息交换功能,包括交换器(Exchange),队列(Queue)等.这些组件共同⼯作,使得⽣产者能够将消息发
送到交换器.然后由队列接收并等待消费者接收.AMQP还定义了⼀个⽹络协议,允许客⼾端应⽤通过该
协议与消息代理和AMQP模型进⾏交互通信
RabbitMQ是遵从AMQP协议的,换句话说,RabbitMQ就是AMQP协议的Erlang的实现(当然RabbitMQ还⽀持STOMP2,MQTT2等协议).AMQP的模型结构和RabbitMQ的模型结构是⼀样的
RabbiMQ简单使用
界面操作
这里可以创建用户,然后设置用户类型,这样点右上角退出,就可以重新登录了
点这个,然后可以创建虚拟机了
这样虚拟机就出来了
点进去这个虚拟机
使用哪个账号添加的这个虚拟机,就这个用户有对应的权限
这里面点击set,就可以对对应的设置权限了
这里可以查看用户的权限,也可以在虚拟机里面看有哪些用户的权限
都是可以set的
默认每个用户都有一个杠的虚拟机,但是不一定有权限
还可以清理权限
springboot集成RabbitMQ
RabbitMQ 共提供了7种⼯作模式
官网
这就是模式
第一第二用的是默认的交换机
X是交换机,起了作用的
第三个是广播的模式
第四个是有条件的模式
第五个是通配符的条件
我们用第一个
先引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
在user-service里面引入
然后是添加配置
#配置RabbitMQ的基本信息
spring:rabbitmq:host: 110.41.51.65port: 15673 #默认为5672 username: studypassword: studyvirtual-host: bite #默认值为 /
或者下面这个方式
#amqp://username:password@Ip:port/virtual-hostspring:rabbitmq:addresses: amqp://bite:1234@139.159.230.105:5672/blog
我们先声明一个队列
用这个
这个就是队列
然后就是写生产者了
就写一个测试方法就可以了
RabbitMQ和redis一样,提供了一个template,自动注入bean的,很好用
第一个参数是交换机的名称,我们可以写一个空的交换机,这个是默认的交换机
就是第一个交换机
他是direct类型的,要有绑定规则,第二个参数就是绑定规则
这个交换机有一个绑定规则叫做routingKey,就是在发送消息的时候,要携带一个标签
第三个参数就是消息内容了
routingKey就是决定这个交换机路由到哪个队列
这个就是默认的交换机的默认的绑定规则
就是交换机“”把消息,给队列hello
运行一下
看到这里有一个hello的队列
点进去就可以看到我们的消息内容了
然后开始看消费者代码
要用RabbitListener这个注解了
我们生产者多生产几个消息,注意Message的包不用弄错了,有很多个
然后就是重启服务
这样就收到消息了,这个是服务启动就一直监听的
所以会打印完
用户服务引入RabbitMQ
我们增加一个配置,就是手动确认方式
默认情况下:消费者取消息的时候,消息会自动的删除,自动确认
还有一种方式是手动确认
spring:rabbitmq:addresses: amqp://bite:1234@139.159.230.105:5672/bloglistener:direct:acknowledge-mode: manual
这样就是手动确认的了
然后是声明一个队列
这次我们用这个发布订阅模式,就是交换机根据一定的规则,把消息路由到一个或者多个队列中
交换机有多个类型,一个Fanout,direct,topic,headers(用的少)
原来我们使用的就是direct
Fanout是广播的模式:把消息交给所有绑定到这个交换机的队列
direct是把消息交给符合指定routing key的队列
topic是把消息交给符合指定routing pattern(路由模式)的队列
Fanout就是binding key为空
topic就是可以用通配符的,可以有*
我们使用广播模式,那么binding key和routing key都为空
我们给队列取名,名字弄成一个常量类
然后写队列
我们刚刚用的交换机是默认的交换机
现在我们自己创建一个我们自己的交换机
然后是交换机和队列进行绑定
注意引入的包都是amqp的
简单模式我们没有像这样一样进行绑定,因为使用的是默认的交换机,就不用声明交换机了
routing key就是去绑定了
默认交换机就是这样的,要用routing key,指定队列,默认交换机中routing key就等于队列名称
第一个参数是交换机名称
不用默认交换机的话,就要自己定义交换机,和绑定关系
然后开始写生产者
生产者
@Autowiredprivate RabbitTemplate rabbitTemplate;
先注入进来
这样就发送完成了
现在开始写消费者
这样就写好了,但是我们要手动确认,怎么确认呢
确认的时候,先确认这个消息的id
先拿到id
获取消息的id,在信道上唯一,确认的时候也是在信道上确认
所以参数还要拿到channel这个信道
选这个
这样就写好了
但是我们这样写还是会报错的
为什么呢
默认情况下 RabbitMQ 是自动ACK(确认签收)机制,就意味着 MQ 会在消息发送完毕后,自动帮我们去ACK(确认),若是在代码中再手动确认签收,就会造成确认错误。
所以改一下
这样就不会出现报错信息了
我们还可以单独用一个类,来作为消息的发送,也是可以的
看到队列和交换机都对应起来了,绑定起来了
第二种方式写法
直接注释掉
我们可以直接用注解RabbitListener来声明绑定关系
@RabbitListener(bindings = @QueueBinding())
这个就是声明了一个绑定关系了
这样写就可以了,和刚刚写的作用是一模一样的