1.消息生产者服务
1.1 代码如下
@Service
@Slf4j
public class StreamMessageProducer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";public String sendMessage(String topic, Object message) {try {StringRecord record = StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(message))).withStreamKey(STREAM_KEY + ":" + topic);RecordId recordId = redisTemplate.opsForStream().add(record);log.info("消息发送成功: topic={}, messageId={}", topic, recordId);return recordId.getValue();} catch (Exception e) {log.error("消息发送失败: topic={}, message={}", topic, message, e);throw new RuntimeException("消息发送失败", e);}}public List<String> sendMessages(String topic, List<Object> messages) {try {List<MapRecord<String, String, String>> records = messages.stream().map(msg -> StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(msg))).withStreamKey(STREAM_KEY + ":" + topic)).collect(Collectors.toList());List<String> messageIds = new ArrayList<>();for (MapRecord<String, String, String> record : records) {RecordId recordId = redisTemplate.opsForStream().add(record);messageIds.add(recordId.getValue());}log.info("批量消息发送成功: topic={}, count={}", topic, messageIds.size());return messageIds;} catch (Exception e) {log.error("批量消息发送失败: topic={}", topic, e);throw new RuntimeException("批量消息发送失败", e);}}
}
2. 消息消费者服务（使用线程池消费）
2.1 线程池配置代码如下
@Configuration
@Slf4j
public class ThreadPoolConfig {@Bean("messageConsumerPool")public ThreadPoolExecutor messageConsumerPool() {return new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactoryBuilder().setNameFormat("msg-consumer-%d").setUncaughtExceptionHandler((t, e) -> log.error("消费者线程异常: {}", t.getName(), e)).build(),new ThreadPoolExecutor.CallerRunsPolicy() );}@Bean("messageProcessPool")public ThreadPoolExecutor messageProcessPool() {return new ThreadPoolExecutor(50, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5000), new ThreadFactoryBuilder().setNameFormat("msg-process-%d").setUncaughtExceptionHandler((t, e) -> log.error("处理线程异常: {}", t.getName(), e)).build(),new ThreadPoolExecutor.CallerRunsPolicy() );}
}
2.2 使用线程池消费代码如下
@Service
@Slf4j
public class StreamMessageConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowired@Qualifier("messageConsumerPool")private ThreadPoolExecutor consumerPool;@Autowired@Qualifier("messageProcessPool")private ThreadPoolExecutor processPool;private static final String STREAM_KEY = "message:stream";private final Map<String, StreamMessageHandler<?>> handlers = new ConcurrentHashMap<>();private final Map<String, AtomicBoolean> consumerStates = new ConcurrentHashMap<>();public <T> void registerHandler(String topic, Class<T> messageType, Consumer<T> handler) {handlers.put(topic, new StreamMessageHandler<>(messageType, handler));consumerStates.put(topic, new AtomicBoolean(false));}@PostConstructpublic void startConsuming() {handlers.forEach((topic, handler) -> {AtomicBoolean state = consumerStates.get(topic);if (state.compareAndSet(false, true)) {int consumerCount = 3; for (int i = 0; i < consumerCount; i++) {startConsumer(topic, i);}}});}private void startConsumer(String topic, int consumerIndex) {String streamKey = STREAM_KEY + ":" + topic;String consumerGroup = "group:" + topic;String consumerName = "consumer:" + UUID.randomUUID().toString();consumerPool.execute(() -> {try {createConsumerGroupIfNotExists(streamKey, consumerGroup);consumeMessages(streamKey, consumerGroup, consumerName, topic);} catch (Exception e) {log.error("消费者启动失败: topic={}, index={}", topic, consumerIndex, e);consumerStates.get(topic).set(false);}});}private void consumeMessages(String streamKey, String group, String consumer, String topic) {StreamMessageHandler<?> handler = handlers.get(topic);while (consumerStates.get(topic).get() && !Thread.currentThread().isInterrupted()) {try {List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().read(Consumer.from(group, consumer),StreamReadOptions.empty().count(10) .block(Duration.ofSeconds(1)),StreamOffset.create(streamKey, ReadOffset.lastConsumed()));if (records != null && !records.isEmpty()) {CompletableFuture<?>[] 
futures = records.stream().map(record -> CompletableFuture.runAsync(() -> processMessage(streamKey, group, record, handler),processPool)).toArray(CompletableFuture[]::new);CompletableFuture.allOf(futures).join();}} catch (Exception e) {log.error("消息消费异常: topic={}", topic, e);sleep(1000);}}}private <T> void processMessage(String streamKey, String group,MapRecord<String, String, String> record,StreamMessageHandler<T> handler) {String messageId = record.getId().getValue();try {String messageJson = record.getValue().get("message");T message = JSON.parseObject(messageJson, handler.getMessageType());handler.getHandler().accept(message);redisTemplate.opsForStream().acknowledge(streamKey, group, messageId);} catch (Exception e) {log.error("消息处理失败: messageId={}", messageId, e);handleProcessingError(streamKey, group, messageId, e);}}private void handleProcessingError(String streamKey, String group, String messageId, Exception e) {try {PendingMessage pendingMessage = redisTemplate.opsForStream().pending(streamKey, group).getPendingMessages().stream().filter(msg -> msg.getIdAsString().equals(messageId)).findFirst().orElse(null);if (pendingMessage != null && pendingMessage.getTotalDeliveryCount() > 3) {moveToDeadLetter(streamKey, group, messageId);}} catch (Exception ex) {log.error("处理错误消息失败: messageId={}", messageId, ex);}}@PreDestroypublic void shutdown() {consumerStates.forEach((topic, state) -> state.set(false));shutdownPool(consumerPool, "消费者线程池");shutdownPool(processPool, "处理线程池");}private void shutdownPool(ThreadPoolExecutor pool, String poolName) {try {pool.shutdown();if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {pool.shutdownNow();}} catch (InterruptedException e) {pool.shutdownNow();Thread.currentThread().interrupt();} finally {log.info("{} 已关闭", poolName);}}
}
3.消息处理监控
3.1 代码如下
/**
 * Micrometer metrics for message processing: a counter of processed messages
 * ({@code message.processed}), a counter of failed messages
 * ({@code message.failed}) and a timer of processing durations
 * ({@code message.processing.time}).
 */
@Component
@Slf4j
public class MessageProcessingMonitor {

    private final Counter processedCount;
    private final Counter failedCount;
    private final Timer processingTime;

    /**
     * Registers the three meters with the given registry.
     *
     * @param registry meter registry the metrics are registered in
     */
    public MessageProcessingMonitor(MeterRegistry registry) {
        this.processedCount = describedCounter(registry, "message.processed", "处理消息计数");
        this.failedCount = describedCounter(registry, "message.failed", "失败消息计数");
        this.processingTime = Timer.builder("message.processing.time")
                .description("消息处理时间")
                .register(registry);
    }

    /** Builds and registers a counter with a name and a description. */
    private static Counter describedCounter(MeterRegistry registry, String name, String description) {
        return Counter.builder(name)
                .description(description)
                .register(registry);
    }

    /** Adds one to the processed-message counter. */
    public void recordProcessedMessage() {
        processedCount.increment();
    }

    /** Adds one to the failed-message counter. */
    public void recordFailedMessage() {
        failedCount.increment();
    }

    /**
     * Starts a timing sample.
     *
     * @return the running sample, to be passed to {@link #stopTimer(Timer.Sample)}
     */
    public Timer.Sample startTimer() {
        return Timer.start();
    }

    /**
     * Stops the sample and records its duration in the processing timer.
     *
     * @param sample sample obtained from {@link #startTimer()}
     */
    public void stopTimer(Timer.Sample sample) {
        sample.stop(processingTime);
    }
}
4.线程池监控
4.1 代码如下
/**
 * Periodically logs the state of the message consumer and processing pools.
 */
@Component
@Slf4j
public class ThreadPoolMonitor {

    private final ThreadPoolExecutor consumerPool;
    private final ThreadPoolExecutor processPool;

    /**
     * @param consumerPool the {@code messageConsumerPool} bean
     * @param processPool  the {@code messageProcessPool} bean
     */
    public ThreadPoolMonitor(@Qualifier("messageConsumerPool") ThreadPoolExecutor consumerPool,
                             @Qualifier("messageProcessPool") ThreadPoolExecutor processPool) {
        this.consumerPool = consumerPool;
        this.processPool = processPool;
    }

    /**
     * Logs the statistics of both injected pools once per minute.
     *
     * <p>A scheduled method must not take arguments, so the pools are injected
     * into the bean and this no-arg method is the one that carries the schedule.
     */
    @Scheduled(fixedRate = 60000)
    public void monitorThreadPools() {
        monitorThreadPools(consumerPool, processPool);
    }

    /**
     * Logs the statistics of the two given pools. Kept for callers that pass
     * the pools explicitly; it is not scheduled itself.
     *
     * @param consumerPool pool running the stream consumers
     * @param processPool  pool running the message handlers
     */
    public void monitorThreadPools(@Qualifier("messageConsumerPool") ThreadPoolExecutor consumerPool,
                                   @Qualifier("messageProcessPool") ThreadPoolExecutor processPool) {
        logPoolStats("消费者线程池", consumerPool);
        logPoolStats("处理线程池", processPool);
    }

    /** Writes one log line with the pool's thread, queue and task counters. */
    private void logPoolStats(String poolName, ThreadPoolExecutor pool) {
        log.info("{} 状态: [活动线程: {}, 核心线程: {}, 最大线程: {}, "
                        + "队列大小: {}, 完成任务: {}, 总任务: {}]",
                poolName,
                pool.getActiveCount(),
                pool.getCorePoolSize(),
                pool.getMaximumPoolSize(),
                pool.getQueue().size(),
                pool.getCompletedTaskCount(),
                pool.getTaskCount());
    }
}
5.使用示例
5.1 代码如下
@Service
@Slf4j
public class OrderMessageService {@Autowiredprivate StreamMessageConsumer consumer;@Autowiredprivate MessageProcessingMonitor monitor;@PostConstructpublic void init() {consumer.registerHandler("order", OrderMessage.class, this::processOrderMessage);}private void processOrderMessage(OrderMessage message) {Timer.Sample sample = monitor.startTimer();try {log.info("开始处理订单消息: {}", message);processOrder(message);monitor.recordProcessedMessage();log.info("订单消息处理完成: {}", message);} catch (Exception e) {monitor.recordFailedMessage();log.error("订单消息处理失败: {}", message, e);throw e;} finally {monitor.stopTimer(sample);}}
}