欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > Kafka + SpringBoot集成

Kafka + SpringBoot集成

2025/2/12 20:11:15 来源:https://blog.csdn.net/qq_42765398/article/details/144240192  浏览:    关键词:Kafka + SpringBoot集成

学习贴:参考https://blog.csdn.net/qq_20865839/article/details/133948989

简单SpringBoot整合:

依赖:

		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

生产者


@RestController
public class kafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/normal/{message}")public void sendNormalMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message);}

消费者:


@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"sb_topic"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}

生产者

带回调的生产者
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理

    /*** 回调的第二种写法* @param message*/@GetMapping("/kafka/callbackTwo/{message}")public void sendCallbackTwoMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败2:"+throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});}

监听器

Kafka提供了ProducerListener 监听器来异步监听生产者消息是否发送成功,我们可以自定义一个kafkaTemplate添加ProducerListener,当消息发送失败我们可以拿到消息进行重试或者把失败消息记录到数据库定时重试。

注意:
当我们发送一条消息,既会走 ListenableFutureCallback 回调,也会走ProducerListener回调。


@Configuration
public class KafkaConfig {@AutowiredProducerFactory producerFactory;@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {@Overridepublic void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {System.out.println("发送成功 " + producerRecord.toString());}@Overridepublic void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);}@Overridepublic void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {System.out.println("发送失败" + producerRecord.toString());System.out.println(exception.getMessage());}@Overridepublic void onError(String topic, Integer partition, String key, Object value, Exception exception) {System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);System.out.println(exception.getMessage());}});return kafkaTemplate;}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com