核心设计思路
通过本地数据库表持久化待发送消息,配合定时任务扫描未成功发送的消息并重试,确保消息可靠投递至 Kafka。该方案可解决因网络波动、Kafka 集群异常等导致的消息丢失问题。
实现步骤
1. 创建本地消息表
在业务数据库中设计消息表,记录消息状态和关键信息:
CREATE TABLE kafka_local_message (
id VARCHAR(36) PRIMARY KEY, -- 唯一标识(如 UUID)
topic VARCHAR(255) NOT NULL, -- Kafka Topic
message TEXT NOT NULL, -- 消息内容(JSON 序列化)
status TINYINT DEFAULT 0, -- 状态(0-未发送, 1-已发送, 2-发送失败)
retry_count INT DEFAULT 0, -- 重试次数
create_time DATETIME NOT NULL, -- 创建时间
update_time DATETIME -- 更新时间
);
2. 业务逻辑中预存消息
在业务操作的事务中,先写入本地消息表,再发送消息到 Kafka,确保数据库与消息发送的原子性:
@Transactional
public void saveOrderAndSendMessage(Order order) {
// 1. 保存订单到数据库
orderRepository.save(order);
// 2. 生成消息并写入本地表
String messageId = UUID.randomUUID().toString();
KafkaMessage message = new KafkaMessage(
messageId,
"order-topic",
JSON.toJSONString(order),
0,
LocalDateTime.now()
);
kafkaMessageRepository.save(message);
// 3. 异步发送消息到 Kafka
kafkaTemplate.send("order-topic", message.getMessage())
.addCallback(result -> updateMessageStatus(messageId, 1), // 成功回调
ex -> updateMessageStatus(messageId, 2)); // 失败回调
}
关键点:
使用 @Transactional 确保订单和消息记录同时提交或回滚。
发送消息后通过回调更新消息状态,避免阻塞主流程。
3. 定时任务补偿未发送消息
配置定时任务扫描未发送或失败的消息,触发重试:
@Scheduled(fixedDelay = 30000) // 每 30 秒执行一次
public void retryFailedMessages() {
List<KafkaMessage> messages = kafkaMessageRepository
.findByStatusAndRetryCountLessThan(0, 3); // 状态为未发送且重试次数 < 3
messages.forEach(msg -> {
try {
kafkaTemplate.send(msg.getTopic(), msg.getMessage()).get(); // 同步发送
updateMessageStatus(msg.getId(), 1);
} catch (Exception e) {
msg.setRetryCount(msg.getRetryCount() + 1);
kafkaMessageRepository.save(msg);
}
});
}
优化项:
限制最大重试次数(如 3 次),防止无限重试导致资源浪费。
动态调整重试间隔(如指数退避算法),减轻系统压力。
4. 生产者配置增强可靠性
在 application.yml 中优化 Kafka 生产者参数:
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
acks: all # 等待所有副本确认
retries: 5 # 重试次数
delivery-timeout-ms: 15000 # 发送超时时间
enable-idempotence: true # 启用幂等性(防止重复)
properties:
max.in.flight.requests.per.connection: 1 # 保证消息顺序性
注意事项
消息去重:
使用 idempotence(幂等性)和本地消息表的唯一 ID 避免重复消费。
事务一致性:
若涉及分布式事务,可结合 @Transactional 与 Kafka 事务管理器。
监控告警:
监控 retry_count 和未发送消息数量,及时介入处理异常积压。
方案优势
可靠性:通过本地存储 + 定时重试,确保消息最终可达 Kafka。
低侵入性:业务代码仅需增加本地表操作,无需复杂改造。
兼容性:适用于 Spring Boot 2.x/3.x 和不同版本 Kafka 集群。
通过此方案可有效解决 Spring Kafka 消息发送不可靠的问题,适用于电商、金融等高一致性场景。