欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > Spring Kafka 本地消息表与定时任务处理未发送消息方案

Spring Kafka 本地消息表与定时任务处理未发送消息方案

2025/4/21 23:59:56 来源:https://blog.csdn.net/iaong/article/details/147273569  浏览:    关键词:Spring Kafka 本地消息表与定时任务处理未发送消息方案

核心设计思路‌

通过本地数据库表持久化待发送消息,配合定时任务扫描未成功发送的消息并重试,确保消息可靠投递至 Kafka。该方案可解决因网络波动、Kafka 集群异常等导致的消息丢失问题‌。

实现步骤‌
1. 创建本地消息表‌

在业务数据库中设计消息表,记录消息状态和关键信息:

CREATE TABLE kafka_local_message (
    id VARCHAR(36) PRIMARY KEY,          -- 唯一标识(如 UUID)
    topic VARCHAR(255) NOT NULL,         -- Kafka Topic
    message TEXT NOT NULL,               -- 消息内容(JSON 序列化)
    status TINYINT DEFAULT 0,            -- 状态(0-未发送, 1-已发送, 2-发送失败)
    retry_count INT DEFAULT 0,           -- 重试次数
    create_time DATETIME NOT NULL,       -- 创建时间
    update_time DATETIME                 -- 更新时间
);

2. 业务逻辑中预存消息‌

在业务操作的事务中,‌先写入本地消息表,再发送消息到 Kafka‌,确保数据库与消息发送的原子性:

@Transactional
public void saveOrderAndSendMessage(Order order) {
    // 1. 保存订单到数据库
    orderRepository.save(order);
    
    // 2. 生成消息并写入本地表
    String messageId = UUID.randomUUID().toString();
    KafkaMessage message = new KafkaMessage(
        messageId,
        "order-topic",
        JSON.toJSONString(order),
        0,
        LocalDateTime.now()
    );
    kafkaMessageRepository.save(message);
    
    // 3. 异步发送消息到 Kafka
    kafkaTemplate.send("order-topic", message.getMessage())
        .addCallback(result -> updateMessageStatus(messageId, 1),  // 成功回调
                     ex -> updateMessageStatus(messageId, 2));     // 失败回调
}


关键点‌:

使用 @Transactional 确保订单和消息记录同时提交或回滚‌。
发送消息后通过回调更新消息状态,避免阻塞主流程‌。


3. 定时任务补偿未发送消息‌

配置定时任务扫描未发送或失败的消息,触发重试:

@Scheduled(fixedDelay = 30000)  // 每 30 秒执行一次
public void retryFailedMessages() {
    List<KafkaMessage> messages = kafkaMessageRepository
        .findByStatusAndRetryCountLessThan(0, 3);  // 状态为未发送且重试次数 < 3
    
    messages.forEach(msg -> {
        try {
            kafkaTemplate.send(msg.getTopic(), msg.getMessage()).get();  // 同步发送
            updateMessageStatus(msg.getId(), 1);
        } catch (Exception e) {
            msg.setRetryCount(msg.getRetryCount() + 1);
            kafkaMessageRepository.save(msg);
        }
    });
}


优化项‌:

限制最大重试次数(如 3 次),防止无限重试导致资源浪费‌。
动态调整重试间隔(如指数退避算法),减轻系统压力‌。


4. 生产者配置增强可靠性‌

在 application.yml 中优化 Kafka 生产者参数:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      acks: all                     # 等待所有副本确认
      retries: 5                    # 重试次数
      delivery-timeout-ms: 15000    # 发送超时时间
      enable-idempotence: true      # 启用幂等性(防止重复)
    properties:
      max.in.flight.requests.per.connection: 1  # 保证消息顺序性

注意事项‌


消息去重‌:
使用 idempotence(幂等性)和本地消息表的唯一 ID 避免重复消费‌。
事务一致性‌:
若涉及分布式事务,可结合 @Transactional 与 Kafka 事务管理器‌。
监控告警‌:
监控 retry_count 和未发送消息数量,及时介入处理异常积压‌。

方案优势‌


可靠性‌:通过本地存储 + 定时重试,确保消息最终可达 Kafka。
低侵入性‌:业务代码仅需增加本地表操作,无需复杂改造。
兼容性‌:适用于 Spring Boot 2.x/3.x 和不同版本 Kafka 集群‌。

通过此方案可有效解决 Spring Kafka 消息发送不可靠的问题,适用于电商、金融等高一致性场景‌。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词