欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > 【Flink源码分析】6. Flink1.19源码分析-Flink底层的异步通信

【Flink源码分析】6. Flink1.19源码分析-Flink底层的异步通信

2025/2/12 10:31:10 来源:https://blog.csdn.net/ayt007/article/details/145551235  浏览:    关键词:【Flink源码分析】6. Flink1.19源码分析-Flink底层的异步通信

6.1 PekkoInvocationHandler 类

仅摘取了 Flink RPC 进行通信的时候一段代码,也是异步通信的典型代码。


// execute an asynchronous call
final CompletableFuture<?> resultFuture =//1. ask 发起rpc调用的方法,它返回一个 CompletableFuture ,表示rpc调用的异步结果ask(rpcInvocation, futureTimeout)/*** 2. thenApply:ask 任务执行完成后,执行thenApply回调方法任务,将ask执行结果(resultValue)作为入参,* 传递到 thenApply 对应的任务中* thenApply 是有返回值的*/.thenApply(resultValue ->// 3. deserializeValueIfNeeded 对返回的结果进行反序列化deserializeValueIfNeeded(resultValue, method, flinkClassLoader));
//4. 这里创建了一个新的 CompletableFuture    对象,用于封装 resultFuture 的完成结果或异常
final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
/*** 5. 任务执行完成后,无论任务是否抛出异常都会调用回调 whenComplete ,接收两个参数*  resultValue : thenApply序列化后的值*  failure : 执行过程中的异常信息,如果没有,则为null*/
resultFuture.whenComplete((resultValue, failure) -> {// 6. 如果异常信息不为 nullif (failure != null) {// 7. 使用 completeExceptionally 来标记 Future 为完成状态 并且是异常状态。这样在调用get()方法时就会抛出异常completableFuture.completeExceptionally(resolveTimeoutException(ExceptionUtils.stripCompletionException(failure),callStackCapture,address,rpcInvocation));} else {// 8. complete方法是用来将CompletableFuture对象标记为已完成状态,并指定完成时的结果。completableFuture.complete(resultValue);}});if (Objects.equals(returnType, CompletableFuture.class)) {result = completableFuture;
} else {try {/*** 9. get(long timeout, TimeUnit unit) 设置一个超时时间* 阻塞获取结果如果超过设置的时间则报错*/result = completableFuture.get(futureTimeout.toMillis(), TimeUnit.MILLISECONDS);} catch (ExecutionException ee) {throw new RpcException("Failure while obtaining synchronous RPC result.",ExceptionUtils.stripExecutionException(ee));}
}

6.2 PekkoRpcService 类

TaskExecutor向ResourceManager注册时候的一段代码。


private <C extends RpcGateway> CompletableFuture<C> connectInternal(final String address,final Class<C> clazz,Function<ActorRef, InvocationHandler> invocationHandlerFactory) {checkState(!stopped, "RpcService is stopped");LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",address,clazz.getName());/*** 获取 ActorRef 地址*/final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);/***  1. actorRefFuture.thenCompose : 当 actorRefFuture 完成(即成功解析 Actor 地址)时,执行内部的 lambda 表达式*  2. Patterns.ask : 向解析得到的ActorRef发送一个 RemoteHandshakeMessage ,该消息包含目标类(clazz)和版本号(getVersion())。*  发送请求时还指定了一个超时时间。*  3. ScalaFutureUtils.toJava : 将Scala Future 转换为Java CompletableFuture*  4. <HandshakeSuccessMessage>mapTo : 将返回的响应映射为 HandshakeSuccessMessage 类型*  直白一点说就是进行了一次握手*  进行一次Handshake握手,需要保证RpcEndpoint节点正常**  定义了一个CompletableFuture对象handshakeFuture,其泛型类型为HandshakeSuccessMessage*  这意味着这个Future对象将在握手成功后返回一个HandshakeSuccessMessage对象*/final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =/*** thenCompose方法用于组合两个异步操作:首先等待actorRefFuture完成,然后使用其结果(即ActorRef)来执行另一个异步操作。* 返回CompletableFuture的函数,以构建更复杂的异步操作链。*/actorRefFuture.thenCompose((ActorRef actorRef) ->// 将 Pekko 中 ask 返回值 future 转换为 Java 中的 CompletableFutureScalaFutureUtils.toJava(// ask 发送异步消息 等待相应结果  相应结果为 futurePatterns.ask(actorRef,new RemoteHandshakeMessage(clazz, getVersion()),configuration.getTimeout().toMillis()).<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));/***  actorRefFuture.thenCombineAsync(handshakeFuture,...,actorSystem.dispatcher())*  当 actorRefFuture 和 handshakeFuture 都完成时,他们的结果(分别是 ActorRef 和 HandshakeSuccessMessage )*  将被传递给提供的 lambda 表达式进行进一步处理。这个异步操作使用 actorSystem.dispatcher() 作为调度器,这意味着它将在一个专用的线程池中执行。*  这通常是 Akka 的默认调度器,它具有适当的配置以实现高吞吐量。*  1. 创建一个 InvocationHandler 。 这个处理器将用于处理代理对象的方法调用,井将这些调用转发到远程的 Actor.*  2. 获取当前类的 ClassLoader ,这样做的原因是,在某些情况下(例如 Flink 嵌入在其他应用中时), Flink 需要使用其他 ClassLoader 来加载远程 Actor 的类。*  使用当前类的 ClassLoader 可以确保代理对象也使用相同的 ClassLoader 加载,从而避免类加载器相关的问题。*  3. 使用 Proxy.newProxyInstance 方法创建一个新的代理对象。这个代理对象实现了 clazz 指定的接口,并使用了 InvocationHandler 作为处理器。*  这个代理对象会拦截接口方法的调用,并将他们委托给 InvocationHandler 处理。*/final CompletableFuture<C> gatewayFuture =/*** thenCombineAsync方法用于异步组合两个CompletableFuture对象:actorRefFuture和handshakeFuture。* 这意味着只有当这两个CompletableFuture对象都成功完成时,才会执行提供的lambda表达式。* 这里使用了actorSystem.dispatcher()作为执行lambda的调度器。*/actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {InvocationHandler invocationHandler =invocationHandlerFactory.apply(actorRef);// Rather than using the System ClassLoader directly, we derive the// ClassLoader from this class.// That works better in cases where Flink runs embedded and// all Flink code is loaded dynamically// (for example from an OSGI bundle) through a custom ClassLoaderClassLoader classLoader = getClass().getClassLoader();// 调用 Proxy.newProxyInstance 静态方法创建动态代理类@SuppressWarnings("unchecked")C proxy =(C)Proxy.newProxyInstance(classLoader,new Class<?>[] {clazz},invocationHandler);return proxy;},actorSystem.dispatcher());return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}/*** 私有方法 resolveActorAddress,它接收一个字符串类型的地址作为参数,* 并返回一个 CompletableFuture 对象,* 该对象包含了 ActorRef 类型的结果。* @param address* @return*/
private CompletableFuture<ActorRef> resolveActorAddress(String address) {// 使用ActorSystem的actorSelection方法,根据给定的地址获取一个ActorSelection对象。final ActorSelection actorSel = actorSystem.actorSelection(address);// 使用ActorSelection的resolveOne方法,尝试解析并连接到单个Actorreturn actorSel.resolveOne(configuration.getTimeout())// 将 resolveOne 方法的返回结果(scala的future或其他异步结果)转换为java的CompletableFuture对象.toCompletableFuture()/*** 使用 exceptionally 方法处理可能发生的异常。如果resolveOne方法或其转换过程中发生异常,* 这里会捕获该异常,并抛出一个新的 CompletionException*/.exceptionally(error -> {throw new CompletionException(new RpcConnectionException(String.format("Could not connect to rpc endpoint under address %s.",address),error));});
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com