欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 资讯 > 【大数据学习 | kafka】kafka的数据存储结构

【大数据学习 | kafka】kafka的数据存储结构

2025/1/2 23:54:24 来源:https://blog.csdn.net/2301_80912559/article/details/143442329  浏览:    关键词:【大数据学习 | kafka】kafka的数据存储结构

以上是kafka的数据的存储方式。

这些数据可以在服务器集群上对应的文件夹中查看到。

[hexuan@hadoop106 __consumer_offsets-0]$ ll
总用量 8
-rw-rw-r--. 1 hexuan hexuan 10485760 10月 28 22:21 00000000000000000000.index
-rw-rw-r--. 1 hexuan hexuan        0 10月 28 22:21 00000000000000000000.log
-rw-rw-r--. 1 hexuan hexuan 10485756 10月 28 22:21 00000000000000000000.timeindex
-rw-rw-r--. 1 hexuan hexuan        8 10月 28 22:21 leader-epoch-checkpoint
-rw-rw-r--. 1 hexuan hexuan       43 10月 28 22:21 partition.metadata

每个文件夹以topic+partition进行命名,更加便于管理和查询检索,因为kafka的数据都是按照条进行处理和流动的一般都是给流式应用做数据供给和缓冲,所以检索速度必须要快,分块管理是最好的方式。

消费者在检索相应数据的时候会非常的简单。

consumer检索数据的过程。

首先文件的存储是分段的,那么文件的名称代表的就是这个文件中存储的数据范围和条数。

00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
代表存储的数据是从0条开始的

00000000000000100000.index
00000000000000100000.log
00000000000000100000.timeindex
代表存储的数据是从100000条开始的

所以首先检索数据的时候就可以跳过1G为大小的块,比如检索888这条数据的,就可以直接去00000000000000000000.log中查询数据

那么查询数据还是需要在1G大小的内容中找寻是比较麻烦的,这个时候可以从index索引出发去检索,首先我们可以通过kafka提供的工具类去查看log和index中的内容

# 首先创建一个topic_bkafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_b --partitions 5 --replication-factor 2
# 然后通过代码随机向不同的分区中分发不同的数据1W条
package com.hainiu.kafka.consumer;/*** ClassName : test1* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/3 22:45* Version 1.0*/
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class test1 {public static void main(String[] args) throws InterruptedException {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);pro.put(ProducerConfig.RETRIES_CONFIG, 3);pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);for (int i = 0; i < 10000; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("topic_b", ""+i,"this is hainiu");producer.send(record);}producer.close();}
}

然后去查看log和index中的内容

# kafka查看日志和索引的命令
kafka-run-class.sh kafka.tools.DumpLogSegments --files xxx

查看日志.log

[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log 
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
baseOffset: 606 lastOffset: 1205 count: 600 baseSequence: 606 lastSequence: 1205 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 5149 CreateTime: 1730645208577 size: 4929 magic: 2 compresscodec: snappy crc: 1974998903 isvalid: true
baseOffset: 1206 lastOffset: 1439 count: 234 baseSequence: 1206 lastSequence: 1439 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 10078 CreateTime: 1730645208584 size: 2085 magic: 2 compresscodec: snappy crc: 1665550202 isvalid: true

查看索引.index

内容即:

index索引

offset 第几条position 物理偏移量位置,也就是第几个字
11875275
176710140
202215097

log日志

# 打印日志内容的命令 --print-data-log 打印数据
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
| offset: 0 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 0 headerKeys: [] key: 14 payload: this is hainiu
| offset: 1 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 1 headerKeys: [] key: 19 payload: this is hainiu
| offset: 2 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 2 headerKeys: [] key: 24 payload: this is hainiu
| offset: 3 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 3 headerKeys: [] key: 26 payload: this is hainiu

可以看到刷写的日志

baseOffset: 0 lastOffset: 605 count: 606

从0 到605 条一次性刷写606条

lastSequence: 605 producerId

刷写事务日志编号,生产者的编号

通过名称跳过1G的端,然后找到相应的index的偏移量,然后根据偏移量定位log位置,不断向下找寻数据。

大家可以看到index中的索引数据是轻量稀疏的,这个数据是按照4KB为大小生成的,一旦刷写4KB大小的数据就会写出相应的文件索引。

官网给出的默认值4KB

一个数据段大小是1G

timeIndex

我们看到在数据中还包含一个timeindex的时间索引

# 查询时间索引
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex 
[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex 
Dumping 00000000000000000000.timeindex
timestamp: 1730645208577 offset: 1205
timestamp: 1730645208584 offset: 1439

可以看到和index索引一样,这个也是4Kb写出一部分数据,但是写出的是时间,我们可以根据时间进行断点找寻数据,指定时间重复计算

也就是说,写到磁盘的数据是按照1G分为一个整体部分的,但是这个整体部分需要4KB写一次,并且一次会生成一个索引问题信息,在检索的时候可以通过稀疏索引进行数据的检索,效率更快。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com