在 Spring Boot 中整合 Kafka 非常简单,Spring Kafka 提供了丰富的支持,使得我们可以轻松地实现 Kafka 的生产者和消费者。下面是一个简单的 Spring Boot 整合 Kafka 的示例。
1. 添加依赖
首先,在
pom.xml
中添加 Spring Kafka 的依赖:xml
复制
<dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Kafka Test --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency> </dependencies>运行 HTML
2. 配置 Kafka
在
application.properties
或application.yml
中配置 Kafka 的连接信息:properties
复制
# application.properties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
3. 创建 Kafka 生产者
创建一个 Kafka 生产者,用于发送消息到 Kafka 主题:
java
复制
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service;@Service public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);} }
4. 创建 Kafka 消费者
创建一个 Kafka 消费者,用于从 Kafka 主题中接收消息:
java
复制
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service;@Service public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);} }
5. 创建控制器
创建一个控制器,用于发送消息到 Kafka 主题:
java
复制
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;@RestController public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/send")public String sendMessage(@RequestParam("message") String message) {kafkaProducer.sendMessage("my-topic", message);return "Message sent: " + message;} }
6. 启动应用程序
创建一个 Spring Boot 应用程序启动类:
java
复制
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);} }
7. 运行应用程序
启动 Spring Boot 应用程序,然后访问
http://localhost:8080/send?message=Hello
,你将看到控制台输出Received message: Hello
。总结
通过以上步骤,我们成功地在 Spring Boot 中整合了 Kafka,并实现了消息的生产和消费。Spring Kafka 提供了简单易用的 API,使得我们可以轻松地与 Kafka 进行交互。你可以根据实际需求进一步扩展和优化这个示例。
在 Kafka 中,消费者消费消息失败后是否重新入队(重回队列)取决于消费者的配置和处理逻辑。Kafka 本身并没有直接提供“消费失败重回队列”的功能,但可以通过一些策略来实现类似的效果。
Kafka 消费者的基本行为
自动提交偏移量:默认情况下,Kafka 消费者会自动提交偏移量。如果消息处理失败,偏移量仍然会被提交,导致消息丢失。
手动提交偏移量:通过配置
enable.auto.commit=false
,消费者可以手动提交偏移量。这样可以在消息处理成功后再提交偏移量,确保消息不会丢失。实现消费失败重回队列的策略
1. 手动提交偏移量
通过手动提交偏移量,可以在消息处理成功后再提交偏移量,从而避免消息丢失。如果消息处理失败,可以选择不提交偏移量,让消费者在下一次拉取消息时重新处理该消息。
java
复制
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service;@Service public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {try {// 处理消息processMessage(record.value());// 处理成功后提交偏移量acknowledgment.acknowledge();} catch (Exception e) {// 处理失败,不提交偏移量,让消费者在下一次拉取时重新处理该消息System.err.println("Message processing failed: " + e.getMessage());}}private void processMessage(String message) {// 模拟消息处理逻辑if (message.contains("error")) {throw new RuntimeException("Simulated error");}System.out.println("Processed message: " + message);} }