欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > Kafka系列之SpringBoot集成Kafka

Kafka系列之SpringBoot集成Kafka

2024/10/24 4:45:59 来源:https://blog.csdn.net/ToBeMaybe_/article/details/140184664  浏览:    关键词:Kafka系列之SpringBoot集成Kafka

本文介绍如何在springboot项目中集成kafka收发message。

pom依赖

springboot相关的依赖我们就不提了,和kafka相关的只依赖一个spring-kafka集成包

	<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

Kafka相关的yaml配置

spring:kafka:bootstrap-servers: 30.46.35.29:9092producer:retries: 3acks: -1batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializercompression-type: lz4properties:linger.ms: 1'interceptor.classes': com.tencent.qidian.ma.commontools.trace.kafka.TracingProducerInterceptorconsumer:heartbeat-interval: 3000max-poll-records: 100enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 30000listener:concurrency: 3type: batchack-mode: manual_immediate

以下为相关配置的说明:

spring.kafka

  • spring.kafka.bootstrap-servers: 指定 Kafka 服务器的地址列表,格式为 host:port,多个地址使用逗号分隔。

spring.kafka.producer用于配置 Kafka 消费者相关属性

  • spring.kafka.producer.key-serializer: 用于配置 Kafka 生产者发送消息中键(key)的序列化器。可以是字符串形式的完全限定类名,也可以是一个实现 org.apache.kafka.common.serialization.Serializer 接口的自定义序列化器类。
    常见的键序列化器包括:
    org.apache.kafka.common.serialization.StringSerializer:将键对象作为字符串进行序列化。
    org.apache.kafka.common.serialization.IntegerSerializer:将键对象作为整数进行序列化。
    org.apache.kafka.common.serialization.ByteArraySerializer:将键对象直接作为字节数组进行序列化。
    自定义的序列化器:根据自己的需求实现键的序列化逻辑。
    在 Kafka 中,每条消息都由一个键(key)和一个值(value)组成。键是一个可选的、用于标识消息的数据,而值则是实际的消息内容。在发送消息时,Kafka 生产者需要将键和值进行序列化,以便能够在网络上传输和存储到 Kafka 服务器。
  • spring.kafka.producer.value-serializer: 用于配置 Kafka 生产者发送消息中值(value)的序列化器类,用法同上。
  • spring.kafka.producer.acks:生产者发送消息的确认模式。可选的值有 “0”(不需要任何确认)、“1”(只需要 Leader 确认)和 “all”(需要 Leader 和所有副本确认)。
  • spring.kafka.producer.retries:配置生产者在发生错误时的重试次数。
  • spring.kafka.producer.retry-backoff-ms:配置重试之间的延迟时间(默认为 100 毫秒)。重试的间隔时间会随着重试次数的增加而指数级增长,以避免过度负载和大量的重复请求。
  • spring.kafka.producer.batch-size:配置每个批次中包含的消息大小。当应用程序使用 Kafka 生产者发送消息时,发送单个消息可能会带来一些性能开销。为了减少这种开销,可以将多个消息进行批量发送。这个 参数就是用来指定每个批次中包含的消息大小。
  • spring.kafka.producer.buffer-memory:用于配置 Kafka 生产者的缓冲区内存大小的属性,Kafka 生产者在发送消息时,不会立即将消息发送到服务器,而是先将消息缓存在生产者的缓冲区中。当缓冲区中的消息达到一定大小或达到一定时间限制时,生产者才会批量地将消息发送到 Kafka 服务器。该参数的单位是字节,默认值是 33554432 字节(32MB)。
  • spring.kafka.producer.client-id:配置生产者的客户端 ID,如果你没有显式地设置该属性,则 Kafka 生产者会自动生成一个随机的客户端 ID。使用自定义的客户端 ID 可以帮助你更好地追踪和监控不同的生产者实例
  • spring.kafka.producer.compression-type:指定生产者使用的消息压缩类型
    常见的压缩类型包括:
    none:表示不使用压缩,消息以原始的形式发送。
    gzip:表示使用 GZIP 压缩算法对消息内容进行压缩。
    snappy:表示使用 Snappy 压缩算法对消息内容进行压缩。
    lz4:表示使用 LZ4 压缩算法对消息内容进行压缩。
  • spring.kafka.producer.enable-idempotence:启用生产者的幂等性,确保消息的唯一性和顺序性。
    在消息系统中,幂等性是指多次执行同一个操作所产生的影响与执行一次操作的影响相同。而在 Kafka 中,启用幂等性可以确保生产者发送的消息具有幂等性特性,即无论发送多少次相同的消息,最终的影响都是一样的。
    启用幂等性可以提供以下好处:
    1、消息去重:当生产者发送重复的消息时,Kafka 会自动去重,保证只有一条消息被写入。
    2、顺序保证:Kafka 会确保相同键的消息按照发送顺序进行处理,保证消息的顺序性。
    3、提高可靠性:当发生网络故障或生产者重试时,启用幂等性可以确保消息不会被重复发送,避免出现重复消费的问题。
    需要注意的是,启用幂等性会对性能产生一定的影响,因为 Kafka 生产者会为每个分区维护序列号和重试缓冲区。因此,在性能和可靠性之间需要进行权衡,根据具体的业务需求来决定是否启用幂等性。
  • spring.kafka.producer.max-in-flight-requests-per-connection:指定在单个连接上允许的未完成请求的最大数目。

spring.kafka.consumer用于配置 Kafka 消费者相关属性。以下属性都可以在使用@KafkaListener注解时被属性覆盖。
如:

@KafkaListener(id = "order_consumer",topics = "order",groupId = "g_order_consumer_group",containerFactory = "batchKafkaListenerContainerFactory1",properties = {"max.poll.interval.ms:300000", "max-poll-records:5"})public void consume(ConsumerRecords<String, String> records) {// 做业务逻辑}
  • spring.kafka.consumer.group-id:指定消费者所属的消费组的唯一标识符。
    在 Kafka 中,每个消费者都必须加入一个消费组(Consumer Group)才能进行消息的消费。消费组的作用在于协调多个消费者对消息的处理,以实现负载均衡和容错机制。
    具体来说,spring.kafka.consumer.group-id 的作用包括以下几点:
    消费者协调:Kafka 会根据 group-id 将不同的消费者分配到不同的消费组中,不同的消费组之间相互独立。消费组内的消费者协调工作由 Kafka 服务器自动完成,确保消息在消费组内得到均匀地分发。
    负载均衡:当多个消费者加入同一个消费组时,Kafka 会自动对订阅的主题进行分区分配,以实现消费者之间的负载均衡。每个分区只会分配给消费组内的一个消费者进行处理,从而实现并行处理和提高整体的消息处理能力。
    容错机制:在消费组内,如果某个消费者出现故障或者新的消费者加入,Kafka 会自动重新平衡分区的分配,确保各个分区的消息能够被有效地消费。
    需要注意的是,同一个消费组内的消费者共享消费位移(offset),即每个分区的消息只会被消费组内的一个消费者处理。因此,同一个主题下的不同消费组是相互独立的,不会进行负载均衡和消费位移的共享。
  • spring.kafka.consumer.key-deserializer:指定键(key)的反序列化器。将从 Kafka 中读取的键字节流反序列化为对象。
  • spring.kafka.consumer.value-deserializer:指定值(value)的反序列化器。将从 Kafka 中读取的值字节流反序列化为对象。
  • spring.kafka.consumer.enable-auto-commit:指定是否开启自动提交消费位移(offset)的功能。设置为 true 则开启自动提交,设置为 false 则需要手动调用 Acknowledgment 接口的 acknowledge() 方法进行位移提交。
  • spring.kafka.consumer.auto-commit-interval:当开启自动提交时,指定自动提交的间隔时间(以毫秒为单位)。
  • spring.kafka.consumer.auto-offset-reset:指定当消费者加入一个新的消费组或者偏移量无效时的重置策略。常见的取值有 earliest(从最早的偏移量开始消费)和 latest(从最新的偏移量开始消费)。
    spring.kafka.consumer.auto-offset-reset 属性有以下几种取值:
    latest:表示从当前分区的最新位置开始消费,即只消费从启动之后生产的消息,不消费历史消息。
    earliest:表示从该分区的最早位置开始消费,即包含历史消息和当前的消息。
    none:表示如果没有找到先前的消费者偏移量,则抛出异常。
    需要注意的是,spring.kafka.consumer.auto-offset-reset 的默认值是 latest,如果不设置该属性,则新加入消费组的消费者将从该主题的最新位置开始消费。
  • spring.kafka.consumer.max-poll-records:指定每次拉取的最大记录数。用于控制每次消费者向服务器拉取数据的数量,默认为 500。

在 Spring 中是使用 Kafka 监听器来进行消息消费的,spring.kafka.listener用来配置监听器的相关配置

  • spring.kafka.listener.type: 类型,默认值为single(单条)表示单条消费,值为batch表示开启批量消费,。
  • spring.kafka.listener.concurrency:指定监听器容器中并发消费者的数量。默认值为 1。通过设置并发消费者的数量,可以实现多个消费者同时处理消息,提高消息处理的吞吐量。
  • spring.kafka.listener .autoStartup:指定容器是否在启动时自动启动。默认值为 true。可以通过设置为 false 来在应用程序启动后手动启动容器。
  • spring.kafka.listener.clientIdPrefix:指定用于创建消费者的客户端 ID 的前缀。默认值为 “spring”.
  • spring.kafka.listener .ackMode:指定消息确认模式,包括 RECORD、BATCH 和 MANUAL_IMMEDIATE等。可根据需求选择不同的确认模式,用于控制消息的确认方式。
    spring.kafka.listener.ack-mode的取值有两个比较常见的选项值MANUAL 和MANUAL_IMMEDIATE。MANUAL表示处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。MANUAL_IMMEDIATE表示每次处理完业务,手动调用Acknowledgment.acknowledge()后立即提交。
  • spring.kafka.listener.ackCount:当ackMode为"COUNT”或者"COUNT_TIME"时,处理多少个消息后才进行消息确认。
  • spring.kafka.listener.missing-topics-fatal:配置当消费者订阅的主题不存在时的行为
    当将 spring.kafka.listener.missing-topics-fatal 设置为 true 时,如果消费者订阅的主题在 Kafka 中不存在,应用程序会立即失败并抛出异常,阻止消费者启动。这意味着应用程序必须依赖于确保所有订阅的主题都存在,否则应用程序将无法正常运行。
    当将 spring.kafka.listener.missing-topics-fatal 设置为 false 时,如果消费者订阅的主题在 Kafka 中不存在,应用程序将继续启动并等待主题出现。一旦主题出现,消费者将开始正常地消费消息。这种情况下,应用程序需要能够处理主题缺失的情况,并在主题出现后自动适应。
    默认情况下,spring.kafka.listener.missing-topics-fatal 属性的值为 false,这意味着如果消费者订阅的主题不存在,应用程序将会等待主题出现而不会立刻失败。
  • spring.kafka.listener.syncCommits:指定是否在关闭容器时同步提交偏移量。默认值为 false。可以通过设置为 true 来确保在关闭容器时同步提交偏移量。

生产者配置

1)通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。

2)生成bean,@Bean

常见配置参考:

package com.somnus.config.kafka;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Resourceprivate KafkaProperties kafkaProperties;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducer().getRetries());props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProperties.getProducer().getBatchSize());props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProperties.getProducer().getProperties().get("linger.ms"));props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProperties.getProducer().getBufferMemory());props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getKeySerializer());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getValueSerializer());return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

消费端配置

1)通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。

2)生成bean,@Bean

常见配置参考:

package com.tencent.qidian.ma.maaction.web.config.kafka;import jakarta.annotation.Resource;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;/*** KafkaBeanConfiguration*/
@Configuration
@EnableKafka
public class KafkaBeanConfiguration {@Resourceprivate ConsumerFactory consumerFactory;@Resourceprivate KafkaProperties kafkaProperties;@Bean(name = "kafkaListenerContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());factory.setConcurrency(kafkaProperties.getListener().getConcurrency());if (kafkaProperties.getListener().getType().equals(Type.BATCH)) {factory.setBatchListener(true);}return factory;}// 此bean为了后续演示使用,参考消费演示中的containerFactory属性配置@Bean(name = "tenThreadsKafkaListenerContainerFactory1")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>tenThreadsKafkaListenerContainerFactory1() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());factory.setConcurrency(10);if (kafkaProperties.getListener().getType().equals(Type.BATCH)) {factory.setBatchListener(true);}return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,kafkaProperties.getConsumer().getMaxPollRecords());propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getConsumer().getEnableAutoCommit());propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaProperties.getConsumer().getProperties().get("session.timeout.ms"));propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getKeyDeserializer());propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getValueDeserializer());return propsMap;}}

SpringBoot 集成 KafkaTemplate 发送Kafka消息

@Resourceprivate ObjectMapper mapper;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;try {Order order = new Order();String message = mapper.writeValueAsString(order);CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("order", message);future.thenAccept(result -> {if (result.getRecordMetadata() != null) {log.debug("send message:{} with offset:{}", message, result.getRecordMetadata().offset());}}).exceptionally(exception -> {log.error("KafkaProducer send message failure,topic={},data={}", topic, message, exception);return null;});} catch (Exception e) {log.error("KafkaProducer send message exception,topic={},message={}", topic, message, e);}

SpringBoot 集成 @KafkaListener 消费Kafka消息

注意:@KafkaListener注解中的concurrency会覆盖消费者工厂中的concurrency,以下面代码为例,即使kafkaListenerContainerFactory1中的并发数是3,但是最终生成的监听器数量是2。换言之,@KafkaListener注解中的同属性参数会覆盖containerFactory或者默认的consumer中的配置。

@Resourceprivate ObjectMapper mapper;@KafkaListener(id = "order_consumer",topics = "order",groupId = "g_order_consumer_group",//可配置containerFactory参数,使用指定的containerFactory,不配置默认使用名称是kafkaListenerContainerFactory的bean//containerFactory = "kafkaListenerContainerFactory1",//concurrency = "2",properties = {"max.poll.interval.ms:300000", "max.poll.records:1"})// 可以只有ConsumerRecords<String, String> records参数。ack参数非必需,ack.acknowledge()是为了防消息丢失public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {String msg = record.value();log.info("Consume msg:{}", msg);try {Order order = mapper.readValue(val, Order.class);// 处理业务逻辑} catch (Exception e) {log.error("Consume failed, msg:{}", val, e);}}ack.acknowledge();}

max.poll.interval.ms

默认为5分钟
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费,触发rebalance 。
如果你的消费者节点总是在重启完不久就不消费了,可以考虑检查改配置项或者优化你的消费者的消费速度等等。

max.poll.records

1、max-poll-records是什么

max-poll-records是Kafka consumer的一个配置参数,表示consumer一次从Kafka broker中拉取的最大消息数目,默认值为500条。在Kafka中,一个消费者组可以有多个consumer实例,每个consumer实例负责消费一个或多个partition的消息,每个consumer实例一次从broker中可以拉取一个或多个消息。
max-poll-records参数的作用就是控制每次拉取消息的最大数目,以实现消费弱化和控制内存资源的需求。

2、max-poll-records解决的问题

避免一次性加载大量数据:

一次性拉取数量过大,会导致拉取消息时间过长,对broker和网络资源造成过度压力,同时consumer实例应用内存消耗过大,从而影响应用性能。如果要通过增加consumer实例数量或增加机器内存来解决该问题,则会增加成本;而通过控制每次拉取的消息数目,可以实现内存资源控制和应用性能优化。

更好地控制消息轮询的间隔时间:

当consumer实例消费消息的速度比broker生产消息的速度慢时,consumer会产生轮询时间间隔。如果轮询时间跨度过长,则会严重地延迟消息消费。而通过设置max-poll-records,可以控制consumer拉取消息的频率,进而控制消息消费的时间。

3、max-poll-records的最佳实践

max-poll-records的最佳实践共有下述三个核心思想:

3.1 根据机器内存和consumer实例数量调整参数

在设置max-poll-records参数时,应根据机器内存和实例数量来调整参数值,从而实现更好的性能和内存控制。如果消费数据量不大,可以设置较小的值,反之,如果消费数据量很大,则可以设置更大的值。

3.2 注意正确理解和使用max-poll-records

max-poll-records参数不是为了减少消息延迟而设置的,而是为了控制内存和消费弱化而设置的。在设置参数时应该明确这一点,从而更好地利用这个参数。

3.3 尽可能使用手动提交offset的方式

使用自动提交offset的方式,可能存在一些问题。如果一个消息批次在服务端已经被消费掉,但是由于客户端宕机或重启而没有及时提交offset,则可能导致消息重复消费的情况。因此, 建议在设置max-poll-records的同时,使用手动提交offset的方式。

concurrency

上述yml配置文件中 spring.kafka.listener.concurrency 的值为3,这个表示在代码中标记了@KafkaListener注解的方法处会启动三个消费者线程任务并发处理。 但是如果一个主题只有一个分区的话,消息只能被一个消费者组里面的一个消息者所消费,所以即使开了多个并发线程也没有用的。

一个消费者可以消费同一个topic的多个分区,但是一个分区不能被同一个组下的多个消费者消费。同一个组下有多个消费者并发消费同一个topic时,要注意设置的消费者并发个数一定要小于等于topic的分区数,不然会有空置的线程没有分区可以消费。

设置并发的时候根据分区数和消费者的个数来分配每个消费者消费几个分区,消费者可以消费一个或多个分区。例如两个分区的话,如果想增强消息的消费速度,在没有进行消费者服务的横向扩展时,可以考虑采用增加消费者的并发数量,将并发数量修改为2。

项目中总的消费者线程数量为: concurrency * 标记了@KafkaListener注解方法的数量(默认监听全部的partition)

  • 当总的消费者线程数 < partition 的数量,会出现消费不均的情况,一个消费者的线程可能消费多个partition 的数据
  • 当总的消费者线程数 = partition 的数量,最佳状态,一个消费者的线程消费一个 partition 的数据
  • 当总的消费者线程数 > partition 的数量,会出现有的消费者的线程没有可消费的partition, 造成资源的浪费

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com