欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 旅游 > 【Iceberg分析】调研Iceberg中表的原地演变

【Iceberg分析】调研Iceberg中表的原地演变

2024/10/24 13:26:14 来源:https://blog.csdn.net/weixin_43820556/article/details/142632376  浏览:    关键词:【Iceberg分析】调研Iceberg中表的原地演变

调研Iceberg中表的原地演变

文章目录

  • 调研Iceberg中表的原地演变
    • 原生非分区表
      • 文件关系图
      • 表的原地演变之表schema演变
        • 新增字段new_column
        • 文件关系变化图
        • 为新增字段写入数据
        • 文件关系变化图
        • 删除新增字段
        • 文件关系变化图
        • 新增字段new_column2
        • 文件关系变化图
        • 删除数据
        • 文件关系变化图
    • 原生分区表
      • Iceberg支持如下几种分区转换
      • 文件关系变化图
      • 表的原地演变之分区演变
        • 新增分区
        • 文件关系变化图
        • 删除分区
        • 删除数据
        • 文件关系变化图
    • 小结

以《基于spark3.4.2+iceberg1.6.1搭建本地阅读调试环境》为基础环境,调研原地演变特性

工程中iceberg_warehousespark.sql.catalog.local.warehouse 指定了 Iceberg 数据文件和元数据文件的存放路径。

原生非分区表

创建非分区原生表,并插入数据。

        // 1.创建库spark.sql("create database iceberg_db");// 2.新建表spark.sql("CREATE TABLE local.iceberg_db.table1 (id bigint, data string) USING iceberg ");// 3.第1次新增数据spark.sql("INSERT INTO local.iceberg_db.table1 VALUES (1, 'a'), (2, 'b'), (3, 'c')");// 4.第2次新增数据spark.sql("INSERT INTO local.iceberg_db.table1 VALUES (4, 'd'), (5, 'e'), (6, 'f')");// 5.第3次新增数据spark.sql("INSERT INTO local.iceberg_db.table1 VALUES (7, 'g'), (8, 'h'), (9, 'i')");

在这里插入图片描述

文件关系图

数据层
元数据层
catalog
data files
parquet格式
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
ceberg_db.table1

新建表时,会触发元数据的变化,此时是没有数据文件的,所以只有v1.metadata.json文件。

  • snap-开头的是清单列表文件(manifest list)
  • 紧接着snap之后的数字开头的是清单文件(manifest file)

表的原地演变之表schema演变

新增字段new_column
        // 6.新增字段new_columnspark.sql("ALTER TABLE local.iceberg_db.table1 " +"ADD COLUMNS ( new_column string comment 'new_column docs' )");
文件关系变化图
数据层
元数据层
catalog
data files
parquet格式
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
v5.metadata.json
ceberg_db.table1

v5.metadata.jsonschemas以数组的形式记录了不同的表schema。以schema-id区分。new_column字段上有对应的字段id3current-schema-id中是当前生效的schema-id

 "current-schema-id" : 1,
"schemas" : [ {"type" : "struct","schema-id" : 0,"fields" : [ {"id" : 1,"name" : "id","required" : false,"type" : "long"}, {"id" : 2,"name" : "data","required" : false,"type" : "string"} ]}, {"type" : "struct","schema-id" : 1,"fields" : [ {"id" : 1,"name" : "id","required" : false,"type" : "long"}, {"id" : 2,"name" : "data","required" : false,"type" : "string"}, {"id" : 3,"name" : "new_column","required" : false,"type" : "string","doc" : "new_column docs"} ]} ]
为新增字段写入数据
        // 7.为新增字段new_column增加数据spark.sql("INSERT INTO local.iceberg_db.table1 VALUES (10, 'j','new1'), (11, 'k','new2'), (12, 'l','new3')");Dataset<Row> result = spark.sql("select * from local.iceberg_db.table1");result.show();

查询结果

表的schema中新增字段在之前的记录以null填充展示。

+---+----+----------+
| id|data|new_column|
+---+----+----------+
|  7|   g|      null|
|  8|   h|      null|
|  9|   i|      null|
|  1|   a|      null|
|  2|   b|      null|
|  3|   c|      null|
| 10|   j|      new1|
| 11|   k|      new2|
| 12|   l|      new3|
|  4|   d|      null|
|  5|   e|      null|
|  6|   f|      null|
+---+----+----------+
文件关系变化图
数据层
元数据层
catalog
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
v5.metadata.json
v6.metadata.json
Manifest list
avro格式
Manifest file
avro格式
ceberg_db.table1
删除新增字段
        // 8.删除字段new_columnspark.sql("ALTER TABLE local.iceberg_db.table1 DROP COLUMNS new_column");Dataset<Row> result = spark.sql("select * from local.iceberg_db.table1");result.show();

查询结果

表schema的删除字段在,之前的记录全部除了删除字段,全部可以查询展示。

+---+----+
| id|data|
+---+----+
|  4|   d|
|  5|   e|
|  6|   f|
|  1|   a|
|  2|   b|
|  3|   c|
| 10|   j|
| 11|   k|
| 12|   l|
|  7|   g|
|  8|   h|
|  9|   i|
+---+----+
文件关系变化图
数据层
元数据层
catalog
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
v5.metadata.json
v6.metadata.json
v7.metadata.json
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
ceberg_db.table1

v7.metadata.jsoncurrent-schema-id中是当前生效的schema-id改为了0

新增字段new_column2
        // 9.删除字段new_columnspark.sql("ALTER TABLE local.iceberg_db.table1 " +"ADD COLUMNS ( new_column2 string comment 'new_column2 docs' )");
文件关系变化图
数据层
元数据层
catalog
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
v5.metadata.json
v6.metadata.json
v7.metadata.json
v8.metadata.json
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
ceberg_db.table1

v8.metadata.jsonschemas的变化,删除字段new_column的id3,不会再之后新增的new_column2不会再使用了。

"current-schema-id" : 2,
"schemas" : [ {"type" : "struct","schema-id" : 0,"fields" : [ {"id" : 1,"name" : "id","required" : false,"type" : "long"}, {"id" : 2,"name" : "data","required" : false,"type" : "string"} ]
}, {"type" : "struct","schema-id" : 1,"fields" : [ {"id" : 1,"name" : "id","required" : false,"type" : "long"}, {"id" : 2,"name" : "data","required" : false,"type" : "string"}, {"id" : 3,"name" : "new_column","required" : false,"type" : "string","doc" : "new_column docs"} ]
}, {"type" : "struct","schema-id" : 2,"fields" : [ {"id" : 1,"name" : "id","required" : false,"type" : "long"}, {"id" : 2,"name" : "data","required" : false,"type" : "string"}, {"id" : 4,"name" : "new_column2","required" : false,"type" : "string","doc" : "new_column2 docs"} ]
} ]
删除数据
        // 10.删除字段new_columnspark.sql("DELETE FROM local.iceberg_db.table1  where id in (2,5,10)");Dataset<Row> result = spark.sql("select * from local.iceberg_db.table1");result.show();
文件关系变化图
数据层
元数据层
catalog
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
v5.metadata.json
v6.metadata.json
v7.metadata.json
v8.metadata.json
v9.metadata.json
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
ceberg_db.table1

不同版本的metadata文件会使用不同的清单文件指向相同的数据文件,清单文件(manifest file)中的status字段取值说明,值1代表add,值2代表删除。

原生分区表

Iceberg支持如下几种分区转换

转换名称描述源字段类型结果类型
identityid值,默认没有转换函数。注意:如果用时间戳做为分区的话,每个时间戳是一个分区,随着数据的写入,元数据很快会崩溃AnySource type
bucket[N]哈希值],模Nint, long, decimal, date, time, timestamp, timestamptz, timestamp_ns, timestamptz_ns, string, uuid, fixed, binaryint
truncate[W]将字段按宽度截取int, long, decimal, string, binary与源字段类型一致,如果源字段是字符串则截取W长度,如果是int/long则相除W倍后取整
year将时间转换为年date, timestamp, timestamptz, timestamp_ns, timestamptz_nsint
month将时间转换为月date, timestamp, timestamptz, timestamp_ns, timestamptz_nsint
day将时间转换为日date, timestamp, timestamptz, timestamp_ns, timestamptz_nsint
hour将时间转换为小时timestamp, timestamptz, timestamp_ns, timestamptz_nsint
voidAlways produces nullAnySource type or int

创建分区原生表,使用分区转换进行隐藏分区,并插入数据。

        // 1.创建分区表,以month方法进行隐藏式分区spark.sql("CREATE TABLE local.iceberg_db.table2( id bigint, data string, ts timestamp) USING iceberg PARTITIONED BY (month(ts))");// 2.新增数据spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (1, 'a', cast(1727601585 as timestamp)),(2, 'b', cast(1724923185 as timestamp)),(3, 'c', cast(1724919585 as timestamp))");

在这里插入图片描述

文件关系变化图

数据层
元数据层
catalog
ts_month=2024-08
ts_month=2024-09
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
Manifest list
avro格式
Manifest file
avro格式
ceberg_db.table2

Iceberg 通过获取列值并对其进行可选转换来生成分区值。建表时,ts字段类型是使用timestamp,默认使用带时区的timestamptz

v1.metadata.jsonpartition-specs以数组的形式记录了不同的表分区规则,以spec-id区分。default-spec-id中是当前生效的spec-id

  "default-spec-id" : 0,"partition-specs" : [ {"spec-id" : 0,"fields" : [ {"name" : "ts_month","transform" : "month","source-id" : 3,"field-id" : 1000} ]} ]

表的原地演变之分区演变

新增分区
        // 3.以day()方法新增分区spark.sql("ALTER TABLE local.iceberg_db.table2 ADD PARTITION FIELD day(ts)");// 4.新增数据spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (4, 'd', cast(1727605185 as timestamp)),(5, 'e', cast(1725963585 as timestamp)),(6, 'f', cast(1726827585 as timestamp))");
文件关系变化图
数据层
元数据层
catalog
ts_month=2024-09
ts_month=2024-08
ts_day=2024-09-10
ts_day=2024-09-20
ts_day=2024-09-29
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
Manifest list
avro格式
Manifest list
avro格式
Manifest file
avro格式
Manifest file
avro格式
ceberg_db.table2

v3.metadata.jsonpartition-specs的变化,default-spec-id采用了新的分区组合spec-id1

  "default-spec-id" : 1,"partition-specs" : [ {"spec-id" : 0,"fields" : [ {"name" : "ts_month","transform" : "month","source-id" : 3,"field-id" : 1000} ]}, {"spec-id" : 1,"fields" : [ {"name" : "ts_month","transform" : "month","source-id" : 3,"field-id" : 1000}, {"name" : "ts_day","transform" : "day","source-id" : 3,"field-id" : 1001} ]} ]

可以发现:

  1. v3.metadata.json发现分区演变是一种元数据操作,并不急于重写文件。
  2. 表分区可以在现有表中更新
  3. 多个分区的共同存在。
删除分区
        spark.sql("ALTER TABLE local.iceberg_db.table2 DROP PARTITION FIELD month(ts)");
数据层
元数据层
catalog
ts_month=2024-09
ts_month=2024-08
ts_day=2024-09-10
ts_day=2024-09-20
ts_day=2024-09-29
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
v5.metadata.json
Manifest list
avro格式
Manifest list
avro格式
Manifest file
avro格式
Manifest file
avro格式
ceberg_db.table2

v5.metadata.jsonpartition-specs的变化,default-spec-id采用了新的分区组合spec-id2

  "default-spec-id" : 2,"partition-specs" : [ {"spec-id" : 0,"fields" : [ {"name" : "ts_month","transform" : "month","source-id" : 3,"field-id" : 1000} ]}, {"spec-id" : 1,"fields" : [ {"name" : "ts_month","transform" : "month","source-id" : 3,"field-id" : 1000}, {"name" : "ts_day","transform" : "day","source-id" : 3,"field-id" : 1001} ]}, {"spec-id" : 2,"fields" : [ {"name" : "ts_day","transform" : "day","source-id" : 3,"field-id" : 1001} ]} ]
删除数据
        spark.sql("DELETE FROM local.iceberg_db.table2  where id in (2)");
文件关系变化图
数据层
元数据层
catalog
ts_month=2024-09
ts_month=2024-08
ts_day=2024-08-29
ts_day=2024-09-10
ts_day=2024-09-20
ts_day=2024-09-29
status=2
status=0
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
data files
parquet格式
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
v5.metadata.json
v6.metadata.json
Manifest list
avro格式
Manifest list
avro格式
Manifest list
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
Manifest file
avro格式
ceberg_db.table2

删除数据操作会触发数据文件的变化,此时目录ts_day=2024-08-29已经于ts_month=2024-08平级。ts_day=2024-08-29中的数据文件会保留删除之后的数据。

由于分区的变化后,旧的分区规则产生的数据文件发生了数据变化,会产生一个新清单文件(maifest file)中的,会对旧的数据文件进行索引,以上述为例,v6.metadata.json对应的清单列表文件(maifest list)中存储了一个清单文件(maifest file)即虚线框展示的,其中存储了两个datafile的引用,status=2代表删除,status=0代表文件已经存在。

小结

  • 每一个操作都会产生一个新的元数据文件(metadata.json),需要配置自动清理元数据文件
  • 所有一个文件都伴有一个.crc文件,小文件的问题怎么办?
  • Iceberg使用唯一的id来跟踪表中的每一列。添加列时,将为其分配一个新ID,以便不会错误地使用现有数据。
  • 分区演变时,是元数据的操作,数据文件的操作是滞后的,有数据变动时才会进行文件的重写。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com