欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > Spring Boot整合RocketMQ实现延迟消息消费

Spring Boot整合RocketMQ实现延迟消息消费

2024/10/27 14:06:04 来源:https://blog.csdn.net/2301_80488214/article/details/139986638  浏览:    关键词:Spring Boot整合RocketMQ实现延迟消息消费
导包
     <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.3</version></dependency>
添加配置信息
application配置文件
# rocketMq地址
rocketmq.name-server=106.52.60.215:9876
# 生产者分组
rocketmq.producer.group=myGroup
rocketmq.producer.topics=topic1
# 消费者分组
rocketmq.consumer.group=myGroup
# topic
rocketmq.consumer.topics=topic1
# 表示顺序消费模式
rocketmq.consumer.consume-mode=ORDERLY
# 消费者的最大线程数,即消费消息的线程池大小。默认值为20,如果不需要处理大量的消息,可以将其调小。
rocketmq.consumer.consume-thread-max=1
# 表示每次消费消息的最大数量,即一次性消费的最大消息数。默认值为1,即每次只消费一条消息。如果需要批量消费消息,可以将其调大。但是需要注意的是,批量消费消息可能会影响消费的效率和消息的顺序性。
rocketmq.consumer.consume-message-batch-max-size=1
yml配置文件
rocketmq:consumer:consume-message-batch-max-size: 1consume-mode: ORDERLYconsume-thread-max: 1group: myGrouptopics: topic1name-server: 106.52.60.215:9876producer:group: myGrouptopics: topic1
生产者发送消息
同步发现消息

在Spring Boot中,可以使用RocketMQTemplate来发送消息。设置消息的延迟级别,可以使用RocketMQTemplatesend(Message message, long timeout, int delayLevel)方法,其中delayLevel为延迟级别,单位为秒

RocketMQ支持18个级别的延迟时间,分别为1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h

import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** <p>* Description: 消息生产者* </p>** @author * @version * @create * @see */
@Slf4j
@RestController
public class MyProducer1 {@Value("${rocketmq.producer.topics}")private String topic;@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** description 同步发送延迟消息** @param:  []* @return* @Date   2023/3/11*/@GetMapping("syncSendTest")public void sendDelayMsg() {Blog blog = new Blog();blog.setBlogName("余十步");blog.setUrl("yushibu");// delayTimeLevel代表延迟级别  messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hint delayTimeLevel = 3;Message<Blog> message = MessageBuilder.withPayload(blog).setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel).build();SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 3000, delayTimeLevel);log.info("消息发送成功,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());log.info("发送结果:{}", sendResult);}
}
异步发送消息(推荐)
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** <p>* Description: 消息生产者* </p>** @author * @version * @create * @see */
@Slf4j
@RestController
public class DemoProducers {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** description 延迟消息发送** @param:  [user]* @return* @Date   2023/3/11*/@RequestMapping("/asyncSendTest")public  String asyncSendTest(){Blog blog = new Blog();blog.setBlogName("余十步");blog.setUrl("yushibu");// 构建消息体Message<Blog> msg = MessageBuilder.withPayload(blog).build();rocketMQTemplate.asyncSend("topic1", msg, new SendCallback() {// 发送成功@Overridepublic void onSuccess(SendResult sendResult) {log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// 发送失败@Overridepublic void onException(Throwable throwable) {log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// ps:3 代表第三个延迟10s   延迟级别:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"},3000,3);return "发送成功";}
}
消息消费者
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** <p>* Description: 消息生产者* </p>** @author * @version * @create * @see 
@Slf4j
@RestController
public class MyProducer2 {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.topics}")private String topic;/*** description 延迟消息发送* 在上面的代码中,我们使用了RocketMQTemplate的syncSend方法来发送消息。* 其中,第一个参数是消息的主题,第二个参数是消息内容,第三个参数是延迟时间(单位为毫秒)* ,第四个参数是发送消息的重试次数。* @param:  [user]* @return* @Date   2023/3/11*/@RequestMapping("/asyncSendTest")public  String asyncSendTest(){Blog blog = new Blog();blog.setBlogName("余十步");blog.setUrl("yushibu");// 构建消息体Message<Blog> msg = MessageBuilder.withPayload(blog).build();rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {// 发送成功@Overridepublic void onSuccess(SendResult sendResult) {log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// 发送失败@Overridepublic void onException(Throwable throwable) {log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// ps:3 代表第三个延迟10s   延迟级别:"messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"},3000,4);return "发送成功";}
}
启动测试

启动请求:http://localhost:8081/asyncSendTest

控制台打印

可以看到,消息生产者设置的延迟级别是3,对应延迟了10秒钟

延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

需要注意的是,顺序消费模式下,同一个消费者组内的消费者只会有一个线程消费同一个队列中的消息,这样才能保证消息的顺序性。

通过以上步骤,就可以使用RocketMQ实现消息延迟功能了。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com