欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > RocketMQ

RocketMQ

2024/11/30 20:30:02 来源:https://blog.csdn.net/2301_79694645/article/details/141899202  浏览:    关键词:RocketMQ

消息中间件的产生的背景

在网络通讯中,Http请求默认采用同步请求方式,基于请求与响应模式,在客户端与服务器进行通讯时,客户端调用服务端接口后,必须等待服务端完成处理后返回结果给客户端才能继续执行,这种情况属于同步调用方式。如果服务器端发生网络延迟、不可达的情况,可能客户端也会受到影响

1.什么是消息中间件

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)例如:寄快递

2.消息中间件使用场景

2.1 异步处理

场景说明:用户注册后,需要发注册邮件和注册短信

将注册信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务全部完成后,返回给客户端

引入消息队列,改造后的架构如下

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此架构改变后,系统的吞吐量比串行提高了3倍,比并行提高了2倍

2.2应用解耦

场景说明:用户下单后,订单系统需要通知库存系统,传统的做法是订单系统调用库存系统的接口

解耦合后:

订单系统:假如在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现订单系统与库存系统的应用解耦

3.常见消息中间件比较

特性MQ

ActiveMQ

RabbitMQ

RocketMQ

Kafka

生产者消费者模式

支持

支持

支持

支持

发布订阅模式

支持

支持

支持

支持

请求回应模式

支持

支持

不支持

不支持

Api完备性

多语言支持

支持

支持

java

支持

单机吞吐量

万级

万级

万级

十万级

消息延迟

微秒级

毫秒级

毫秒级

可用性

高(主从)

高(主从)

非常高(分布式)

非常高(分布式)

消息丢失

理论上不会丢失

理论上不会丢失

文档的完备性

较高

提供快速入门

社区活跃度

商业支持

商业云

商业云

4.RocketMQ

RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的应用场景考验。

4.1 环境准备

下载RocketMQ

http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

环境要求

  • 64位操作系统
  • JDK 1.8+
  • 安装Maven
4.2 安装RocketMQ
  • 解压缩安装包
  • 配置环境变量
    • 变量名:ROCKETMQ_HOME 变量值:MQ解压缩路径
    • 编辑: path %ROCKETMQ_HOME%\bin
4.3 启动RocketMQ
  • 切换到安装目录
    • rocketmq的bin目录下
  • 启动NameServer
    • start mqnamesrv.cmd
  • 启动Broker
    • start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
    • 如果弹出框提示‘错误: 找不到或无法加载主类 xxxxxx’。在bin下找到并打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号

4.4 安装可视化插件
  • github下载
  • rocketmq-externals-rocketmq-console-1.0.0.zip解压压缩包
  • 进入\rocketmq-console\src\main\resources文件加,用编辑器打开application.properties文件
  • 进入rocketmq-externals\rocketmq-console 文件夹,执行:mvn clean package -Dmaven.test.skip=true,编译生成jar包
  • 编译成功后,在rocketmq-externals\rocketmq-console下会生成target文件夹
  • 进入target后执行:java -jar rocketmq-console-ng-1.0.0.jar,这里是在启动jar工程。启动完毕后,在浏览器输入:http://localhost:8085/进入控制台

5.RocketMQ的架构及概念

如上图所示,整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer

  • Broker(邮局,邮递员)
    • Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能
  • NameServer(各个邮局的管理机构)
    • 消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息
  • Producer(寄件人)
    • 消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息
  • Consumer(收件人)
    • 消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息
  • Topic(地区)
    • 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息
  • Message Queue
    • 为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个 Message Queue读取消息
  • Message
    • Message 是消息的载体。

6. 消息发送接受

<dependency> <groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId> <version>4.4.0</version> 
</dependency>
6.1 发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。消息发送步骤:

1. 创建消息生产者, 指定生产者所属的组名

2. 指定Nameserver地址

3. 启动生产者

4. 创建消息对象,指定主题、标签和消息体

5. 发送消息

6. 关闭生产者

//发送消息 
public class RocketMQSendTest {public static void main(String[] args) throws Exception { //1. 创建消息生产者, 指定生产者所属的组名 DefaultMQProducer producer = new DefaultMQProducer("myproducer-group"); //2. 指定Nameserver地址 producer.setNamesrvAddr("192.168.109.131:9876"); //3. 启动生产者 producer.start();//4. 创建消息对象,指定主题、标签和消息体 Message msg = new Message("myTopic", "myTag", ("RocketMQ 
Message").getBytes());//5. 发送消息SendResult sendResult = producer.send(msg); System.out.println(sendResult); //6. 关闭生产者producer.shutdown(); }
}
6.2 接收消息

消息接收步骤:

1. 创建消息消费者, 指定消费者所属的组名

2. 指定Nameserver地址

3. 指定消费者订阅的主题和标签

4. 设置回调函数,编写处理消息的方法

5. 启动消息消费者

//接收消息 
public class RocketMQReceiveTest { public static void main(String[] args) throws MQClientException { //1. 创建消息消费者, 指定消费者所属的组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");//2. 指定Nameserver地址 consumer.setNamesrvAddr("192.168.109.131:9876"); //3. 指定消费者订阅的主题和标签 consumer.subscribe("myTopic", "*"); //4. 设置回调函数,编写处理消息的方法 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeConcurrentlyContext context) {System.out.println("Receive New Messages: " + msgs); //返回消费状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5. 启动消息消费者 consumer.start();System.out.println("Consumer Started."); } 
}
6.3 发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。只会等待MQ发送状态

//1. 创建消息生产者, 指定生产者所属的组名DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");//2. 指定Nameserver地址producer.setNamesrvAddr("127.0.0.1:9876");//3. 启动生产者producer.start();for (int i = 0;i<10;i++){//4. 创建消息对象,指定主题、标签和消息体Message msg = new Message("myTopic", "myTag2", ("防疫政策修改
~~~").getBytes());//5. 发送消息producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功:"+sendResult);}@Overridepublic void onException(Throwable e) {System.out.println("发送异常:"+e);}});TimeUnit.SECONDS.sleep(3);}//6. 关闭生产者producer.shutdown();
6.4 单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

//1. 创建消息生产者, 指定生产者所属的组名DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");//2. 指定Nameserver地址producer.setNamesrvAddr("127.0.0.1:9876");//3. 启动生产者producer.start();for (int i = 0;i<10;i++){//4. 创建消息对象,指定主题、标签和消息体Message msg = new Message("myTopic", "myTag3", ("防疫政策修改
~~~").getBytes());//5. 发送消息// 发送单向消息,没有任何返回结果producer.sendOneway(msg);TimeUnit.SECONDS.sleep(3);}//6. 关闭生产者producer.shutdown();
6.5消费消息

1. 负载均衡模式(默认方式)

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

2. 广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);

7 使用场景

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信

7.1 订单微服务发送消息
1 在 shop-order服务中添加rocketmq的依赖
<!--rocketmq--> 
<dependency><groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> 
</dependency>
2 添加配置
rocketmq:name-server: 127.0.0.1:9876 #rocketMQ服务的地址producer: group: shop-order #生产者组
3 编写测试代码
7.2 用户微服务订阅消息
1 修改 shop-user 模块配置
 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version></dependency>
2 修改置文件
rocketmq: name-server: 127.0.0.1:9876
3 编写消息接收服务
//发送短信的服务 
@Slf4j
@Service 
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener<Order> { @Override public void onMessage(Order order) {log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));} 
}
4 启动服务,执行下单操作,观看后台输出

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com