欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > kafka 消费组 分区分配策略

kafka 消费组 分区分配策略

2024/10/28 13:38:37 来源:https://blog.csdn.net/qq_18218071/article/details/141303069  浏览:    关键词:kafka 消费组 分区分配策略

一、前提

kafka的版本是 2.6.2
一般我们消费kafka的时候是指定消费组,是不会指定消费组内部消费kafka各个分区的分配策略,但是我们也可以指定消费策略,通过源码发现,我们可以有三种分区策略:

  • RangeAssignor (默认)
  • RoundRobinAssignor
  • StickyAssignor

指定消费分区策略

 props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

kafka消费分区策略的分区入口类是:ConsumerCoordinatorperformAssignment方法

    @Overrideprotected Map<String, ByteBuffer> performAssignment(String leaderId,String assignmentStrategy,List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {//获取分区策略ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);//存储消费组订阅的所有topicSet<String> allSubscribedTopics = new HashSet<>();//存储消费组内各个消费者对应的基本信息(比如元数据)Map<String, Subscription> subscriptions = new HashMap<>();Map<String, List<TopicPartition>> ownedPartitions = new HashMap<>();for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));subscriptions.put(memberSubscription.memberId(), subscription);allSubscribedTopics.addAll(subscription.topics());ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());}//具体实现在类 AbstractPartitionAssignor (各个分区算法的抽象类)Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();...log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);...return groupAssignment;}

AbstractPartitionAssignorassign()

	//各个分区策略具体的算法public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions);@Overridepublic GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();Set<String> allSubscribedTopics = new HashSet<>();for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());Map<String, Integer> partitionsPerTopic = new HashMap<>();for (String topic : allSubscribedTopics) {Integer numPartitions = metadata.partitionCountForTopic(topic);if (numPartitions != null && numPartitions > 0)partitionsPerTopic.put(topic, numPartitions);elselog.debug("Skipping assignment for topic {} since no metadata is available", topic);}/构建参数 partitionsPerTopic:map,表示各个topic有多少个分区//subscriptions :map,表示消费者相关信息(消费者id,消费者对应的主题)Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, subscriptions);// this class maintains no user data, so just wrap the resultsMap<String, Assignment> assignments = new HashMap<>();for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));return new GroupAssignment(assignments);}

下面说明下RangeAssignorRoundRobinAssignor两种分区策略的区别

二、RangeAssignor 分区策略

RangeAssignor是默认分配的策略

public class RangeAssignor extends AbstractPartitionAssignor {@Overridepublic String name() {return "range";}private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {String consumerId = subscriptionEntry.getKey();MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());for (String topic : subscriptionEntry.getValue().topics()) {put(topicToConsumers, topic, memberInfo);}}return topicToConsumers;}@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {//获取主题对应的消费者列表//partitionsPerTopic 主题对应分区个数//subscriptions 消费者的信息(消费者id,消费者对应的主题,消费者实例)Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);//打印输出,可以看到消费组group-one有两个消费者 consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 和  consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd//其中: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd 消费了 test_topic_partition_one 和 test_topic_partition_two//  consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 只消费了 test_topic_partition_one// consumersPerTopic: {test_topic_partition_one=[MemberInfo [member.id: consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3, group.instance.id: {}], MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]], test_topic_partition_two=[MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]]}Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList<>());for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {//获取topicString topic = topicEntry.getKey();//获取topic对应的消费者List<MemberInfo> consumersForTopic = topicEntry.getValue();//获取topic的分区数Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null)continue;Collections.sort(consumersForTopic);//计算每个消费者至少消费几个分区int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();//计算剩余几个分区int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();//获取主题分区列表List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);for (int i = 0, n = consumersForTopic.size(); i < n; i++) {int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);//可以看到前面的消费者会多分配一个分区int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);//计算每个消费者对应的分区列表,可以看到前面的消费者会多分配一个分区assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));}}return assignment;}
}

举例说明:构建消费组下两个消费者, test_topic_partition_onetest_topic_partition_two都是9个分区
进程一:

        props.put("group.id", "group-one");props.put("auto.offset.reset", "latest");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));

进程二:

        props.put("group.id", "group-one");props.put("auto.offset.reset", "latest");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test_topic_partition_one"));

通过上面的分配算法可以得到:
消费者:consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd消费的分区为:

test_topic_partition_one-0, 
test_topic_partition_one-1,
test_topic_partition_one-2, 
test_topic_partition_one-3, 
test_topic_partition_one-4,
test_topic_partition_two-0, 
test_topic_partition_two-1, 
test_topic_partition_two-2, 
test_topic_partition_two-3, 
test_topic_partition_two-4, 
test_topic_partition_two-5, 
test_topic_partition_two-6, 
test_topic_partition_two-7, 
test_topic_partition_two-8

消费者:consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为:

test_topic_partition_one-5, 
test_topic_partition_one-6, 
test_topic_partition_one-7, 
test_topic_partition_one-8

如果进程二也消费两个主题,则对应的关系变成
通过上面的分配算法可以得到:
消费者:consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd消费的分区为:

test_topic_partition_one-0, 
test_topic_partition_one-1,
test_topic_partition_one-2, 
test_topic_partition_one-3, 
test_topic_partition_one-4,
test_topic_partition_two-0, 
test_topic_partition_two-1, 
test_topic_partition_two-2, 
test_topic_partition_two-3, 
test_topic_partition_two-4,

消费者:consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为:

test_topic_partition_one-5, 
test_topic_partition_one-6, 
test_topic_partition_one-7, 
test_topic_partition_one-8,
test_topic_partition_two-5, 
test_topic_partition_two-6, 
test_topic_partition_two-7, 
test_topic_partition_two-8

可以看到第一个消费者比第二个消费者多消费一个test_topic_partition_one的分区,而且是连续的。同时可以看到分类是按照topic粒度区分的,也就是每个消费者消费一个topic的分区与其他topic是无关的。可以会导致第一个实例运行压力较大的问题。

三、RoundRobinAssignor 分区策略

public class RoundRobinAssignor extends AbstractPartitionAssignor {@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<TopicPartition>> assignment = new HashMap<>();//存储消费组下所有的消费者,构建两个消费者// 其中一个:consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5// 另一个:consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855aList<MemberInfo> memberInfoList = new ArrayList<>();for (Map.Entry<String, Subscription> memberSubscription : subscriptions.entrySet()) {assignment.put(memberSubscription.getKey(), new ArrayList<>());memberInfoList.add(new MemberInfo(memberSubscription.getKey(),memberSubscription.getValue().groupInstanceId()));}//排序后的消费者CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {final String topic = partition.topic();//轮询指定消费者的分区while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic)) {assigner.next();}assignment.get(assigner.next().memberId).add(partition);}return assignment;}//获取排序后的所有主题分区private List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {SortedSet<String> topics = new TreeSet<>();for (Subscription subscription : subscriptions.values())topics.addAll(subscription.topics());List<TopicPartition> allPartitions = new ArrayList<>();for (String topic : topics) {Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic != null)allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));}return allPartitions;}@Overridepublic String name() {return "roundrobin";}
}

举例说明:构建消费组下两个消费者, test_topic_partition_onetest_topic_partition_two都是9个分区
进程一:

        props.put("group.id", "group-one");props.put("auto.offset.reset", "latest");//指定轮询策略props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));
        props.put("group.id", "group-one");props.put("auto.offset.reset", "latest");//指定轮询策略props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test_topic_partition_one"));

通过上面的分配算法可以得到:
消费者:consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5消费的分区为:

test_topic_partition_one-0, 
test_topic_partition_one-2,
test_topic_partition_one-4, 
test_topic_partition_one-6, 
test_topic_partition_one-8,
test_topic_partition_two-0, 
test_topic_partition_two-1, 
test_topic_partition_two-2, 
test_topic_partition_two-3, 
test_topic_partition_two-4, 
test_topic_partition_two-5, 
test_topic_partition_two-6, 
test_topic_partition_two-7, 
test_topic_partition_two-8

消费者:consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为:

test_topic_partition_one-1, 
test_topic_partition_one-3, 
test_topic_partition_one-5, 
test_topic_partition_one-7

可以看到test_topic_partition_one分区是轮流的分配给两个消费者的

对应的日志

2024-08-19 14:28:34 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  Line:626] [Consumer clientId=consumer-group-one-1, groupId=group-one] Finished assignment for group at generation 44: {consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5=Assignment(partitions=[test_topic_partition_one-0, test_topic_partition_one-2, test_topic_partition_one-4, test_topic_partition_one-6, test_topic_partition_one-8, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8]), consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855a=Assignment(partitions=[test_topic_partition_one-1, test_topic_partition_one-3, test_topic_partition_one-5, test_topic_partition_one-7])}

如果进程二也消费两个主题,则对应的关系变成
消费者:consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5消费的分区为:

test_topic_partition_one-0, 
test_topic_partition_one-2,
test_topic_partition_one-4, 
test_topic_partition_one-6, 
test_topic_partition_one-8,
test_topic_partition_two-1, 
test_topic_partition_two-3, 
test_topic_partition_two-5, 
test_topic_partition_two-7

消费者:consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为:

test_topic_partition_one-1, 
test_topic_partition_one-3, 
test_topic_partition_one-5, 
test_topic_partition_one-7
test_topic_partition_two-0, 
test_topic_partition_two-2, 
test_topic_partition_two-4, 
test_topic_partition_two-6, 
test_topic_partition_two-8

也就是会把所有的分区轮流分给两个消费者,所以这种模式就和主题个数与主题分区有关了。

最后

既然看到这里了,就给个赞👍和收藏吧…

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com