目录
创建主题
相关概念
主题:Topic
分区:Partition
副本:Replication
副本类型:Leader & Follower
日志:Log
创建第一个主题
执行指令
ZooKeeper节点变化
数据存储位置
创建第二个主题
执行指令
ZooKeeper节点变化
数据存储位置
创建第三个主题
执行指令
ZooKeeper节点变化
数据存储位置
创建主题流程
命令行提交创建指令
Controller接收创建主题请求
创建主题
创建主题
相关概念
主题:Topic
Kafka是一个分布式消息传输系统,采用发布/订阅模式。主题(Topic)是对消息进行逻辑分类的一种手段,由外部业务场景定义(除了两个用于记录消费者偏移量和事务处理的固定主题)。消息的生产者必须将消息发送到特定的主题,而消费者则从特定的主题中获取消息,并且可以同时消费一个或多个主题的数据。
分区:Partition
分区(Partition)是主题的物理分割,用于解决单个Broker节点上的负载和吞吐量问题。每个主题至少包含一个分区,默认情况下分区数量为1。每个分区都是一个有序的消息队列,并且分区内的每条消息都有一个唯一的偏移量(Offset)。
副本:Replication
为了提高系统的容错性和可靠性,Kafka允许为每个分区创建多个副本(Replication)。副本不能放在同一个Broker上,以避免单点故障导致的数据丢失。通常,分区的一个副本作为Leader副本,负责所有读写操作,其他副本作为Follower副本,用于数据备份。
副本类型:Leader & Follower
Leader副本处理所有的读写请求,而Follower副本则保持与Leader副本的数据同步。只有Leader副本可以接受读写操作。
日志:Log
Kafka接收的消息数据最终存储在Log日志文件中。每个主题创建后,都会为其创建对应的分区数据Log文件,并准备好写入数据。
创建第一个主题
执行指令
[lzl@kafka-broker1 ~]$ cd /opt/module/kafka
[lzl@kafka-broker1 kafka]$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic first-topic
ZooKeeper节点变化
创建后,ZooKeeper中的/config/topics
和/brokers/topics
节点会新增与新主题相关的子节点。
数据存储位置
主题创建后,数据存储在分区Leader副本所在的Broker节点上。
创建第二个主题
执行指令
[lzl@kafka-broker1 ~]$ cd /opt/module/kafka
[lzl@kafka-broker1 kafka]$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic second-topic --partitions 3
ZooKeeper节点变化
创建带有3个分区的主题后,ZooKeeper会记录每个分区的配置信息。
数据存储位置
分区Leader副本分别位于不同的Broker节点上。
创建第三个主题
执行指令
[lzl@kafka-broker1 ~]$ cd /opt/module/kafka
[lzl@kafka-broker1 kafka]$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic third-topic --partitions 3 --replication-factor 3
ZooKeeper节点变化
创建带有3个分区和3个副本的主题后,ZooKeeper记录每个分区的配置和其副本分布。
数据存储位置
分区的多个副本分布在不同的Broker节点上,以提供高可用性和容错能力。
以上步骤展示了如何通过命令行工具创建具有不同分区和副本数目的主题,并简述了创建主题时涉及到的关键概念和ZooKeeper中的节点变化。
创建主题流程
命令行提交创建指令
- 通过命令行提交指令:
- 指令中包含操作类型(
--create
)、主题名称(--topic
)、主题分区数量(--partitions
)、主题分区副本数量(--replication-factor
)、副本分配策略(--replica-assignment
)等参数。
- 指令中包含操作类型(
- 指令处理:
- 提交至客户端进行处理。
- 客户端获取指令后,对指令参数进行校验:
- 操作类型取值:
create
、list
、alter
、describe
、delete
,只能存在一个。 - 分区数量为大于1的整数。
- 主题是否已存在。
- 分区副本数量大于1且小于
Short.MaxValue
,一般取值小于等于Broker数量。
- 操作类型取值:
- 封装主题对象:
- 将参数封装为主题对象(
NewTopic
)。
- 将参数封装为主题对象(
- 创建通信对象:
- 设定请求标记(
CREATE_TOPICS
),查找Controller,通过通信对象向Controller发起创建主题的网络请求。
- 设定请求标记(
Controller接收创建主题请求
- 接收网络请求:
- Controller节点接收到网络请求(
Acceptor
),并将请求数据封装成请求对象放置在队列(requestQueue
)中。
- Controller节点接收到网络请求(
- 处理请求:
- 请求控制器(
KafkaRequestHandler
)周期性从队列中获取请求对象(BaseRequest
)。 - 将请求对象转发给请求处理器(
KafkaApis
),根据请求对象的类型调用创建主题的方法。
- 请求控制器(
创建主题
-
请求处理器校验主题参数:
- 如果分区数量没有设置,则采用Kafka启动时加载的配置项:
num.partitions
(默认值为1)。 - 如果副本数量没有设置,则采用Kafka启动时加载的配置项:
default.replication.factor
(默认值为1)。
- 如果分区数量没有设置,则采用Kafka启动时加载的配置项:
-
分区副本分配:
- 使用
replica-assignment
参数指定的方案创建分区副本。 - 如果未指定
replica-assignment
参数,则按照Kafka内部逻辑分配,当前采用的是未指定机架信息的副本分配策略。- 分区起始索引设置为0。
- 计算每一个分区的所有副本位置:
- 副本起始索引 = (分区编号 + 随机值)%
BrokerID列表长度
。 - 其他副本索引 = (第一个副本索引 + (1 +(副本分配间隔 + n)% (
BrokerID列表长度
- 1))) %BrokerID列表长度
。
- 副本起始索引 = (分区编号 + 随机值)%
- 使用
-
示例计算:
-
假设当前分区编号:0。
-
BrokerID列表
:【1,2,3,4】。 -
副本数量:4。
-
随机值(
BrokerID列表长度
):2。 -
副本分配间隔随机值(
BrokerID列表长度
):2。-
第一个副本索引:(0 + 2)% 4 = 2。
-
第一个副本所在
BrokerID
:3。 -
第二个副本索引:(2 +(1+(2+0)%3))% 4 = 1。
-
第二个副本所在
BrokerID
:2。 -
第三个副本索引:(2 +(1+(2+1)%3))% 4 = 3。
-
第三个副本所在
BrokerID
:4。 -
第四个副本索引:(2 +(1+(2+2)%3))% 4 = 0。
-
第四个副本所在
BrokerID
:1。
-
-
最终分区0的副本所在的
Broker节点列表
为【3,2,4,1】,其他分区采用同样算法。
-
-
保存分区副本ID列表:
- 通过索引位置获取副本节点ID。
- 保存分区以及对应的副本ID列表。
-
ZK端创建节点:
- 在
/config/topics
节点下,增加当前主题节点,节点类型为持久类型。 - 在
/brokers/topics
节点下,增加当前主题及相关节点,节点类型为持久类型。
- 在
-
Controller节点处理:
- 启动后,在
/brokers/topics
节点增加监听器,一旦节点发生变化,会触发相应功能:- 获取需要新增的主题信息。
- 更新当前Controller节点保存的主题状态数据。
- 更新分区状态机的状态为:
NewPartition
。 - 更新副本状态机的状态:
NewReplica
。 - 更新分区状态机的状态为:
OnlinePartition
,从正常的副本列表中获取第一个作为分区的Leader副本,所有副本作为分区的同步副本列表(ISR)。 - 在ZK路径
/brokers/topics/主题名
上增加分区节点/partitions
,及状态/state
节点。 - 更新副本状态机的状态:
OnlineReplica
。
- 启动后,在
-
发送请求:
- Controller节点向主题的各个分区副本所属Broker节点发送
LeaderAndIsrRequest
请求。 - 向所有Broker发送
UPDATE_METADATA
请求,更新自身缓存。- Controller向分区所属的Broker发送请求。
- Broker节点接收到请求后,根据分区状态信息,设定当前的副本为Leader或Follower,并创建底层的数据存储文件目录和空的数据文件。
- 文件目录名:主题名 + 分区编号。
- 文件名:
0000000000000000.log
:数据文件,用于存储消息。0000000000000000.index
:索引文件,用于定位数据。0000000000000000.timeindex
:时间索引文件,用于定位数据。
- Controller节点向主题的各个分区副本所属Broker节点发送