欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 八卦 > 【Iceberg分析】Spark与Iceberg集成落地实践(一)

【Iceberg分析】Spark与Iceberg集成落地实践(一)

2024/11/30 10:39:07 来源:https://blog.csdn.net/weixin_43820556/article/details/142743902  浏览:    关键词:【Iceberg分析】Spark与Iceberg集成落地实践(一)

Spark与Iceberg集成落地实践(一)

文章目录

  • Spark与Iceberg集成落地实践(一)
    • 清理快照与元数据
      • 配置表维度自动清理元数据文件属性
      • 手动清理
    • 清理孤岛文件
    • 合并数据文件

清理快照与元数据

配置表维度自动清理元数据文件属性

每一次写入数据和表变更都会进行一次元数据的版本迭代,默认保存所有。

PropertyDescription
write.metadata.delete-after-commit.enabled每次表提交后是否删除旧的跟踪的元数据文件
write.metadata.previous-versions-max要保留的旧元数据文件的数量

SPARK DDL语句

建表时确认metadata生命周期

		sparkSession.sql("CREATE TABLE local.iceberg_db.table2( id bigint, data string, ts timestamp) USING iceberg PARTITIONED BY (day(ts)) TBLPROPERTIES('write.metadata.delete-after-commit.enabled'='true','write.metadata.previous-versions-max'='3')");

更改表的metadata生命周期

        sparkSession.sql("ALTER TABLE local.iceberg_db.table2 SET TBLPROPERTIES(" +"'write.metadata.delete-after-commit.enabled'='true'," +"'write.metadata.previous-versions-max'='3'" +")");

作用

这只会删除元数据日志中跟踪的元数据文件,而不会删除孤立的元数据文件。

清理从metadata.json链路开始的至data的所有文件,如下图:

数据层
元数据层
data file1
data file2
data file3
data file4
v2.metadata.json
Manifest list1
Manifest file1
Manifest file2

手动清理

        org.apache.iceberg.Table table = org.apache.iceberg.spark.Spark3Util.loadIcebergTable(spark, "local.iceberg_db.table2");long tsToExpire = System.currentTimeMillis() - (1000 * 60 * 60 * 24); // 保留一天org.apache.iceberg.spark.actions.SparkActions.get().expireSnapshots(table).expireOlderThan(tsToExpire).execute();

清理孤岛文件

孤岛文件的产生:

在 Spark 和其他分布式处理引擎中,任务或作业失败可能会留下未被表元数据引用的文件,在某些情况下,正常快照过期可能无法确定文件不再需要并将其删除。任务失败之后,最好进行一次清理表孤岛文件,若表相关任务成功,则不需要进行清理孤岛文件操作。

		org.apache.iceberg.Table table = org.apache.iceberg.spark.Spark3Util.loadIcebergTable(spark, "local.iceberg_db.table2");org.apache.iceberg.spark.actions.SparkActions.get().deleteOrphanFiles(table).execute();

合并数据文件

目前发现,需要分区类有标记删除的记录才会进行合并,why?

		org.apache.iceberg.Table table = org.apache.iceberg.spark.Spark3Util.loadIcebergTable(spark, "local.iceberg_db.table2");org.apache.iceberg.spark.actions.SparkActions.get().rewriteDataFiles(table).filter(Expressions.equal("ts", "2024-09-29")).option("target-file-size-bytes", Long.toString(500 * 1024 * 1024)) // 目标大小500 MB.execute();

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com