学习贴:参考https://blog.csdn.net/zhiyikeji/article/details/138286088
文章目录
- 普通消息
- 顺序消息
- 延迟消息
- 批量消息
- 事务消息
- SpringBoot整合RocketMQ
3.1 NameServer
NameServer是一个简单的路由注册中心,支持Topic和Broker的动态注册和发现。作用主要包括两点:
Broker管理:Broker会把集群信息注册到NameServer上,NameServer会把这些信息记录下来,作为路由信息的基本数据。然后还会提供心态检测机制,检查Broker是否还存活
路由信息:因为NameServer存放的有Broker集群的基本信息(例如有哪些Borker可用,以及Broker下的队列信息),所以Producer和Consumer就可以通过NameServer知道整个Broker集群信息,生产者生产的信息就可以知道往哪个Broker下的哪个Message queue队列中投递消息,消费者也可以通过自己的配置信息去哪个broker下哪个队列中去拉取消息进行消费。
NameServer是无状态的,且NameServer集群下各个NameServer是互相不通信的,没有任何信息同步操作。每个Broker都会与NameServer集群下的每一个NameServer节点建立长链接,然后会向每一个NameServer注册自己的路由信息,所以每个NamerServer下都保存了Broker集群的完整路由信息。当某个NameServer节点挂掉后,消费者和生产者也可以通过其他NameServer获取到Broker的完整信息,所以大部分情况下NameServer通常会部署多个实例
Broker代理服务器
主要负责生产者生产消息的存储,为消费者拉取信息提供查询,也会存储消息相关的一些其他数据,例如Topic信息、队列信息、消费进度偏移量等。
而且Broker是服务高可用的保证:相对于NameServer来说,Broker的部署会相对于复杂一些。
普通主从集群模式:
这种集群下会给每个节点分配特定的角色,分为Master和Slave,一个Master可以对应多个Slave但是一个Slave只能对应一个Master,master负责响应生成存储消息的请求,并存储消息。slave负责储存从主节点同步过来的数据(可以选择同步或者异步),我们可以通过配置conf目录下的配置文件来选择如何配置。我们可以通过指定相同的BrokerName,不同的BrokerId来制定一个Broker集群。BrokerId中0代表master,非0代表slave。但是这种模式下的弊端就是各个节点的角色无法切换,如果一个master挂掉后,这一组的Broker就不可用了
Dledger高可用集群:
这个是RocketMQ4.5版本后提供的一种集群高可用模式,这个模式下会随机选举出一个节点作为master,当master节点挂了后,会通过Raft机制然后会从slave节点中选择一个节点升级为master。
普通消息
发送消息的方式有同步、异步、单向三种方式
同步方式:同步方式是最常用的方式,通常是发送一条消息后早收到服务端同步响应之后,然后再发送下一条消息,可以用于一些比较重要的场景,例如:短信发送
异步方式:发送一条消息到服务端后不需要等到服务端响应,就可以继续发送下一条消息,但是需要注册回调接口,然后通过回调接口获取服务端的响应,经常用于一些链路执行过长的场景
单向方式:不需要等待服务端响应也不需要注册回调接口,就可以继续发送下一条消息,耗时非常短,通常适用于耗时短但是可靠性要求没那么高的场景。
发送消息的步骤大概如下:
- 创建生产者:普通消息通常都是DefaultProducer,然后设置一个produceGroup的名字
- 注册NameServer地址:可以setNameServer(),如果是多个中间以;分开
- 构建消息体:Topic、tag、keys、消息体等信息
- 发送消息:通过send()方法发送消息
顺序消息
顺序消息是对生产者生产消息和消费者消费消息的顺序有严格要求的。
但是RocketMQ并不能保证所有消息的有序性,因为默认情况下一个Topic下的消息会发送到不同的Message queue上,消费者也会从不同的Message queue上拉取消息,这种情况下是不能保证有序的。
RocketMQ的有序性是要保证Producer、Broker、Consumer三者的有序性,严格按照FIFO方式来对消息进行处理,我们可以从生产者把消息发送到同一个Message queue上,所以只能有一个生产者,因为多个生产者的生产的消息是无法有序的,并且生成者发送消息不能选择多线程的方式。然后消费者可以注册一个MessageListenerOrderly(),在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。
int orderId = i & 10;try {Message message = new Message("TopicTest",tags[i % tags.length],"KEYS"+i,"BODY".getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置一个MessageQueueSelector队列选择器SendResult sendResult = orderMessageProducer.send(message, new MessageQueueSelector() {//list 消息队列列表,args 我们传入的orderId@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object args) {Integer id = (Integer)args;//这样可以确保相同orderId的消息总是发送到相同的队列,实现消息的顺序性return list.get( id%list.size() );}}, orderId);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}
延迟消息
延时消息是生产者生产的消息发送到服务端后,并不希望马上被消费,而是希望延迟一段时间后才被消费
try {Message message = new Message("TopicTest","tags","OrderId" + i,"test".getBytes(RemotingHelper.DEFAULT_CHARSET));//设置延迟级别message.setDelayTimeLevel(3);SendResult sendResult = scheduledMessageProducer.send(message);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}
批量消息
可以把消息整合到一批后在进行发送,可以增加吞吐率,并减少API和网络调用次数
ArrayList<Message> messages = new ArrayList<Message>();messages.add(new Message("TopicTest", "tag1", "1", "test1".getBytes()));messages.add(new Message("TopicTest", "tag2", "2", "test".getBytes()));messages.add(new Message("TopicTest", "tag3", "3", "test3".getBytes()));SendResult sendResult = batchProducer.send(messages);
事务消息
事务消息是在分布式系统中 保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两 个操作的原子性,也就是这两个操作一起成功或者一起失败。
对于一些数据具有强一致性场景的情况下,例如上游订单付款成功后,下游才可以进行积分变更、物流发货、购物车状态变更。这种类似场景下可以选择事务消息。
SpringBoot整合RocketMQ
创建一个MAVEN项目,引入依赖:
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.1</version><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.3.10.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.10.RELEASE</version></dependency>
</dependencies>
创建一个配置类:application.properties
#NameServer地址
rocketmq.name-server=ip地址:9876
#默认的消息生产者组
rocketmq.producer.group=springBootGroup
创建一个启动类:
@SpringBootApplication
public class RocketMqApplication {public static void main(String[] args) {SpringApplication.run(RocketMqApplication.class,args);}
}
创建一个生产者类:
@Component
public class SpringProducer {@ResourceRocketMQTemplate rocketMqTemplate;public void sendMessage(String topic, String message){rocketMqTemplate.convertAndSend(topic,message);}
}
创建一个消费者类:
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TopicTest")
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println(s);}
}
创建一个Controller类:
@RestController
public class MqSendController {@Resourceprivate SpringProducer springProducer;@Value("${producer.topic}")String topic;@GetMapping("/send")public void sendMessage(@RequestParam("message") String message){springProducer.sendMessage(topic,message);}
}
然后访问:http://localhost:8080/send?message=test