Spark Task Launch Flow

2024/10/24 18:22:48 · Source: https://blog.csdn.net/lu070828/article/details/141355645

I. Context

The previous post, 《Spark-Job启动、Stage划分》, analyzed Job launch and Stage division in detail, including computing the number of Tasks and the Executors they are headed for. Now let's look at how a Task actually gets running.

II. TaskSchedulerImpl

《Spark-Job启动、Stage划分》 ended at the point where the built task set is submitted via TaskSchedulerImpl.submitTasks(new TaskSet(tasks.toArray, stage.id, ......)). We pick up from this method.

private[spark] class TaskSchedulerImpl(...) {
  // A two-level HashMap: a structure that lets a Stage retry multiple times
  // until all of its Tasks complete
  // Map(StageId -> Map(StageAttemptId -> TaskSetManager))
  private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]

  override def submitTasks(taskSet: TaskSet): Unit = {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks "
      + "resource profile " + taskSet.resourceProfileId)
    this.synchronized {
      // Create a new TaskSetManager.
      // It schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class
      // tracks each task, retries tasks if they fail (up to a limited number of times), and
      // handles locality-aware scheduling for this TaskSet via delay scheduling.
      // Its main interface is resourceOffer, which asks the TaskSet whether it wants to run a
      // task on one node, and handleSuccessfulTask/handleFailedTask, which tell it that one of
      // its tasks changed state (e.g. finished/failed).
      // This class is designed to be called only from code holding a lock on the TaskScheduler;
      // it should not be called from other threads.
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      // To handle a rare corner case, mark this Stage's existing TaskSetManagers as zombies
      // and create a new TaskSetManager.
      // Suppose a Stage has 10 partitions and 2 TaskSetManagers: TSM1 (zombie) and TSM2 (active).
      // TSM1 finishes the task for partition 10 and TSM2 finishes the tasks for partitions 1-9.
      // The DAGScheduler receives task-completion events for all 10 partitions and considers the
      // Stage finished. If this is a ShuffleMapStage and some output of the previous Stage is
      // missing, the DAGScheduler will resubmit it and create TSM3 for it.
      stageTaskSets.foreach { case (_, ts) =>
        ts.isZombie = true
      }
      // Install the new TaskSetManager for this retry
      stageTaskSets(taskSet.stageAttemptId) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
      // isLocal is true only in local mode.
      // hasReceivedTask is false the first time tasks are scheduled,
      // so in non-local mode the first scheduling round takes the branch below.
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run(): Unit = {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    // backend was set when the SparkContext was initialized:
    // in Standalone mode it is a StandaloneSchedulerBackend,
    // in local mode it is a LocalSchedulerBackend.
    backend.reviveOffers()
  }
}
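To make the zombie bookkeeping concrete, here is a minimal, self-contained sketch of how a nested stageId -> attemptId map lets an old attempt keep finishing its in-flight tasks while a new attempt takes over. This is illustrative only; FakeTSM and the method names are stand-ins, not Spark code.

import scala.collection.mutable.HashMap

object ZombieAttemptsSketch {
  // A stand-in for TaskSetManager: only the fields this sketch needs
  final class FakeTSM(val stageAttemptId: Int) { var isZombie = false }

  val byStageAndAttempt = new HashMap[Int, HashMap[Int, FakeTSM]]

  def submit(stageId: Int, attemptId: Int): FakeTSM = {
    val attempts = byStageAndAttempt.getOrElseUpdate(stageId, new HashMap[Int, FakeTSM])
    // Old attempts stop launching new tasks but may still finish running ones
    attempts.values.foreach { ts => ts.isZombie = true }
    val tsm = new FakeTSM(attemptId)
    attempts(attemptId) = tsm
    tsm
  }

  def main(args: Array[String]): Unit = {
    val tsm1 = submit(stageId = 3, attemptId = 0)
    val tsm2 = submit(stageId = 3, attemptId = 1) // resubmission of the same stage
    println(s"attempt 0 zombie: ${tsm1.isZombie}, attempt 1 zombie: ${tsm2.isZombie}")
    // prints: attempt 0 zombie: true, attempt 1 zombie: false
  }
}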

III. StandaloneSchedulerBackend

StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend; reviveOffers() and the driver endpoint shown below live in the parent class.

// A scheduler backend that waits for coarse-grained executors to connect.
// This backend holds onto each executor for the duration of the Spark job.
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging {

  override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
    // Send a ReviveOffers message to our own driver endpoint
    driverEndpoint.send(ReviveOffers)
  }

  class DriverEndpoint extends IsolatedRpcEndpoint with Logging {

    override def receive: PartialFunction[Any, Unit] = {
      case ReviveOffers =>
        makeOffers()
    }

    private def makeOffers(): Unit = {
      // Make sure no executor is killed while a Task is being moved to it
      val taskDescs = withLock {
        // Filter out executors that have been killed
        val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort),
              executorData.resourcesInfo.map { case (rName, rInfo) =>
                (rName, rInfo.availableAddrs.toBuffer)
              }, executorData.resourceProfileId)
        }.toIndexedSeq
        scheduler.resourceOffers(workOffers, true)
      }
      if (taskDescs.nonEmpty) {
        // In earlier source reading we found that application, driver and executor all have
        // descriptors, each used for launching. Now a Task descriptor appears, and launching
        // the Tasks follows immediately.
        launchTasks(taskDescs)
      }
    }

    // Launch Tasks on executors that have free resources
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
      for (task <- tasks.flatten) {
        val serializedTask = TaskDescription.encode(task)
        if (serializedTask.limit() >= maxRpcMessageSize) {
          // If the serialized Task exceeds the maximum RPC message size, the Task is aborted
        } else {
          val executorData = executorDataMap(task.executorId)
          // Do resource allocation here. The allocated resources will be released when the
          // task finishes.
          val rpId = executorData.resourceProfileId
          val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
          val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
          executorData.freeCores -= taskCpus
          task.resources.foreach { case (rName, rInfo) =>
            assert(executorData.resourcesInfo.contains(rName))
            executorData.resourcesInfo(rName).acquire(rInfo.addresses)
          }
          // Send a LaunchTask message to the target executor endpoint, carrying the serialized
          // Task. Next we look at how the executor side (CoarseGrainedExecutorBackend)
          // handles it.
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
  }
}
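The size guard in launchTasks is easy to reproduce in isolation. Below is a minimal sketch; the 128 MiB value stands in for Spark's maxRpcMessageSize (derived from spark.rpc.message.maxSize, whose documented default is 128 MB), and the names here are illustrative, not Spark's.

import java.nio.ByteBuffer

object RpcSizeGuardSketch {
  // Illustrative stand-in for maxRpcMessageSize
  val maxRpcMessageSize: Int = 128 * 1024 * 1024

  // Mirrors the shape of the check in launchTasks: a serialized task that
  // does not fit in one RPC message is rejected instead of being sent
  def canSend(serializedTask: ByteBuffer): Boolean =
    serializedTask.limit() < maxRpcMessageSize

  def main(args: Array[String]): Unit = {
    val small = ByteBuffer.allocate(4 * 1024)
    println(s"4 KiB task sendable: ${canSend(small)}") // true
  }
}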

IV. CoarseGrainedExecutorBackend

private[spark] class CoarseGrainedExecutorBackend(...)
  extends IsolatedRpcEndpoint with ExecutorBackend with Logging {

  var executor: Executor = null

  override def receive: PartialFunction[Any, Unit] = {
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        // Decode the Task descriptor
        val taskDesc = TaskDescription.decode(data.value)
        taskResources(taskDesc.taskId) = taskDesc.resources
        // Call launchTask() on the executor object
        executor.launchTask(this, taskDesc)
      }
  }
}
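The receive method above is just a PartialFunction over untyped messages. A small self-contained sketch of that dispatch style follows; the message types are stand-ins modeled on Spark's scheduler messages, not Spark's own.

object ReceiveDispatchSketch {
  sealed trait Msg
  case class LaunchTask(payload: Array[Byte]) extends Msg
  case object Shutdown extends Msg

  // Same shape as an RpcEndpoint's receive: match on the message, act, return Unit
  val receive: PartialFunction[Any, Unit] = {
    case LaunchTask(payload) => println(s"decoding ${payload.length} bytes, then launching")
    case Shutdown            => println("stopping executor")
  }

  def main(args: Array[String]): Unit = {
    receive(LaunchTask(Array[Byte](1, 2, 3)))
    receive(Shutdown)
  }
}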

V. Executor

// Spark executor, backed by a thread pool to run tasks. An internal RPC interface is used
// for communicating with the driver.
private[spark] class Executor(...) {

  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    // TaskRunner implements Runnable; it is the task's thread body. Let's look at its run()
    val tr = new TaskRunner(context, taskDescription, plugins)
    runningTasks.put(taskDescription.taskId, tr)
    // Hand the runnable to the thread pool for execution
    threadPool.execute(tr)
    if (decommissioned) {
      log.error(s"Launching a task while in decommissioned state.")
    }
  }

  class TaskRunner(...) extends Runnable {

    override def run(): Unit = {
      setMDCForTask(taskName, mdcProperties)
      threadId = Thread.currentThread.getId
      Thread.currentThread.setName(threadName)
      val threadMXBean = ManagementFactory.getThreadMXBean
      // TaskMemoryManager manages the memory allocated by an individual task.
      // Most of the complexity in this class has to do with encoding off-heap addresses into
      // 64-bit longs. In off-heap mode, memory can be addressed directly with 64-bit longs.
      // In on-heap mode, memory is addressed by the combination of a base object reference and
      // a 64-bit offset within that object. This is a problem when we want to store pointers
      // to data structures inside other structures, such as record pointers inside hashmaps or
      // sorting buffers. Even if we decided to use 128 bits to address memory, we couldn't just
      // store the address of the base object, since it isn't guaranteed to remain stable as the
      // heap gets reorganized due to GC.
      // Instead, we encode record pointers in 64-bit longs as follows:
      // for off-heap mode, just store the raw address; for on-heap mode, use the upper 13 bits
      // of the address to store a "page number" and the lower 51 bits to store an offset within
      // that page. These page numbers are used to index into a "page table" array inside of the
      // MemoryManager in order to retrieve the base object.
      // This allows us to address 8192 pages. In on-heap mode, the maximum page size is limited
      // by the maximum size of a long[] array, allowing us to address
      // 8192 * (2^31 - 1) * 8 bytes, which is approximately 140 terabytes of memory.
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
      val deserializeStartTimeNs = System.nanoTime()
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      Thread.currentThread.setContextClassLoader(replClassLoader)
      val ser = env.closureSerializer.newInstance()
      logInfo(s"Running $taskName")
      // Update the Task's state to RUNNING
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var taskStartTimeNs: Long = 0
      var taskStartCpu: Long = 0
      // Returns the total amount of time this JVM process has spent in garbage collection
      startGCTime = computeTotalGcTime()
      var taskStarted: Boolean = false

      try {
        // Must be set before updateDependencies() is called, in case fetching dependencies
        // requires access to properties contained within (e.g. for access control)
        Executor.taskDeserializationProps.set(taskDescription.properties)
        // If we received a new set of files and JARs from the SparkContext, download any
        // missing dependencies. Also add any new JARs we fetched to the class loader.
        updateDependencies(
          taskDescription.addedFiles, taskDescription.addedJars, taskDescription.addedArchives)
        // Deserialize the Task
        task = ser.deserialize[Task[Any]](
          taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
        task.localProperties = taskDescription.properties
        task.setTaskMemoryManager(taskMemoryManager)

        // If this task has been killed before we deserialized it, let's quit now. Otherwise,
        // continue executing the task.
        val killReason = reasonIfKilled
        if (killReason.isDefined) {
          // Throw an exception rather than returning, because returning within a try{} block
          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
          // would be caught by the catch block, leading to an incorrect ExceptionFailure
          // for the task.
          throw new TaskKilledException(killReason.get)
        }

        if (!isLocal) {
          logDebug(s"$taskName's epoch is ${task.epoch}")
          env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
        }

        // metricsPoller polls executor metrics and tracks per-task and per-stage peaks. Each
        // executor keeps one instance of this class.
        // The poll method polls the executor metrics; depending on configuration it either runs
        // in its own thread or is called by the executor's heartbeat thread.
        // The class keeps two ConcurrentHashMaps that the executor's task-runner threads and the
        // polling thread access concurrently (via its methods). One thread may update one of the
        // maps while another reads it, so a reading thread may not see the latest metrics, and
        // that is fine. We track executor metric peaks per Stage and per Task.
        // Per-stage peaks are sent in executor heartbeats. That way we get incremental metric
        // updates while tasks run, and still have some metrics if the executor dies. Per-task
        // peaks are sent in the task result when the task finishes. These are useful for
        // short-lived tasks: even if there was no heartbeat during the task, we still get its
        // polled metrics.
        metricsPoller.onTaskStart(taskId, task.stageId, task.stageAttemptId)
        taskStarted = true

        // Run the actual task and measure its runtime
        taskStartTimeNs = System.nanoTime()
        taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        var threwException = true
        val value = Utils.tryWithSafeFinally {
          // Below we focus on how the Task actually runs
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem,
            resources = taskDescription.resources,
            plugins = plugins)
          threwException = false
          res
        } {
          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
          // ........
        }

        // ........ (elided code, including serializing value into valueBytes)
        // Deserialization happens in two parts: first, we deserialize a Task object, which
        // includes the Partition. Second, Task.run() deserializes the RDD and function to run.
        // Expose task metrics using the Dropwizard metrics system; update task metric counters.
        // Note: accumulator updates must be collected after TaskMetrics is updated
        val accumUpdates = task.collectAccumulatorUpdates()
        val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
        // TODO: do not serialize value twice
        val directResult = new DirectTaskResult(valueBytes, accumUpdates, metricPeaks)
        val serializedDirectResult = ser.serialize(directResult)
        val resultSize = serializedDirectResult.limit()

        // directSend = sending directly back to the driver
        val serializedResult: ByteBuffer = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName. Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize > maxDirectResultSize) {
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId,
              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
              StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(s"Finished $taskName. $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(s"Finished $taskName. $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }

        executorSource.SUCCEEDED_TASKS.inc(1L)
        setTaskFinishedAndClearInterruptStatus()
        plugins.foreach(_.onTaskSucceeded())
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
      } catch {
        // ......
      } finally {
        runningTasks.remove(taskId)
        if (taskStarted) {
          // This means the task was successfully deserialized, its stageId and stageAttemptId
          // are known, and metricsPoller.onTaskStart was called
          metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
        }
      }
    }
  }
}
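The three-way result routing at the end of run() can be distilled into a small pure function. A sketch under assumed limits: Spark's documented defaults are spark.driver.maxResultSize = 1g, and maxDirectResultSize is capped by spark.task.maxDirectResultSize and the RPC message size, but the names and numbers below are illustrative.

object ResultRoutingSketch {
  sealed trait Route
  case object DroppedIndirect extends Route // too big: driver only learns the size
  case object ViaBlockManager extends Route // stored as a block, fetched by the driver
  case object DirectToDriver  extends Route // small enough to ride in the status update

  // Mirrors the decision order in TaskRunner.run(); limits are illustrative
  def route(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): Route =
    if (maxResultSize > 0 && resultSize > maxResultSize) DroppedIndirect
    else if (resultSize > maxDirectResultSize) ViaBlockManager
    else DirectToDriver

  def main(args: Array[String]): Unit = {
    val gb = 1L << 30; val mb = 1L << 20
    println(route(resultSize = 2 * gb,  maxResultSize = gb, maxDirectResultSize = mb)) // DroppedIndirect
    println(route(resultSize = 10 * mb, maxResultSize = gb, maxDirectResultSize = mb)) // ViaBlockManager
    println(route(resultSize = 100,     maxResultSize = gb, maxDirectResultSize = mb)) // DirectToDriver
  }
}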

VI. Task

// A unit of execution. There are two kinds of task in Spark: ShuffleMapTask and ResultTask.
// A Spark Job consists of one or more Stages. The very last Stage of a job consists of
// multiple ResultTasks, while earlier Stages consist of ShuffleMapTasks. A ResultTask executes
// the task and sends the task output back to the driver application. A ShuffleMapTask executes
// the task and divides the task output into multiple buckets (based on the task's partitioner).
private[spark] abstract class Task[T](...) extends Serializable {

  final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem,
      resources: Map[String, ResourceInformation],
      plugins: Option[PluginContainer]): T = {
    // Register the task with the blockManager in SparkEnv
    SparkEnv.get.blockManager.registerTask(taskAttemptId)
    // Allow the BarrierTaskContext to be created based on the partition,
    // instead of on whether the stage is a barrier stage
    val taskContext = new TaskContextImpl(
      stageId,
      stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
      partitionId,
      taskAttemptId,
      attemptNumber,
      taskMemoryManager,
      localProperties,
      metricsSystem,
      metrics,
      resources)

    context = if (isBarrier) {
      new BarrierTaskContext(taskContext)
    } else {
      taskContext
    }

    InputFileBlockHolder.initialize()
    TaskContext.setTaskContext(context)
    taskThread = Thread.currentThread()

    if (_reasonIfKilled != null) {
      kill(interruptThread = false, _reasonIfKilled)
    }

    new CallerContext(
      "TASK",
      SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
      appId,
      appAttemptId,
      jobId,
      Option(stageId),
      Option(stageAttemptId),
      Option(taskAttemptId),
      Option(attemptNumber)).setCurrentContext()

    plugins.foreach(_.onTaskStart())

    try {
      // This calls runTask() on a ShuffleMapTask or a ResultTask
      runTask(context)
    } catch {
      // ......
    }
  }
}
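The final run() wrapping an abstract runTask(context) is a classic template method: the base class fixes the bookkeeping around execution, and each subclass supplies only the body. A stripped-down sketch of that shape follows; the types here are stand-ins, not Spark classes.

object TaskTemplateSketch {
  abstract class MiniTask[T] {
    // The template: fixed setup/teardown around the subclass-specific work,
    // like Task.run wrapping runTask with context registration and cleanup
    final def run(taskAttemptId: Long): T = {
      println(s"setting up context for attempt $taskAttemptId")
      try runTask() finally println("cleaning up context")
    }
    protected def runTask(): T
  }

  final class MiniShuffleMapTask extends MiniTask[String] {
    protected def runTask(): String = "map output status"
  }
  final class MiniResultTask extends MiniTask[Int] {
    protected def runTask(): Int = 42
  }

  def main(args: Array[String]): Unit = {
    println(new MiniShuffleMapTask().run(0L))
    println(new MiniResultTask().run(1L))
  }
}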

1. ShuffleMapTask

A ShuffleMapTask divides the elements of an RDD into multiple buckets, based on the partitioner specified in the ShuffleDependency, as the sketch below illustrates.
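This sketch mirrors the formula Spark's HashPartitioner uses (a non-negative key.hashCode modulo the partition count), but it is a re-implementation for illustration, not Spark's class:

object HashBucketSketch {
  // Same formula HashPartitioner uses: a non-negative hash modulo numPartitions
  def bucketOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val records = Seq("apple" -> 1, "banana" -> 2, "cherry" -> 3)
    records.foreach { case (k, v) =>
      println(s"($k, $v) -> bucket ${bucketOf(k, numPartitions = 4)}")
    }
  }
}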

private[spark] class ShuffleMapTask(...) extends Task[MapStatus](...) {

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD and the dependency using the broadcast variable
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTimeNs = System.nanoTime()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    val rdd = rddAndDep._1
    val dep = rddAndDep._2
    // When we use the old shuffle fetch protocol, we use partitionId as the mapId in the
    // ShuffleBlockId construction.
    // spark.shuffle.useOldFetchProtocol defaults to false. It controls whether the old
    // protocol is used while doing the shuffle block fetching. Only enable it when we need
    // compatibility for jobs on a new Spark version fetching shuffle blocks from an
    // old-version external shuffle service.
    val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
      partitionId
    } else context.taskAttemptId()
    // Write the data using the ShuffleWriteProcessor from the ShuffleDependency.
    // ShuffleDependency:
    //   Represents a dependency on the output of a shuffle stage. Note that in the case of
    //   shuffle, the RDD is transient since we don't need it on the executor side.
    // ShuffleWriteProcessor:
    //   The interface for customizing the shuffle write process.
    //   The driver creates a ShuffleWriteProcessor and puts it into the [[ShuffleDependency]],
    //   and executors use it in each ShuffleMapTask.
    //   This ShuffleDependency is exactly the dependency of the RDD used when Stages were
    //   divided at the very beginning, i.e. the result of calling ShuffledRDD's
    //   getDependencies(), which, as we saw there, news up a fresh ShuffleWriteProcessor.
    //   Let's look at the write process below.
    dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
  }
}

ShuffleWriteProcessor 

private[spark] class ShuffleWriteProcessor extends Serializable with Logging {

  // The write process for a particular partition. It controls the lifecycle of the
  // [[ShuffleWriter]] obtained from the [[ShuffleManager]], triggers the rdd computation,
  // and finally returns the [[MapStatus]] for this task.
  def write(
      rdd: RDD[_],
      dep: ShuffleDependency[_, _, _],
      mapId: Long,
      context: TaskContext,
      partition: Partition): MapStatus = {
    var writer: ShuffleWriter[Any, Any] = null
    try {
      // Spark's shuffle process is managed centrally by the ShuffleManager in SparkEnv
      val manager = SparkEnv.get.shuffleManager
      // Get the ShuffleWriter.
      // The ShuffleManager (SortShuffleManager) returns one of three writers depending on
      // the situation:
      //   UnsafeShuffleWriter
      //   BypassMergeSortShuffleWriter
      //   SortShuffleWriter
      // We will analyze the ShuffleManager systematically in a later post.
      writer = manager.getWriter[Any, Any](
        dep.shuffleHandle,
        mapId,
        context,
        createMetricsReporter(context))
      // Because all RDD dependencies within a stage are narrow dependencies, the nested
      // iterator computation is only triggered at shuffle write time: each RDD pulls from
      // the RDD before it and applies the function attached to each RDD.
      writer.write(
        rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      val mapStatus = writer.stop(success = true)
      if (mapStatus.isDefined) {
        // Push-based shuffle, new in Spark 3.2.
        // Initiate the block push process if push-based shuffle is enabled.
        // The map task only takes care of converting the shuffle data file into multiple
        // block push requests. It delegates pushing the blocks to a different thread pool,
        // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
        // Looking into dep.shuffleMergeEnabled, we can see the conditions for enabling it:
        //   1. the application is submitted to run in YARN mode
        //   2. the external shuffle service is enabled
        //   3. IO encryption is disabled
        //   4. the serializer (e.g. KryoSerializer) supports relocation of serialized objects
        //   5. the RDD is not barrier
        if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
          manager.shuffleBlockResolver match {
            case resolver: IndexShuffleBlockResolver =>
              val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
              // ShuffleBlockPusher is used to push shuffle blocks to remote shuffle services
              // when push-based shuffle is enabled. It is created after the shuffle writer
              // finishes writing the shuffle file, and it kicks off the block push process.
              new ShuffleBlockPusher(SparkEnv.get.conf)
                .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
            case _ =>
          }
        }
      }
      mapStatus.get
    } catch {
      // ......
    }
  }
}
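The remark about nested iterators deserves a concrete demonstration. Scala's Iterator.map is lazy, so stacking transformations builds a pipeline that only computes when the final consumer pulls, which is exactly the property the shuffle writer relies on when it drains rdd.iterator(partition, context). A self-contained sketch:

object LazyIteratorSketch {
  def main(args: Array[String]): Unit = {
    // Two "narrow transformations" stacked on a source iterator; nothing runs yet
    val source = Iterator(1, 2, 3)
    val stageA = source.map { x => println(s"stage A touches $x"); x * 10 }
    val stageB = stageA.map { x => println(s"stage B touches $x"); x + 1 }
    println("pipeline built, nothing computed yet")
    // Only when the "writer" consumes do elements flow through A then B, one at a time
    stageB.foreach(x => println(s"writer consumes $x"))
  }
}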

2. ResultTask

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the function using the broadcast variable
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTimeNs = System.nanoTime()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    // A ResultTask feeds the nested-iterator computation over the RDD chain into the
    // user-specified function on the final RDD. For example:
    //   count()           Spark supplies Utils.getIteratorSize to do the counting
    //   take(n)           Spark supplies (it: Iterator[T]) => it.take(left).toArray
    //   foreach(println)  (iter: Iterator[T]) => iter.foreach(cleanF)
    //   collect()         Spark supplies (iter: Iterator[T]) => iter.toArray
    // The result can be received by the driver, or written to a Hadoop file system.
    // For example:
    //   saveAsTextFile(path) passes the Iterator into executeTask(..., iterator),
    //   which writes it to the Hadoop file system.
    func(context, rdd.iterator(partition, context))
  }
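To see how a driver-side action combines per-partition func results, here is count() in miniature: each "partition" applies an iterator-length function (the analogue of Utils.getIteratorSize), and the driver sums the per-partition values. This is a sketch with illustrative names, not Spark code.

object ActionFuncSketch {
  // The per-partition function count() installs: measure the iterator's length
  def getIteratorSize(iter: Iterator[_]): Long = {
    var count = 0L
    while (iter.hasNext) { iter.next(); count += 1 }
    count
  }

  def main(args: Array[String]): Unit = {
    // Three fake partitions of a dataset
    val partitions: Seq[Iterator[Int]] =
      Seq(Iterator(1, 2, 3), Iterator(4, 5), Iterator(6))
    // Each ResultTask would run the func on its own partition on an executor;
    // the driver combines the returned values
    val total = partitions.map(getIteratorSize).sum
    println(s"count = $total") // count = 6
  }
}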

VII. Summary

1. The task scheduler submits the Tasks.
2. The driver works out which Executor each Task should move to.
3. The Task description is serialized.
4. The driver sends a LaunchTask message to the Executor.
5. The executor backend decodes the Task description and hands the task over to its Executor object.
6. The Executor object wraps the Task description in a TaskRunner thread and submits it to the threadPool.
7. The Task object's run() is called to actually run the task (Task has two important subclasses, ShuffleMapTask and ResultTask).
8. Depending on the scenario, a ShuffleMapTask picks one of the three ShuffleWriters to write this Stage's results to disk (while writing, nested iterators apply, in order, the functions attached to each RDD in the Stage). If the job qualifies for push-based shuffle (the five conditions listed above), the results are also pushed toward the next Stage.
9. After N ShuffleMapStages, a Job reaches the ResultStage and launches ResultTasks, which feed the nested-iterator results into the function of the action that triggered the Job and return to the driver (possibly an array, a count, or a write to a Hadoop file system).
