欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Flink推测机制

Flink推测机制

2024/10/24 2:02:43 来源:https://blog.csdn.net/blackjjcat/article/details/138605327  浏览:    关键词:Flink推测机制

1、配置

    execution.batch.speculative.enabled:false,推测机制开关,必须在AdaptiveBatchScheduler模式下使用

    execution.batch.speculative.max-concurrent-executions:2,同时最多几次执行

    execution.batch.speculative.block-slow-node-duration:1分钟,慢速节点会如黑名单,控制在黑名单中的时长

    slow-task-detector.check-interval:1秒,慢任务检查间隔

    slow-task-detector.execution-time.baseline-lower-bound:1分钟,慢任务检测基线的下限

    slow-task-detector.execution-time.baseline-ratio:0.75,开始检测慢任务基线的任务完成率,即有75%任务完成后,开始计算剩下的任务是否为慢任务

    slow-task-detector.execution-time.baseline-multiplier:1.5,慢任务基线乘数

2、SpeculativeScheduler

    推测机制在AdaptiveBatchScheduler模式下使用,在AdaptiveBatchSchedulerFactory当中,创建调度器时,如果开启了推测机制,会创建SpeculativeScheduler

if (enableSpeculativeExecution) {return new SpeculativeScheduler(log,jobGraph,ioExecutor,jobMasterConfiguration,

2.1、启动

    调度器启动时有三个操作:1、注册指标;2、父类通用的启动流程,会有算子的一些初始化;3、启动慢任务检测任务

protected void startSchedulingInternal() {registerMetrics(jobManagerJobMetricGroup);super.startSchedulingInternal();slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
}

2.2、SlowTaskDetector

    SlowTaskDetector负责检测慢任务,实现类是ExecutionTimeBasedSlowTaskDetector,基于schedule进行检测

this.scheduledDetectionFuture =mainThreadExecutor.schedule(() -> {listener.notifySlowTasks(findSlowTasks(executionGraph));scheduleTask(executionGraph, listener, mainThreadExecutor);},checkIntervalMillis,TimeUnit.MILLISECONDS);

    核心是findSlowTasks,首先是获取需要校验的拓扑集

private List<ExecutionJobVertex> getJobVerticesToCheck(final ExecutionGraph executionGraph) {return IterableUtils.toStream(executionGraph.getVerticesTopologically()).filter(ExecutionJobVertex::isInitialized).filter(ejv -> ejv.getAggregateState() != ExecutionState.FINISHED).filter(ejv -> getFinishedRatio(ejv) >= baselineRatio).collect(Collectors.toList());
}

    getFinishedRatio就是获取完成任务数超过基线比率的,就是拓扑集中完成任务数和总任务数的比值

private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {checkState(executionJobVertex.getTaskVertices().length > 0);long finishedCount =Arrays.stream(executionJobVertex.getTaskVertices()).filter(ev -> ev.getExecutionState() == ExecutionState.FINISHED).count();return (double) finishedCount / executionJobVertex.getTaskVertices().length;
}

    接下来是获取基线和在基线基础上计算慢速任务的,接口是getBaseline和findExecutionsExceedingBaseline,本质就是执行时间和基线的对比,注意这里不仅用到了时间,还用到了输入字节数,所以慢任务的检测可能是基于吞吐来的

private ExecutionTimeWithInputBytes getBaseline(final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {final ExecutionTimeWithInputBytes weightedExecutionTimeMedian =calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);long multipliedBaseline =(long) (weightedExecutionTimeMedian.getExecutionTime() * baselineMultiplier);return new ExecutionTimeWithInputBytes(multipliedBaseline, weightedExecutionTimeMedian.getInputBytes());
}return Double.compare((double) executionTime / Math.max(inputBytes, Double.MIN_VALUE),(double) other.getExecutionTime()/ Math.max(other.getInputBytes(), Double.MIN_VALUE));

2.3、notifySlowTasks

    获取慢速任务以后,SlowTaskDetector会触发监听器,监听器的处理实现在SpeculativeScheduler的notifySlowTasks接口

    首先把节点加入黑名单

// add slow nodes to blocklist before scheduling new speculative executions
blockSlowNodes(slowTasks, currentTimestamp);

    这边会检测任务是否支持推测,默认是支持

if (!executionVertex.isSupportsConcurrentExecutionAttempts()) {continue;
}

    基于时间戳,对慢任务新建Execution

final Collection<Execution> attempts =IntStream.range(0, newSpeculativeExecutionsToDeploy).mapToObj(i ->executionVertex.createNewSpeculativeExecution(currentTimestamp)).collect(Collectors.toList());

    之后会进行一系列的配置,加入监控

setupSubtaskGatewayForAttempts(executionVertex, attempts);
verticesToDeploy.add(executionVertexId);
newSpeculativeExecutions.addAll(attempts);

    最后发起调度

executionDeployer.allocateSlotsAndDeploy(newSpeculativeExecutions,executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));

3、任务结束

    任务结束主要核心在DefaultExecutionGraph的jobFinished,判断在上层ExecutionJobVertex.executionVertexFinished,这里是通过任务并行度来判断的,所有子任务完成则认为job完成

void executionVertexFinished() {checkState(isInitialized());numExecutionVertexFinished++;if (numExecutionVertexFinished == parallelismInfo.getParallelism()) {getGraph().jobVertexFinished();}
}

    这个的调用是由Execution触发的,也就是每个子任务完成会去调用一次

if (transitionState(current, FINISHED)) {try {finishPartitionsAndUpdateConsumers();updateAccumulatorsAndMetrics(userAccumulators, metrics);releaseAssignedResource(null);vertex.getExecutionGraphAccessor().deregisterExecution(this);} finally {vertex.executionFinished(this);}return;
}

    最终一个jobVertex(对应Job的一个任务,任务根据并行度有子任务)完成的时候会通知所有子任务完成

public void jobVertexFinished() {assertRunningInJobMasterMainThread();final int numFinished = ++numFinishedJobVertices;if (numFinished == numJobVerticesTotal) {FutureUtils.assertNoException(waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished()));}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com