欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Spark 中 cache、persist 和 checkpoint 优化数据处理的三种重要机制介绍

Spark 中 cache、persist 和 checkpoint 优化数据处理的三种重要机制介绍

2025/4/20 5:06:33 来源:https://blog.csdn.net/z1941563559/article/details/143894377  浏览:    关键词:Spark 中 cache、persist 和 checkpoint 优化数据处理的三种重要机制介绍

在 Spark 中,cachepersistcheckpoint 是优化数据处理的三种重要机制。它们都旨在减少数据重算和优化性能,但有各自的应用场景和实现原理。下面从源码角度分析其原理、作用和适用场景。


1. 基本概念和作用

机制作用存储介质
cache将数据存储在内存中以加快后续计算速度。默认存储在内存
persist提供多种存储级别(如内存和磁盘),支持灵活选择存储策略。内存、磁盘、堆外内存等
checkpoint将数据保存到可靠的存储系统(如 HDFS),提供容错能力,打断 DAG 依赖链。HDFS 或其他持久化存储

2. 核心原理

2.1 Cache

cachepersist 的简化版,其底层实现直接调用 persist(StorageLevel.MEMORY_AND_DISK),默认将数据存储在内存中,如果内存不足,则溢写到磁盘。

源码分析

  • 在 RDD 中,cache() 的代码:
    def cache(): this.type = persist(StorageLevel.MEMORY_AND_DISK)
    
  • persist 方法核心逻辑:
    def persist(newLevel: StorageLevel): this.type = {if (storageLevel != StorageLevel.NONE && storageLevel != newLevel) {throw new UnsupportedOperationException("Cannot change storage level...")}storageLevel = newLevelthis
    }
    
  • 执行时,RDD 的 computeOrReadCheckpoint 方法判断是否已经缓存:
    if (isCached) {SparkEnv.get.blockManager.getOrElseUpdate(blockId, ...)
    } else {compute(split, context)
    }
    

作用

  • 加速重复计算:避免重复计算 DAG 中的父节点。
  • 默认存储级别为 MEMORY_AND_DISK,当内存不足时,溢写磁盘。

适用场景

  • 数据需要被多次使用,但不需要跨作业的容错能力。
  • 计算代价大,但内存能够容纳数据。

2.2 Persist

persistcache 的增强版,允许用户选择存储级别(StorageLevel),如:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • DISK_ONLY
  • 堆外内存、序列化存储等。

源码分析

  • StorageLevel 是一个枚举类,定义了各种存储级别:
    case class StorageLevel(useDisk: Boolean,useMemory: Boolean,useOffHeap: Boolean,deserialized: Boolean,replication: Int
    )
    
  • persist 方法直接调用 BlockManager 存储数据,核心逻辑:
    blockManager.putIterator(blockId,values,level,tellMaster = true
    )
    

作用

  • 提供更灵活的存储策略,适应内存、磁盘等不同环境。

适用场景

  • 数据较大,内存无法完全容纳,需要存储到磁盘或其他媒介。
  • 数据跨作业使用时(需确保存储级别满足作业要求)。

2.3 Checkpoint

checkpoint 会将 RDD 的数据保存到可靠存储(如 HDFS),并将 RDD 的依赖链打断,从而减少 DAG 深度,增强容错能力。

源码分析

  • RDDcheckpoint 方法:
    def checkpoint(): Unit = synchronized {if (doCheckpoint()) { // 检查是否需要 checkpointval newRDD = new CheckpointRDD(this)this.rdd = newRDD // 更新依赖为 CheckpointRDD}
    }
    
  • CheckpointRDD 会从持久化存储中加载数据:
    override def compute(split: Partition, context: TaskContext): Iterator[T] = {val path = getCheckpointPath(split)val data = loadFromHDFS(path)data.iterator
    }
    

作用

  • 容错:数据保存到可靠存储中。
  • 优化 DAG:打断长依赖链,减少重算开销。

适用场景

  • 作业链较长,DAG 深度过大,容易导致重算开销。
  • 需要跨作业使用 RDD 数据,且要求数据容错性强。

3. 使用对比

特点CachePersistCheckpoint
存储位置内存(默认)或磁盘溢写多种存储级别可靠存储系统(如 HDFS)
容错性低,数据丢失需重算低至中,取决于存储级别高,数据可靠存储
DAG 优化有,打断依赖链
开销较低高(需要持久化和 I/O 操作)

4. 使用场景总结

Cache
  • 数据需要被频繁多次使用,且内存能够容纳。
  • 例如:在机器学习中对训练数据进行多次迭代。
Persist
  • 数据规模较大,内存无法完全容纳,需结合磁盘。
  • 例如:图计算中存储中间结果,避免重复计算。
Checkpoint
  • 作业链较长,可能因 DAG 深度导致失败或性能下降。
  • 需要跨作业的容错能力。
  • 例如:深度学习中的训练数据预处理、长链条依赖的 ETL 作业。

5. 综合优化建议

  • 优先考虑 cachepersist:仅当 DAG 深度问题显著时,使用 checkpoint
  • 设置合理的存储级别:根据内存和磁盘资源选择最优存储策略。
  • 结合 checkpointpersist:在 Checkpoint 前对数据 Persist,避免重新计算数据。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词