欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > 使用redis 的stream 做消息中间件使用线程池来消费消息

使用redis 的stream 做消息中间件使用线程池来消费消息

2025/4/19 14:37:26 来源:https://blog.csdn.net/weixin_45069056/article/details/144340538  浏览:    关键词:使用redis 的stream 做消息中间件使用线程池来消费消息

1.消息生产者服务

1.1 代码如下
@Service
@Slf4j
public class StreamMessageProducer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";/*** 发送消息*/public String sendMessage(String topic, Object message) {try {StringRecord record = StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(message))).withStreamKey(STREAM_KEY + ":" + topic);RecordId recordId = redisTemplate.opsForStream().add(record);log.info("消息发送成功: topic={}, messageId={}", topic, recordId);return recordId.getValue();} catch (Exception e) {log.error("消息发送失败: topic={}, message={}", topic, message, e);throw new RuntimeException("消息发送失败", e);}}/*** 批量发送消息*/public List<String> sendMessages(String topic, List<Object> messages) {try {List<MapRecord<String, String, String>> records = messages.stream().map(msg -> StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(msg))).withStreamKey(STREAM_KEY + ":" + topic)).collect(Collectors.toList());List<String> messageIds = new ArrayList<>();for (MapRecord<String, String, String> record : records) {RecordId recordId = redisTemplate.opsForStream().add(record);messageIds.add(recordId.getValue());}log.info("批量消息发送成功: topic={}, count={}", topic, messageIds.size());return messageIds;} catch (Exception e) {log.error("批量消息发送失败: topic={}", topic, e);throw new RuntimeException("批量消息发送失败", e);}}
}

2.消息消费者服务 使用线程池来消费

2.1线程池配置代码如下
@Configuration
@Slf4j
public class ThreadPoolConfig {@Bean("messageConsumerPool")public ThreadPoolExecutor messageConsumerPool() {return new ThreadPoolExecutor(10,                       // 核心线程数20,                     // 最大线程数60L,                     // 空闲线程存活时间TimeUnit.SECONDS,        // 时间单位new LinkedBlockingQueue<>(1000),  // 工作队列new ThreadFactoryBuilder().setNameFormat("msg-consumer-%d").setUncaughtExceptionHandler((t, e) -> log.error("消费者线程异常: {}", t.getName(), e)).build(),new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略);}@Bean("messageProcessPool")public ThreadPoolExecutor messageProcessPool() {return new ThreadPoolExecutor(50,                      // 核心线程数100,                    // 最大线程数60L,                     // 空闲线程存活时间TimeUnit.SECONDS,        // 时间单位new LinkedBlockingQueue<>(5000), // 工作队列new ThreadFactoryBuilder().setNameFormat("msg-process-%d").setUncaughtExceptionHandler((t, e) -> log.error("处理线程异常: {}", t.getName(), e)).build(),new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略);}
}
2.2 使用线程池消费代码如下
@Service
@Slf4j
public class StreamMessageConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowired@Qualifier("messageConsumerPool")private ThreadPoolExecutor consumerPool;@Autowired@Qualifier("messageProcessPool")private ThreadPoolExecutor processPool;private static final String STREAM_KEY = "message:stream";private final Map<String, StreamMessageHandler<?>> handlers = new ConcurrentHashMap<>();private final Map<String, AtomicBoolean> consumerStates = new ConcurrentHashMap<>();/*** 注册消息处理器*/public <T> void registerHandler(String topic, Class<T> messageType, Consumer<T> handler) {handlers.put(topic, new StreamMessageHandler<>(messageType, handler));consumerStates.put(topic, new AtomicBoolean(false));}/*** 启动消费*/@PostConstructpublic void startConsuming() {handlers.forEach((topic, handler) -> {AtomicBoolean state = consumerStates.get(topic);if (state.compareAndSet(false, true)) {// 为每个topic启动多个消费者int consumerCount = 3; // 每个topic的消费者数量for (int i = 0; i < consumerCount; i++) {startConsumer(topic, i);}}});}private void startConsumer(String topic, int consumerIndex) {String streamKey = STREAM_KEY + ":" + topic;String consumerGroup = "group:" + topic;String consumerName = "consumer:" + UUID.randomUUID().toString();consumerPool.execute(() -> {try {createConsumerGroupIfNotExists(streamKey, consumerGroup);consumeMessages(streamKey, consumerGroup, consumerName, topic);} catch (Exception e) {log.error("消费者启动失败: topic={}, index={}", topic, consumerIndex, e);consumerStates.get(topic).set(false);}});}private void consumeMessages(String streamKey, String group, String consumer, String topic) {StreamMessageHandler<?> handler = handlers.get(topic);while (consumerStates.get(topic).get() && !Thread.currentThread().isInterrupted()) {try {// 批量读取消息List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().read(Consumer.from(group, consumer),StreamReadOptions.empty().count(10)  // 批量大小.block(Duration.ofSeconds(1)),StreamOffset.create(streamKey, ReadOffset.lastConsumed()));if (records != null && !records.isEmpty()) {// 并行处理消息CompletableFuture<?>[] futures = records.stream().map(record -> CompletableFuture.runAsync(() -> processMessage(streamKey, group, record, handler),processPool)).toArray(CompletableFuture[]::new);// 等待所有消息处理完成CompletableFuture.allOf(futures).join();}} catch (Exception e) {log.error("消息消费异常: topic={}", topic, e);sleep(1000);}}}private <T> void processMessage(String streamKey, String group,MapRecord<String, String, String> record,StreamMessageHandler<T> handler) {String messageId = record.getId().getValue();try {// 处理消息String messageJson = record.getValue().get("message");T message = JSON.parseObject(messageJson, handler.getMessageType());// 执行业务处理handler.getHandler().accept(message);// 确认消息redisTemplate.opsForStream().acknowledge(streamKey, group, messageId);} catch (Exception e) {log.error("消息处理失败: messageId={}", messageId, e);// 可以在这里实现重试逻辑handleProcessingError(streamKey, group, messageId, e);}}private void handleProcessingError(String streamKey, String group, String messageId, Exception e) {try {// 获取消息重试次数PendingMessage pendingMessage = redisTemplate.opsForStream().pending(streamKey, group).getPendingMessages().stream().filter(msg -> msg.getIdAsString().equals(messageId)).findFirst().orElse(null);if (pendingMessage != null && pendingMessage.getTotalDeliveryCount() > 3) {// 超过重试次数,移入死信队列moveToDeadLetter(streamKey, group, messageId);}// 否则等待下次重试} catch (Exception ex) {log.error("处理错误消息失败: messageId={}", messageId, ex);}}/*** 优雅关闭*/@PreDestroypublic void shutdown() {// 标记所有消费者停止consumerStates.forEach((topic, state) -> state.set(false));// 关闭线程池shutdownPool(consumerPool, "消费者线程池");shutdownPool(processPool, "处理线程池");}private void shutdownPool(ThreadPoolExecutor pool, String poolName) {try {pool.shutdown();if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {pool.shutdownNow();}} catch (InterruptedException e) {pool.shutdownNow();Thread.currentThread().interrupt();} finally {log.info("{} 已关闭", poolName);}}
}

3.消息处理监控

3.1代码如下
@Component
@Slf4j
public class MessageProcessingMonitor {private final Counter processedMessageCounter;private final Counter failedMessageCounter;private final Timer messageProcessingTimer;public MessageProcessingMonitor(MeterRegistry registry) {this.processedMessageCounter = Counter.builder("message.processed").description("处理消息计数").register(registry);this.failedMessageCounter = Counter.builder("message.failed").description("失败消息计数").register(registry);this.messageProcessingTimer = Timer.builder("message.processing.time").description("消息处理时间").register(registry);}public void recordProcessedMessage() {processedMessageCounter.increment();}public void recordFailedMessage() {failedMessageCounter.increment();}public Timer.Sample startTimer() {return Timer.start();}public void stopTimer(Timer.Sample sample) {sample.stop(messageProcessingTimer);}
}

4.线程池监控

4.1代码如下
@Component
@Slf4j
public class ThreadPoolMonitor {@Scheduled(fixedRate = 60000) // 每分钟执行一次public void monitorThreadPools(@Qualifier("messageConsumerPool") ThreadPoolExecutor consumerPool,@Qualifier("messageProcessPool") ThreadPoolExecutor processPool) {logPoolStats("消费者线程池", consumerPool);logPoolStats("处理线程池", processPool);}private void logPoolStats(String poolName, ThreadPoolExecutor pool) {log.info("{} 状态: [活动线程: {}, 核心线程: {}, 最大线程: {}, " +"队列大小: {}, 完成任务: {}, 总任务: {}]",poolName,pool.getActiveCount(),pool.getCorePoolSize(),pool.getMaximumPoolSize(),pool.getQueue().size(),pool.getCompletedTaskCount(),pool.getTaskCount());}
}

5.使用示例

5.1 代码如下
@Service
@Slf4j
public class OrderMessageService {@Autowiredprivate StreamMessageConsumer consumer;@Autowiredprivate MessageProcessingMonitor monitor;@PostConstructpublic void init() {consumer.registerHandler("order", OrderMessage.class, this::processOrderMessage);}private void processOrderMessage(OrderMessage message) {Timer.Sample sample = monitor.startTimer();try {log.info("开始处理订单消息: {}", message);// 业务处理逻辑processOrder(message);monitor.recordProcessedMessage();log.info("订单消息处理完成: {}", message);} catch (Exception e) {monitor.recordFailedMessage();log.error("订单消息处理失败: {}", message, e);throw e;} finally {monitor.stopTimer(sample);}}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词