欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > 深入学习 Kafka(3)- SpringBoot 整合 Kafka

深入学习 Kafka(3)- SpringBoot 整合 Kafka

2024/10/25 19:37:05 来源:https://blog.csdn.net/java_liuyuan/article/details/140101571  浏览:    关键词:深入学习 Kafka(3)- SpringBoot 整合 Kafka

1. 引入 jar

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2. yml 配置

spring:kafka:bootstrap-servers: localhost:9092,localhost:9093,localhost:9094 # kafka 服务地址,多个以逗号隔开producer:retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送acks: 1 #只要Leader副本确认接收就算成功batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类consumer:enable-auto-commit: false # 关闭自动提交 ackauto-commit-interval: 100 # 消费者自动提交偏移量的时间间隔,enable-auto-commit=false时,次配置不生效auto-offset-reset: earliestmax-poll-records: 500 # 每次拉取的最大记录数key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: ${APP_NAME}listener:ack-mode: manual_immediate # 手动ackpoll-timeout: 500ms # 每次调用poll()时,如果没有消息可消费,将等待最多500ms

3. 实现代码

生产者:
@Slf4j
@Component
public class KafkaProducer {private final ExecutorService executorService = Executors.newFixedThreadPool(10);@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** @param topic 要发送到的Kafka主题名称* @param msg   要发送的消息内容*/public void send(String topic, String msg) {executorService.submit(() -> kafkaTemplate.send(topic, msg).addCallback(result -> {if (result != null && result.getRecordMetadata() != null) {log.info("消息发送成功,offset = {}", result.getRecordMetadata().offset());} else {log.warn("消息发送完成,但结果或其元数据为空");}}, throwable -> log.error("消息发送失败,原因 = {}", throwable.getMessage())));}@PreDestroypublic void shutdown() {executorService.shutdown();try {if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {executorService.shutdownNow();}} catch (InterruptedException e) {executorService.shutdownNow();Thread.currentThread().interrupt();}}
}
消费者:
@Component
public class KafkaConsumer {@KafkaListener(topics = "myTopic",groupId = "xxx", properties = "max.poll.records:5", concurrency = "3")  public void listen(ConsumerRecord<?, ?> record) {  // 处理消息  ack.acknowledge();}
}// topics:可以消费多个topic
// groupId:不同的groupId可以独立消费topic
// properties = "max.poll.records:5":每次轮询时消费者从Kafka服务器拉取的最大记录数(即消息数)
// concurrency:并发消费者的数量

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com