欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > IT业 > kafka指北

kafka指北

2025/3/19 22:39:41 来源:https://blog.csdn.net/m0_73520938/article/details/146339315  浏览:    关键词:kafka指北

为自己总结一下kafka指北,会持续更新。创作不易,转载请注明出处。

目录

  • 集群
    • controller选举过程
    • broker启动流程
  • 主题创建
    • 副本分布
    • ISR
    • leader副本选举机制
    • LEO
  • 生产数据
    • 流程
      • 同步发送和异步发送
    • 分区策略
    • ack应答
    • 生产者发送消息的幂等性
    • 跨分区幂等性问题!!??
    • 事务
  • 数据存储
    • log、index、timeindex文件
    • 数据同步和HW
  • 消费数据
    • 事务隔离级别
    • 调度(协调)器Coordinator
    • 消费者消费分区分配策略
    • 消费者组内leader选举
    • rebalance机制
      • **消费者端端重平衡流程**
      • Broker端重平衡
  • 扩展

集群

kafka集群中每个broker的id是唯一的,在server.properties中配置。并且kafka集群架构也是用的常见的主从(Master-slave)架构,这个mater节点就成为controller节点,集群中只能有一个controller,controller用来管理集群中的broker、topic、partation。

controller选举过程

zookeeper前置知识:

  • 和zookeeper连接的客户端在zookeeper建立的节点分为临时节点和持久化节点。临时节点:这种节点的生命周期与创建它的客户端会话绑定。当客户端会话结束时(例如客户端断开连接),临时节点会自动被删除。持久化节点:这种节点一旦创建,就会一直存在于zookeeper中,除非显式地删除它。
  • 节点的唯一性。zookeeper中的节点路径是唯一的,这意味着在同一个父节点下,不能有两个同名的子节点。当多个客户端尝试创建同一个节点时,只有一个客户端会成功,其他客户端会收到创建失败的响应。
  • 监听器(Watcher)。zookeeper允许客户端在节点上设置监听器,用于监听节点的状态变化。当节点的状态发生变化时(例如节点被创建、删除、数据更新等),zookeeper会通知所有监听该节点的客户端,触发相应的回调函数。并且监听器是一次性的,即一旦触发后,客户端需要重新注册监听器才能继续监听节点的变化。

根据zookeeper这三个特性,controller选举过程就是这样的:

当kafka集群首次启动后,每个broker都会向zookeeper请求注册/controller节点,并且注册是临时节点的(在pettyZoo-1.9.7可视化工具中就是黄色的节点),但zookeeper节点是唯一,所以只会有一个节点成为controller,可以理解为先来后到原则。其它没注册成功的就会监听/controller节点。当controller发生故障时,其它broker就会监听到临时节点消失了,就竞争再次创建/controller临时节点,成为controller。

(从这也能看出kafka节点的管理都要依赖第三方zookeeper,和zookeeper有强耦合性,制约了kafka的发展,甚至成为kafka的性能瓶颈,所以kafka在后续的版本中尝试加入了一些节点间协调算法来代替zookeeper的作用,目标是彻底替代)

broker启动流程

根据上面controller的选举过程,则broker的启动流程大概如下:

第一个broker启动流程:1.注册broker节点,在/broker/ids下创建一个节点。2.监听/controller节点。3.发现还没人创建/controller,自己注册创建/controller节点,成为controller。4.监听/broker/ids下的节点,用来感知到后续broker的加入和退出。
在这里插入图片描述

第二个broker启动:1.注册broker节点。2.监听/controller节点3.注册/controller,但不成功。4.由于controller节点监听了/brokers/ids,所以这时zookeeper会通知controller集群的变化。5.controller连接其它broker,发送集群相关的元信息
在这里插入图片描述
controller节点故障被删除的情况:1.zookeeper通知其它broker节点controller节点的删除。2.其它broker监听注册controller节点,但只有一个节点能注册成功。3.controller节点在/brokers/ids上增加监听器。4.controller连接其它所有broker,发送集群相关元信息。
在这里插入图片描述

主题创建

查看topic详情的命令:

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092--describe--topic topicname

kafka默认开启了主题自动创建,当生产者向一个不存在的主题发送了消息,会自动创建该主题,默认一个分区,一个leader副本。

副本分布

在创建主题时,每个副本放在哪个broker节点上?

如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有指定replica-assignment参数,那么就按照Kafka内部逻辑来分配,内部逻辑按照机架信息分为两种策略:【未指定机架信息】和【指定机架信息】

未指定机架信息:
在这里插入图片描述
用一个例子代入:

假设:当前分区编号 : 0;BrokerList:[1,2,3,4];副本数量:4 ;随机值: 2;副本分配间隔随机值: 2第一个副本(也就是leader副本)的索引为:(分区编号 + 随机值)% BrokerID列表长度 =(0 + 2)% 4 = 2
索引为2的brokerID为3,则第一个副本所在BrokerID : 3第二个副本索引(第一个副本索引 + (1 +(副本分配间隔 + 0)% (BrokerID列表长度 - 1))) % BrokerID列表长度 = (2 +(1+(2+0)%3))% 4 = 1则第二个副本所在BrokerID:2第三个副本索引:(第一个副本索引 + (1 +(副本分配间隔 + 1)% (BrokerID列表长度 - 1))) % BrokerID列表长度 = (2 +(1+(2+1)%3))% 4 = 3
则第三个副本所在BrokerID:4第四个副本索引:(第一个副本索引 + (1 +(副本分配间隔 + 2)% (BrokerID列表长度 - 1))) % BrokerID列表长度 = (2 +(1+(2+2)%3))% 4 = 0
则第四个副本所在BrokerID:1最终0号分区的副本所在的Broker节点列表为【3,2,4,1】
其他分区采用同样算法

最后得到的【3,2,4,1】列表也就是ISR。因为算法中用到了几个随时值,所以每次计算出来的结果不一定是一样的。

副本分布最理想的情况是各个leader和follower副本均匀的分布在各个broker节点上,这样IO读写可以均衡的散落到各个服务器上,避免单点读写瓶颈。但kafka是一个一个创建主题的,人家又不知道你以后会创建什么样的主题,它没办法去站在上帝视角去均匀的分配副本,所以默认的分配策略分配的并不是最理想的,我们可以自己指定副本分布在哪个分区上
在这里插入图片描述
3号节点指的是server.properties配置文件中broker.id为3的节点

另外需要说明的是,将leader副本和follower副本分布在一个节点上没有任何意义,follower副本存在的意义就是当leader宕掉了,follower中选一个重新作为leader,提供一种容错机制。但是分布在了一个节点上,不就一损俱损了吗。

ISR

ISR(In-Sync Replicas)副本同步列表。一个分区有leader和follower副本,leader副本负责读写消息,follower负责从leader中同步消息。保持着数据同步的这些副本,就称这些副本在一个副本同步列表中。ISR中存储的都是副本所在的节点的brokerid。例如就像这样[1,3,2,4]

在没有网络延迟等任何问题,一个最理想的情况下,一个分区的ISR包括这个分区所有副本所在的brokerid。但当某个follower副本的最后一个消息的偏移量落后于leader最后一个消息的偏移量超过一个阈值时,leader将从ISR中删除该follower。

具体来说,是由这个参数决定的replica.lag.time.max.ms
在这里插入图片描述
还有一个影响参数是replica.lag.max.messages,但在新版kafka中已经被删除了
在这里插入图片描述

leader副本选举机制

在创建主题时,就会计算出ISR列表顺序(计算逻辑就是前面副本分布中说明的),ISR列表中第一个broker上的副本就是leader副本。

当leader副本所在的节点宕掉时,就会直接将ISR列表中后一个节点上的副本作为leader副本,如果原leader副本重新加入了ISR,也是加在ISR的最后面。比如,原ISR:[1,3,2,4],1节点所在的副本是leader副本,1节点宕掉了,就会直接将1后面的3节点上的副本作为leader副本。如果然后1又回来了,直接加在ISR的最后面,ISR变成了:[3,2,4,1]。

但是如果当leader宕掉后,ISR为空了(例如所有副本均被移出 ISR),Kafka 的行为取决于配置参数 unclean.leader.election.enable

unclean.leader.election.enable=false(默认值)禁止从非 ISR 副本选举,分区将不可用,生产者写入会抛出 NoLeaderForPartitionException。这是为了 严格保证数据一致性,避免数据丢失。

unclean.leader.election.enable=true允许从 AR(Assigned Replicas,所有副本)中选举新 Leader,即使该副本不在 ISR 中。但是这样有风险:新 Leader 可能丢失部分未同步的消息,导致数据不一致。

整个选举过程是由controller节点负责完成的。

LEO

LEO:日志末端偏移量 (Log End Offset),记录某副本消息日志(log)中下一条消息的偏移量,也就是下一个消息写入的偏移量。注意是下一条消息,也就是说,如果LEO=10,那么表示该副本只保存了偏移量值是[0, 9]的10条消息;

LEO是副本上的概念,不要搞混了

生产数据

流程

在这里插入图片描述

主线程:准备数据->拦截器->得到集群元数据->序列化器->分区器->数据校验->将数据追加到数据收集器(RecordAccumulator)中。

数据收集器也是一个缓冲区,并且数据是一批一批的被sender线程发送的,一批大小默认为16k(但不一定就是16k,只是超过了16k就不再放数据了,有可能原来有15k,又放了一个数据变成了20k,这时就不再放数据了),以主题为组。

生产者主线程也就是我们的用户线程至此就结束了。

sender发送线程:从RecordAccumulator取一批次数据,按broker分组,每个请求对应一个Broker,包含多个分区的消息批次->构建成ProduceRequest->通过NetworkClient发送请求到broker,与broker网络IO通信(内部使用Java的NIO)->回调函数处理响应
在这里插入图片描述
所以真正将数据发送到kafka的是sender线程,并不是我们的用户主线程
整个发送消息的过程也是一个生产者消费者模型

同步发送和异步发送

// 异步发送:主线程不阻塞
producer.send(new ProducerRecord<>("topic", "key", "value"));producer.send(new ProducerRecord<>("topic", "key", "value"), (metadata, exception) -> {if (exception != null) {log.error("发送失败", exception);} else {log.info("消息已发送到分区 {}", metadata.partition());}
});

kafka发送消息默认都是异步的,上面两种不管有没有设置回调都是异步发送的。只有对send返回的Future阻塞获取才是同步发送。

// 同步发送:主线程阻塞等待发送结果
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic", "key", "value"));
RecordMetadata metadata = future.get(); // 阻塞直到收到 Broker 响应

另外需要注意的,该回调是对sender线程的回调。也就是主线程在将消息放到RecordAccumulator缓冲区中就立即返回了

分区策略

生产者发送消费的分区策略决定了生产者要将该消息发送到哪个分区。分区编号从0开始

计算分区的逻辑大致如图:
在这里插入图片描述
从图中代码可以看出四个分区策略的优先级为:是否指定分区>是否指定分区器>是否定义了消息的key>随机

也就是,当指定分区时发送到指定的分区(指定partition参数)
在这里插入图片描述
没指定具体分区,也没指定分区器,但消息有key时根据key来计算发送的分区,对应DefaultPartitioner分区器类
在这里插入图片描述
DefaultPartitioner的分区逻辑大概是:计算key的序列化字节数组的hashcode,然后hashcode对分区数取模

未具体指定分区,消息也没有key,也没有指定生产者的分区器时,会用一个随机数对可用分区取模,对应的是UniformStickyPartitioner分区器类
在这里插入图片描述

除了DefaultPartitioner和UniformStickyPartitioner,还有一个RoundRobinPartitioner分区器。
在这里插入图片描述
(依赖下载了两个版本,所以会有两份,不用关心这个)

我们可以自己传递生产者配置指定用该分区器

用法:需要自己定义KafkaProducer,定义partitioner.class参数

 @Beanpublic KafkaTemplate<String, String> kafkaTemplate() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");DefaultKafkaProducerFactory defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(configs);return new KafkaTemplate<>(defaultKafkaProducerFactory);}

除了以上三个自带的,我们还可以定义一个类,实现Partitioner接口,从而实现自己的分区逻辑。

ack应答

生产者发送一条消息,broker会给生产者一个应答,Kafka提供了3种应答处理

ACK = 0

当生产数据时,生产者对象(sender线程)将数据通过网络客户端将数据发送到网络数据流中的时候,Kafka就对当前的数据请求进行了响应(确认应答)。生产者不会等待任何确认。

ACK=1

生产者会等待 Leader 副本成功写入消息后返回确认。

ACK=all(或 ack=-1)kafka3开始的默认值

生产者会等待所有同步副本(ISR)都成功写入消息后返回确认。

生产者发送消息的幂等性

当Producer的acks设置成1或-1时,Producer每次发送消息都是需要获取Broker端返回的RecordMetadatal的。这个过程中就需要两次跨网络请求。当网络出现延迟等原因时,生产者就会重试发送没有接受到应答的消息。(默认无限次重试,int的最大值),这就会导致消息重复发送,kafka对该幂等性问题做了设计。

首先需要理解数据传递过程中的三个数据语义:at-least-once:至少一次;at-most-once:最多一次;exactly-once:精确一次。

通常意义上,at-least-once可以保证数据不丢失,但是不能保证数据不重复。而at-most-once保证数据不重复,但是又不能保证数据不丢失。这两种语义虽然都有缺陷,实现起来相对来说比较简单。但是对一些敏感的业务数据,往往要求数据即不重复也不丢失,这就需要支持Exactly-once语义。而要支持Exactly-once语义,需要有非常精密的设计。
回到Producer发消息给Broker这个场景,如果要保证at-most-once语义,可以将ack级别设置为0即可,此时,是不存在幂等性问题的。如果要保证at-least-once语义,就需要将ack级别设置为1或者-1,这样就能保证Leader Partition中的消息至少是写成功了一次的,但是不保证只写了一次。如果要支持Exactly-once语义怎么办呢?这就需要使用到幂等性(idempotence)属性了。
Kafka为了保证消息发送的Exactly-once语义,增加了几个概念:

  • PID:唯一标识一个生产者实例。由Broker在生产者初始化时分配重启生产者会重新分配
  • Sequence Numer:生产者为每个分区维护一个单调递增的序列号。标识生产者发送的每条消息的顺序。
  • Broker端则会针对每个<PID,Partition>维护一个序列号(SN),只有当发来的消费的SequenceNumber=SN+1时,Broker才会接收消息,同时将SN更新为SN+1。否则, SequenceNumber过小就认为消息已经写入了,不需要再重复写入。而如果SequenceNumber过大,就会认为中间可能有数据丢失了。对生产者就会抛出一个 OutOfOrderSequenceException。

这样,Kafka在打开幂等性(idempotence)控制后,在Broker端就会保证每条消息在一次发送过程中,Broker端最多只会刚刚好持久化一条。这样就能保证at-most-once语义。再加上之前分析的将生产者的acks参数设置成1或-1,保证at-least-once语义,这样就整体上保证了Exactaly-once语义。

开始幂等性需要生产者做以下配置

在这里插入图片描述

翻译过来也就是

配置项配置值说明
enable.idempotencetrue开启幂等性,默认值
max.in.flight.requests.per.connection小于等于5每个连接的在途请求数,不能大于5,取值范围为[1,5],默认是5
acksall(-1)确认应答,不能修改,默认是-1
retries>0重试次数,推荐使用Int最大值,默认是0

但是kafka无法保证生产者重启前后的幂等性,因为生产者重启后PID会改变,无法做到跨生产者会话幂等性。

想要解决跨会话幂等性问题就要用到kafka的事务了。

跨分区幂等性问题!!??

外面看到很多文章都说kafka的幂等性机制无法保障跨分区的幂等性。我想说一下我的理解:

我想问,什么情况下会产生多分区幂等性?kafka发送消费的重试机制,也只是将消息发送到原来计算好的分区,又不是重新计算分区?怎么会产生消息重复存储问题的?或者这样一种情况:一个生产者向不同分区发送了两条相同业务含义的消息,又根据某个分区策略发送到了不同的分区,从而存储了两份相同的消息?我只想说,这对于kafka来讲本来就是两个消息,本来就要当两个消息来存储,这不是kafka的问题,这是开发者(你)编写的生产者发送消息的代码的问题,何谈幂等性问题?

想要真正解决幂等性问题,只有我们开发者自己解决,比如通过业务主键、流水表等

事务

生产者用事务发消息可以保证一个生产者的PID重启后不变,也就解决了生产者跨会话的幂等失效问题。在事务中发送的消息可以保证要么全部成功要么全部失败。

事务提交流程

Kafka中的事务是分布式事务,所以采用的也是二阶段提交

第一个阶段提交事务协调器会告诉生产者事务已经提交了,所以也称之预提交操作,事务协调器会修改事务为预提交状态

在这里插入图片描述

第二个阶段提交事务协调器会向分区Leader节点中发送数据标记,通知Broker事务已经提交,然后事务协调器会修改事务为完成提交状态
在这里插入图片描述

特殊情况下,事务已经提交成功,但还是读取不到数据,那是因为当前提交成功只是一阶段提交成功,事务协调器会继续向各个Partition发送marker信息,此操作会无限重试,直至成功。

但是不同的Broker可能无法全部同时接收到marker信息,此时有的Broker上的数据还是无法访问,这也是正常的,因为kafka的事务不能保证强一致性,只能保证最终数据的一致性,无法保证中间的数据是一致的。不过对于常规的场景这里已经够用了,事务协调器会不遗余力的重试,直至成功。

代码演示

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;public class ProducerTransactionTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// TODO 配置幂等性configMap.put( ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// TODO 配置事务IDconfigMap.put( ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");// TODO 配置事务超时时间configMap.put( ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5);// TODO 创建生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 初始化事务producer.initTransactions();try {// TODO 启动事务producer.beginTransaction();// TODO 生产数据for ( int i = 0; i < 10; i++ ) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);final Future<RecordMetadata> send = producer.send(record);}// TODO 提交事务producer.commitTransaction();} catch ( Exception e ) {e.printStackTrace();// TODO 终止事务producer.abortTransaction();//一旦正常调用commitTransaction后,abortTransaction调用会报错}// TODO 关闭生产者对象producer.close();}

开启事务的前提是已经开启了幂等性配置,因为事务就是用来解决原本的幂等保证机制遗留的问题的

调用producer.commitTransaction()才会真正将消息发送到broker。需要强调的是,一旦正常调用了commitTransaction,消息就能被消费者消费了(即使消费者的隔离级别是读未提交),再调用abortTransaction也没用了(会报错)

数据存储

一条消息的内容有这些(ProducerRecord)

在这里插入图片描述

log、index、timeindex文件

kafka的topic的每个分区都对应一个物理文件夹,比如创建一个名为test的topic,分区数量为3,副本数量为1。则在物理磁盘上像这样:
在这里插入图片描述

从这也能看出,kafka中的topic只是逻辑上的概念,分区才是真正的物理存储结构。

每个文件夹里一般有这些内容:
在这里插入图片描述
log文件是消息存储的文件;index文件是索引文件,作用是记录消息的偏移量和消息在log文件中的具体位置的关联,这样就能根据index文件和消息偏移量在log文件中找到消息了;timeindex是时间索引文件,作用是根据时间戳去找log文件中找消息。

消息就存储在log文件中。但是消息发送后并不会立即写到文件里,而是由一个logManger组件周期性的将消息从内存写到磁盘上。具体什么时候刷盘有配置文件决定:

在这里插入图片描述

kafka官方认为消息的可靠性应该主要靠分区副本来保证,而不是立即将消息刷写到磁盘上。

并且log文件会进行拆分,由多个文件段组成,并不是将所有消息都存储到一个log文件中。具体拆分条件由配置决定,比如按照文件大小拆分:

在这里插入图片描述

多个文件段就像这样:

在这里插入图片描述

三个文件是一组,叫一个文件段。并且文件名数字(20位)就是这组文件的起始偏移量,比如上图第一组的文件的起始偏移量是0,第二组的是16。从这里也能知道第一组文件存储了16条消息(0~15)

想要查看log文件里的内容可以用kafka-dump-log.sh

kafka-dump-log.sh --files /mysoftware/kafka_2.13-3.8.0/data/kafka/test-tran-0/00000000000000000000.log

index文件内容一般像这样:

在这里插入图片描述

log文件中又有每个消息的postition,所以就能根据index文件和偏移量在log文件中找到某条消息。

但是,index文件不会把每条消息都写入,而是达到一定大小阈值才写入一条(默认大小是4k),所以又叫稀疏索引文件。那既然index文件里的内容和log文件里的内容不是一一对应的,假如要查找的消息在index文件中没有记录索引,怎么办?那就只能在某个log文件中从头开始找了。因为kafka中大部分情况都是根据偏移量顺序读取消息的。

timeindex文件和index类似,它放的是时间戳和位置的对应信息

在这里插入图片描述

数据同步和HW

HW:(High Watermark),即高水位值,它代表一个偏移量信息。
在这里插入图片描述

多个follower副本在从leader副本同步数据时,各个follower同步的数据不一定一致。在leader挂了时,会从ISR列表(ISR列表是有顺序的)中找到下一个follower(上图中就是follower-1)成为leader,但是follower-1只有2条数据,原leader有4条数据呢,这怎么办?

kafka有水位线的概念,消费者能消费到哪个数据取决于消息数最少的(木桶效应)

在这里插入图片描述

水位线以上都是不可见的。水位线也会随着follower同步数据而不断上涨

消费数据

当不设置auto.offset.reset,默认是从LEO(Log End Offset,LEO=消息条数+1)开始消费消息的,在消费者开启之前的消息都消费不到。

偏移量默认是自动提交的,提交时间间隔默认是5秒。

在这里插入图片描述

自动提交有可能导致重复消费。设置为手动提交。手动提交有同步和异步提交consumer.commitAsync();consumer.commitSync();

__consumer_offsets-xx内置主题

当消费者组内的消费者数量大于分区数量时,就会有消费者空闲,某个消费者宕掉了,这个空闲的消费者就会顶替上去,但是新顶替上来的消费者要从哪开始消费,它自己是不知道的,就要用什么东西来记录消费者消费到哪了,这个东西就是__consumer_offsets-xx主题。该主题是kafka的内置主题,默认有50个分区,编号0~49,并且可配置。消费者提交偏移量就会记录到这个主题内,具体记录到哪个分区上?:“groupid”.hashcode%分区数量。也就是默认是用消费者组的名字的hashcode对50取模计算,该分区位于哪个broker就由哪个broker负责记录更新消费者的消费偏移量。

事务隔离级别

kafka并没有提供消费者事务。如果数据处理完毕,提交偏移量失败,重新拉取消息时可能导致重复消费,这要自己通过其它方式解决。

这里要说的是和生产者事务有关的消费者的事务隔离级别。

在这里插入图片描述

只有两个取值read_committed和read_uncommitted。读已提交表示只有生产者事务正确提交后消费者才能看到数据,而读未提交,默认是读未提交!!!

需要强调的是,一旦正常调用了commitTransaction,消息就能被消费者消费了(即使是读未提交),再调用abortTransaction也没用了(会报错),例如下面这样

在这里插入图片描述
在这里插入图片描述

调度(协调)器Coordinator

消费者想要拉取数据,首先必须要加入到一个组中,成为消费组中的一员,同样道理,如果消费者出现了问题,也应该从消费者组中剥离。而这种加入组和退出组的处理,都应该由专门的管理组件进行处理,这个组件在kafka中,我们称之为消费者组调度器(协调)(Group Coordinator)

Group Coordinator是Broker上的一个组件,用于管理和调度消费者组的成员、状态、分区分配、偏移量等信息。每个Broker都有一个Group Coordinator,负责管理多个消费者组,但每个消费者组只有一个Group Coordinator

消费者组选择Coordinator节点:groupid的hashcode%50(50是_consumser_offsets主题的分区数量),得到的分区leader副本在哪个broker节点上,就选择哪个broker上的Coordinator

消费者消费分区分配策略

基本特性:一个消费者组中的消费者可以消费不同的topic;一个分区只能由一个消费者消费,但一个消费者可以消费多个分区。

消费者想要拉取主题分区的数据,首先必须要加入到一个组中。但是一个组中有多个消费者的话,那么每一个消费者该消费哪个分区呢?这是由分区分配策略决定的。具体是由消费者的Leader决定的,这个Leader我们称之为群主。群主是多个消费者中,第一个加入组中的消费者,其他消费者我们称之为Follower。消费者加入群组的时候,会发送一个JoinGroup请求。群主负责给每一个消费者分配分区。

每个消费者只知道自己的分配信息,只有群主知道群组内所有消费者的分配信息。leader从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区,leader确定了分配关系后再上报给Coordinator

几种分区分配策略:

RoundRobinAssignor(轮询分配策略)

每个消费者组中的消费者都会含有一个自动生产的UUID作为memberid。将每个消费者按照memberid进行排序,所有member消费的主题分区根据主题名称进行排序。
在这里插入图片描述

将主题分区轮询分配给对应的订阅用户,注意未订阅当前轮询主题的消费者会跳过。

在这里插入图片描述

轮询分配分配的不是很均衡。

RangeAssignor(范围分配策略)

基本原则:按照订阅的每个(注意,这里用词是“每个”,也就是说是一个主题一个主题算的)topic的partition数计算出每个消费者应该分配的分区数量,然后分配,一个主题的分区尽可能的平均分,如果不能平均分,那就按顺序向前补齐。

按顺序向前补齐解释:

假设【1,2,3,4,5】5个分区分给2个消费者:5 / 2 = 2, 5 % 2 = 1 => 剩余的一个补在第一个中[2+1][2] => 结果为[1,2,3][4,5]

假设【1,2,3,4,5】5个分区分到3个消费者:

5 / 3 = 1, 5 % 3 = 2 => 剩余的两个补在第一个和第二个中[1+1][1+1][1] => 结果为[1,2][3,4][5]

因为是一个主题一个主题算的,Range分配策略针对单个Topic的情况下显得比较均衡,但是假如Topic多的话, member排序靠前的可能会比member排序靠后的负载多很多。是不是也不够理想。例如:

在这里插入图片描述

左边的分配方式解释:因为是一个主题一个主题算的,所以主题一中,紫色消费者分配三个分区,主题二中,紫色消费者也是分配三个主题。这样紫色就消费6个分区,黄色才消费4个分区。

再看右边,分配的更不合理。

StickyAssignor(粘性分区)

在第一次分配后,每个组成员都保留分配给自己的分区信息。当发生重平衡时,在进行分区再分配时(一般情况下,消费者退出45s后,才会进行再分配,因为需要考虑可能又恢复的情况),尽可能保证消费者原有的分区不变,重新对加入或退出消费者的分区进行分配。

在这里插入图片描述
在这里插入图片描述

从图中可以看出,粘性分区分配策略分配的会更加均匀和高效一些。

CooperativeStickyAssignor

前面的三种分配策略再进行重分配时使用的是EAGER协议,会让当前的所有消费者放弃当前分区,关闭连接,资源清理,重新加入组和等待分配策略。明显效率是比较低的,所以从Kafka2.4版本开始,在粘性分配策略的基础上,优化了重分配的过程,使用的是COOPERATIVE协议。COOPERATIVE协议将一次全局重平衡,改成每次小规模重平衡,直至最终收敛平衡的过程。

Kafka消费者默认的分区分配就是RangeAssignor,CooperativeStickyAssignor。首次使用范围分配,后面使用优化后的粘性分区策略。

消费者组内leader选举

消费者leader的选举由Group Coordinator来完成,可以看作是先到先得原则,但有个细节:当有一个新的消费者加入组后(也就是发生了重平衡),会将原来所有消费者都踢出组,然后全部消费者重新竞争leader

rebalance机制

(一部分内容参考了极客时间里kafka的课程)

当消费者组消费的分区关系发生变化,例如有消费者加入或退出、订阅的主题数量发生变化、主题分区增多了,就会发生组内重平衡,重新分配每个消费者该消费哪个分区。

Kafka的心跳机制 与 Rebalance

Kafka的心跳机制 与 Rebalance 有什么关系呢? 事实上,重平衡过程是靠消费者端的心跳线程(Heartbeat Thread)通知到其他消费者实例的 每当消费者向其 coordinator 汇报心跳的时候, 如果这个时候 coordinator 决定开启 Rebalance , 那么 coordinator 会将REBALANCE_IN_PROGRESS封装到心跳的响应中, 当消费者接受到这个REBALANCE_IN_PROGRESS, 他就知道需要开启新的一轮 Rebalance 了, 所以heartbeat.interval.ms除了是设置心跳的间隔时间, 其实也意味着 Rebalance 感知速度, 心跳越快,那么 Rebalance 就能更快的被各个消费者感知。

在 Kafka 0.10.1.0 版本之前, 发送心跳请求是在消费者主线程完成的, 也就是你写代码调用KafkaConsumer.poll方法的那个线程。 这样做有诸多弊病,最大的问题在于,消息处理逻辑也是在这个线程中完成的。 因此,一旦消息处理消耗了过长的时间, 心跳请求将无法及时发到协调者那里, 导致协调者“错误地”认为该消费者已“死”。 自 0.10.1.0 版本开始, 引入了一个单独的心跳线程来专门执行心跳请求发送,避免了这个问题。

consumer实例的五种状态:

状态描述
Empty组内没有任何成员,但是消费者可能存在已提交的位移数据,而且这些位移尚未过期
Dead同样是组内没有任何成员,但是组的元数据信息已经被协调者端移除,协调者保存着当前向他注册过的所有组信息
PreparingRebalance消费者组准备开启重平衡,此时所有成员都需要重新加入消费者组
CompletingRebalance消费者组下所有成员已经加入,各个成员中等待分配方案
Stable消费者组的稳定状态,该状态表明重平衡已经完成,组内成员能够正常消费数据

状态机:

img

状态流转说明:

一个消费者组最开始是Empty状态, 当重平衡过程开启后, 它会被置于PreparingRebalance状态 等待成员加入, 成员都加入之后变更到CompletingRebalance状态等待分配方案, 当coordinator分配完个消费者消费的分区后, 最后就流转到Stable状态完成重平衡。 当有新成员加入或已有成员退出时, 消费者组的状态 从Stable直接跳到PreparingRebalance状态, 此时,所有现存成员就必须重新申请加入组。 当所有成员都退出组后,消费者组状态变更为Empty。

Kafka定期自动删除过期位移的条件就是,组要处于Empty状态。 因此,如果你的消费者组停掉了很长时间(超过7天), 那么Kafka很可能就把该组的位移数据删除了。

消费者端端重平衡流程

在消费者端,重平衡分为两个步骤:

  1. 加入组。 当组内成员加入组时,它会向 coordinator 发送JoinGroup请求。 在该请求中,每个成员都要将自己订阅的主题上报, 这样协调者就能收集到所有成员的订阅信息。 一旦收集了全部成员的JoinGroup请求后, Coordinator 会从这些成员中选择第一个发送JoinGroup请求的成员成为领导者。 领导者消费者的任务是收集所有成员的订阅信息, 然后根据这些信息,制定具体的分区消费分配方案。 选出leader之后, Coordinator 会把消费者组订阅信息封装进JoinGroup请求的 响应中,然后发给领导者,由领导者统一做出分配方案后, 进入到下一步:发送SyncGroup请求。
  2. 领导者向 Coordinator 发送SyncGroup请求, 将刚刚做出的分配方案发给协调者。其他成员也会向 Coordinator发送SyncGroup请求,只不过请求体中并没有实际的内容。 这一步的主要目的是让 Coordinator 接收分配方案, 然后统一以 SyncGroup 响应的方式分发给所有成员, 这样组内所有成员就都知道自己该消费哪些分区了。

Broker端重平衡

要剖析协调者端处理重平衡的全流程, 我们必须要分几个场景来讨论。 这几个场景分别是

  • 新成员加入组
  • 组成员主动离组
  • 组成员崩溃离组
  • 组成员提交位移。

接下来,我们一个一个来讨论。

  • 新成员入组。 新成员入组是指组处于Stable状态后,有新成员加入。 如果是全新启动一个消费者组,Kafka是有一些自己的小优化的,流程上会有些许的不同。 我们这里讨论的是,组稳定了之后有新成员加入的情形。 当协调者收到新的JoinGroup请求后, 它会通过心跳请求响应的方式通知组内现有的所有成员, 强制它们开启新一轮的重平衡。 具体的过程和之前的客户端重平衡流程是一样的。 现在,我用一张时序图来说明协调者一端是如何处理新成员入组的。

img

  • 组成员主动离组。 何谓主动离组?就是指消费者实例所在线程或进程调用close()方法主动通知协调者它要退出。 这个场景就涉及到了第三类请求:LeaveGroup请求。 协调者收到LeaveGroup请求后,依然会以心跳响应的方式通知其他成员

img

  • 组成员崩溃离组。 崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。 它和主动离组是有区别的, 因为后者是主动发起的离组,协调者能马上感知并处理。 但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到, 这段时间一般是由消费者端参数session.timeout.ms控制的。 也就是说,Kafka一般不会超过session.timeout.ms就能感知到这个崩溃。 当然,后面处理崩溃离组的流程与之前是一样的,我们来看看下面这张图。

img

重平衡时协调者对组内成员提交位移的处理

正常情况下,每个组内成员都会定期汇报位移给协调者。 当重平衡开启时,协调者会给予成员一段缓冲时间, 要求每个成员必须在这段时间内快速地上报自己的位移信息, 然后再开启正常的JoinGroup/SyncGroup请求发送。 还是老办法,我们使用一张图来说明。

img

扩展

零拷贝

在这里插入图片描述

顺序消费

在这里插入图片描述

保证同一批因果依赖的消息分到一个分区就可以。这句话很精辟!

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词