Spark与Iceberg集成落地实践(一)
文章目录
- Spark与Iceberg集成落地实践(一)
- 清理快照与元数据
- 配置表维度自动清理元数据文件属性
- 手动清理
- 清理孤岛文件
- 合并数据文件
清理快照与元数据
配置表维度自动清理元数据文件属性
每一次写入数据和表变更都会进行一次元数据的版本迭代,默认保存所有。
Property | Description |
---|---|
write.metadata.delete-after-commit.enabled | 每次表提交后是否删除旧的跟踪的元数据文件 |
write.metadata.previous-versions-max | 要保留的旧元数据文件的数量 |
SPARK DDL语句
建表时确认metadata生命周期
sparkSession.sql("CREATE TABLE local.iceberg_db.table2( id bigint, data string, ts timestamp) USING iceberg PARTITIONED BY (day(ts)) TBLPROPERTIES('write.metadata.delete-after-commit.enabled'='true','write.metadata.previous-versions-max'='3')");
更改表的metadata生命周期
sparkSession.sql("ALTER TABLE local.iceberg_db.table2 SET TBLPROPERTIES(" +"'write.metadata.delete-after-commit.enabled'='true'," +"'write.metadata.previous-versions-max'='3'" +")");
作用
这只会删除元数据日志中跟踪的元数据文件,而不会删除孤立的元数据文件。
清理从metadata.json
链路开始的至data的所有文件,如下图:
手动清理
org.apache.iceberg.Table table = org.apache.iceberg.spark.Spark3Util.loadIcebergTable(spark, "local.iceberg_db.table2");long tsToExpire = System.currentTimeMillis() - (1000 * 60 * 60 * 24); // 保留一天org.apache.iceberg.spark.actions.SparkActions.get().expireSnapshots(table).expireOlderThan(tsToExpire).execute();
清理孤岛文件
孤岛文件的产生:
在 Spark 和其他分布式处理引擎中,任务或作业失败可能会留下未被表元数据引用的文件,在某些情况下,正常快照过期可能无法确定文件不再需要并将其删除。任务失败之后,最好进行一次清理表孤岛文件,若表相关任务成功,则不需要进行清理孤岛文件操作。
org.apache.iceberg.Table table = org.apache.iceberg.spark.Spark3Util.loadIcebergTable(spark, "local.iceberg_db.table2");org.apache.iceberg.spark.actions.SparkActions.get().deleteOrphanFiles(table).execute();
合并数据文件
目前发现,需要分区类有标记删除的记录才会进行合并,why?
org.apache.iceberg.Table table = org.apache.iceberg.spark.Spark3Util.loadIcebergTable(spark, "local.iceberg_db.table2");org.apache.iceberg.spark.actions.SparkActions.get().rewriteDataFiles(table).filter(Expressions.equal("ts", "2024-09-29")).option("target-file-size-bytes", Long.toString(500 * 1024 * 1024)) // 目标大小500 MB.execute();