欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 文化 > kafka消费端之消费者协调器和组协调器

kafka消费端之消费者协调器和组协调器

2025/2/9 3:10:04 来源:https://blog.csdn.net/SO_zxn/article/details/145496953  浏览:    关键词:kafka消费端之消费者协调器和组协调器

文章目录

  • 概述
  • 回顾历史
    • 老版本获取消费者变更
    • 老版本存在的问题
  • 消费者协调器和组协调器
    • 新版如何解决老版本问题
    • 再均衡过程
      • **第一阶段CFIND COORDINATOR**
      • **第二阶段(JOINGROUP)**
        • 选举消费组的lcader
        • 选举分区分配策略
      • 第三阶段(SYNC GROUP)
        • 消费组元数据信息
      • 第四阶段(HEARTBEAT)
  • 消费者协调器和控制器关系
    • 分区分配与消费者组管理
    • 选举与状态同步
    • 心跳检测与故障处理

概述

上一章我们讲解kafka消费端的分区分配策略时留下了两个问题,这章我们通过消费者协调器和组协调器继续详细回答那两个问题。其中我们会回归历史说明消费者协调器和组协调器出现的原因,最后会说下消费者协调器和控制器在工作上有何关联。

回顾历史

老版本获取消费者变更

消费者协调器和组协调器的概念是针对新版的消费者客户端而言的,Kafka建立之初并没有它们。旧版的消费者客户端是使用ZooKeeper的监听器(Watcher)来实现这些功能的。每个消费组王()在ZooKeeper中都维护了一个/consumers//ids路径,在此路径下使用临时节点记录录属于此消费组的消费者的唯一标识CconsumerIdString )consumerIdString由消费者启动时创建。消费者的唯一标识由aconsumer.id+主机名+时间截+UUID的部分信息构成,其中consumerid是旧版消费者客户端中的配置,相当于新版客户端中的client.id。

每个broker、主题和分区在ZooKeeper中也都对应一个路径:/brokers/ids/记录了 host、port及分配在此broker上的主题分区表;/brokers/topics/记录了每个分区的leader副本、ISR集合等信息。/brokers/topics//partitions//state记录了当前leader副本、leaderepoch等信息。如下图:
在这里插入图片描述

每个消费者在启动时都会在/consumers//ids和/brokers/ids路径上注册一个监听器。当/consumers//ids路径下的子节点发生变化时,表示消费组中的消费者发生了变化;当/brokers/ids路径下的子节点发生变化时,表示broker出现了增减。这样通过ZooKeeper所提供的Watcher,每个消费者就可以监听消费组和Kafka集群的状态了。

老版本存在的问题

这种方式下每个消费者对ZooKeeper E的相关路径分别进行监听,当触发再均衡操作时,一个消费组下的所有消费者会同时进行再均衡操作,而消费者之间并不知道彼此操作的结果,这样可能导致Kafka工作在一个不正确的状态。与此同时,这种严重依赖于ZooKeeper集群的做法还有两个比较严重的问题。

  • (1)羊群效应(HerdEffect):所谓的羊群效应是指ZooKeeper中一个被监听的节点变化大量的Watcher通知被发送到客户端,导致在通知期间的其他操作延迟,也有可能发生类似死锁的情况。
  • (2)脑裂问题(SplitBrain):消费者进行再均衡操作时每个消费者都与ZooKeeper进行通信以判断消费者或broker变化的情况,由于ZooKeeper本身的特性,可能导致在同一时刻各个消费者获取的状态不一致,这样会导致异常问题发生。

消费者协调器和组协调器

新版如何解决老版本问题

新版的消费者客户端对此进行了重新设计,将全部消费组分成多个子集,每个消费组的子集在服务端对应一个GroupCoordinator对其进行管理,GroupCoordinator是Kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互,也就是发送心跳,加入group请求、提交位移等请求。

再均衡过程

ConsumerCoordinatorr与GroupCoordinator之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。就目前而言,一共有如下几种情形会触发再均衡的操作:

  • 有新的消费者加入消费组。
  • 有消费者容机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延退导致消费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator会认为消费者已经下线。
  • 有消费者主动退出消费组(发送LeaveGroupRequest请求)。比如客户端调用了
    unsubscribleO方法取消对某些主题的订阅。
  • 消费组所对应的GroupCoorinator节点发生了变更。
  • 消费组内所订阅的任一主题或者主题的分区数量发生变化。

当有消费者加入消费组时,消费者、消费组及组协调器之间会经历一下几个阶段。

第一阶段CFIND COORDINATOR

消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker,并创建与该broker相互通信的网络连接。如果消费者已经保存了与消费组对应的(GroupCoordinator节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的GroupCoordinator,这里的“某个节点”并非是集群中的任意节点,而是负载最小的节点。

Kafka 在收到 FindCoordinatorRequest请求之后,会根据coordinator_key(也就是groupId)查找对应的GroupCoordinator节点,如果找到对应的GroupCoordinator则会返回其相对应的nodeid、host和port信息。具体查找GroupCoordinator的方式是先根据消费组groupId的哈希值计算_consumer_offsets中的分区编号。找到对应的_consumer_offsets中的分区之后,再寻找此分区leader副本所在的broker节点,该broker节点即为这个groupId所对应的GroupCoordinator节点。消费者groupId最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给此分区leader副本所在的broker节点,让此broker节点既扮演GroupCoordinator的角色,又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。

第二阶段(JOINGROUP)

在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

JoinGroupRequest中的group_protocols域为数组类型,其中可以囊括多个分区分配策略,这个主要取决于消费者客户端参数partition.assignment.strategy的配置。如果配置了多种策略,那么JoinGroupRequest中就会包含多个protocol name和protocol metadata。

如果是原有的消费者重新加入消费组,那么在真正发送JoinGroupRequest请求之前还要执行一些准备工作:

  • (1)如果消费端参数enable.auto.commit设置为tue(默认值也为tue),即开启自动提交位移功能,那么在请求加入消费组之前需要向GroupCoordinator提交消费位移。这个过程是阻塞执行的,要么成功提交消费位移,要么超时。

  • (2)如果消费者添加了自定义的再均衡监听器(ConsumerRebalanceListener),那么此时会调用onPartitionsRevokedO方法在重新加入消费组之前实施自定义的规则逻辑,比如清除一些状态,或者提交消费位移等。

  • (3)因为是重新加入消费组,之前与GroupCoordinator节点之间的心跳检测也就不需要了,所以在成功地重新加入消费组之前需要禁止心跳检测的运作。

消费者在发送JoinGroupRequest请求之后会阻塞等待Kafka服务端的响应。服务端在收到JoinGroupRequest请求后会交由GroupCoordinator来进行处理。GroupCoordinator首先会对JoinGroupRequest请求做合法性校验,比如group_id是否为空、当前broker节点是否是请求的消费者组所对应的组协调器、rebalance_timeout的值是否在合理的范围之内。如果消费者是第一次请求加入消费组,那么JoinGroupRequest请求中的memberid值为null,即没有它自身的唯一标志,此时组协调器负责为此消费者生成一个memberid。这个生成的算法很
简单,具体如以下伪代码所示。

String memberId = clientId +"-"+UUID.randomuuID().toString()

其中clientld为消费者客户端的clientd,对应请求头中的clientid。由此可见消费者的memberid由clientId和UUID用“”字符拼接而成。

选举消费组的lcader

GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法也很简单,分两种情况分析。如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader。如果某一时刻leader消费者由于某些原因退出了消费组,那么会重新选举一个新的leader,这个重新选举leader的过程又更“随意”了。在GroupCoordinator中消费者的信息是以HashMap的形式存储的,其中key为消费者的memberid,value是消费者相关的元数据信息。leaderId表示leader
消费者的memberid,它的取值为HashMap中的第一个键值对的key,这种选举的方式基本上和随机无异。总体上来说,消费组的leader选举过程是很随意的。

选举分区分配策略

每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配。这个分区分配的选举并非由leader消费者决定,而是根据消费组内的各个消费者投票来决定的。这里所说的“根据组内的各个消费者投票来决定”不是指GroupCoordinator还要再与各个消费者进行进一步交互,而是根据各个消费者呈报的分配策略来实施。最终选举的分配策略基本上可以看作被各个
消费者支持的最多的策略,具体的选举过程如下:
(1)收集各个消费者支持的所有分配策略,组成候选集candidates。
(2)4每个消费者从候选集candidates中找出第一个自身支持的策略,为这个策略投上一票。
(3)计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。

如果有消费者并不支持选出的分配策略,那么就会报出异常IllegalArgumentException:Memberdoesnotsupportprotocol。所以请不要为同一个消费组的不同消费者设置不同的分配策略,以防出现问题。需要注意的是,这里所说的“消费者所支持的分配策略”是指partition.assignment.strategy参数配置的策略,如果这个参数值只配置了RangeAssignor,那么这个消费者客户端只支持RangeAssignor分配策略,而不是消费者客户端代码中实现的3种分配策略及可能的自定义分配策略。

在此之后,Kafka服务端就要发送JoinGroupResponse响应给各个消费者,leader消费者和其他普通消费者收到的响应内容并不相同。leader消费者会收到最终的分配策略以及消费者成员信息,而普通消费者只能收到最终的分配策略。由此可见,Kafka把分区分配的具体分配交还给客户端,自身并不参与具体的分配细节,这样即使以后分区分配的策略发生了变更,也只需要重启消费端的应用即可,而不需要重启服务端。该过程可见如下图:

在这里插入图片描述

在这里插入图片描述

第三阶段(SYNC GROUP)

leader消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时leader消费者并不是直接和其余的普通消费者同步分配方案,而是通过GroupCoordinator这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案,如下图所示:
在这里插入图片描述

服务端在收到消费者发送的SyncGroupRequest请求之后会交由GroupCoordinator来负责具体的逻辑处理。GroupCoordinator同样会先对SyncGroupRequest请求做合法性校验,在此之后会将从leader消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入Kafka的consumer_offsets主题中,最后发送响应给各个消费者以提供给各个消费者各自所属的分配方案。

消费者在获得消费分区后会连接broker进行消费,并定期发送心跳给消费者协调器表明自己活着。

消费组元数据信息

我们知道消费者客户端提交的消费位移会保存在Kafka的consumer_offsets主题中,这里也一样,只不过保存的是消费组的元数据信息(GroupMetadata)。具体来说,每个消费组的元数据信息都是一条消息,不过这类消息并不依赖于具体版本的消息格式,因为它只定义了消息中的key和value字段的具体内容,所以消费组元数据信息的保存可以做到与具体的消息格式无关。

第四阶段(HEARTBEAT)

进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了GroupCoordinator,并且GroupCoordinator将其保存到了Kafka内部的consumeroffsets主题中,此时消费者可以通过OffsetFetchRequest请求获取上次提交的消费位移并从此处继续消费。

消费者通过向GroupCoordinator发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长,则整个会话就被判定为过期,GroupCoordinator也会认为这个消费者已经死亡,就会触发一次再均衡行为。消费者的心跳间隔时间由参数heartbeat.interval.ms指定,默认值为3000,即3秒,这个参数必须比session.timeout.ms参数设定的值要小,一般情况下heartbeat.interval.ms的配置值不能超过session.timeout.ms配置值的1/3。这个参数可以调整得更低,以控制正常重新平衡的预期时间。

如果一个消费者发生崩溃,并停止读取消息,那么GroupCoordinator会等待一小段时间,确认这个消费者死亡之后才会触发再均衡。在这一小段时间内,死掉的消费者并不会读取分区里的消息。这个一小段时间由session.timeout.ms参数控制,该参数的配置值必须在broker端参数group.min.session.timeout.ms(默认值为6000,即6秒)和group.max.session.timeout.ms(默认值为300000,即5分钟)允许的范围内。

还有一个参数max.po11.interval.ms,它用来指定使用消费者组管理时poll0方法调用之间的最大延退,也就是消费者在获取更多消息之前可以空闲的时间量的上限。如果此超时时间期满之前polio没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

除了被动退出消费组,还可以使用LeaveGroupRequest请求主动退出消费组。

消费者协调器和控制器关系

在Kafka中,控制器(Controller)与组协调器(Group Coordinator)在工作上存在一定的交互,具体体现在以下几个方面:

分区分配与消费者组管理

  • 控制器负责分区管理:控制器负责管理Kafka集群中的分区状态,如分区的创建、删除以及副本的分配等。当一个新的分区被创建时,控制器会决定该分区的副本分布在哪些Broker上。

  • 组协调器依赖分区信息:组协调器在进行消费者组的分区分配时,需要依赖控制器所管理的分区元数据信息。例如,组协调器要根据分区的数量、副本分布以及消费者组内消费者的数量和位置等信息,来为消费者分配合适的分区,以实现负载均衡和高效的数据消费。

选举与状态同步

  • 控制器主导Broker选举:在Kafka集群中,当Broker出现故障或新的Broker加入时,控制器会负责选举新的Leader Broker以及进行相关的状态变更。

  • 组协调器获取选举结果:组协调器需要与控制器进行交互,以获取最新的Broker选举结果和集群状态信息。这有助于组协调器了解哪些Broker是活跃的,哪些是不可用的,从而更好地管理消费者组的状态,确保消费者能够正确地连接到合适的Broker进行数据消费。

心跳检测与故障处理

  • 组协调器检测消费者心跳:组协调器通过心跳机制来检测消费者的存活状态。如果消费者长时间没有发送心跳,组协调器会认为消费者可能出现了故障,并进行相应的处理,如重新分配分区。

  • 控制器协助故障判断:在这个过程中,组协调器可能会与控制器进行交互,以获取更全面的集群状态信息,来确定消费者的故障是否是由于Broker故障等原因引起的。控制器可以提供关于Broker状态、分区状态等方面的信息,帮助组协调器更准确地判断故障情况,并采取合适的措施,如触发重新平衡操作。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com