欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 美景 > Flink 1.14.*中flatMap,filter等基本转换函数源码

Flink 1.14.*中flatMap,filter等基本转换函数源码

2024/11/30 12:45:25 来源:https://blog.csdn.net/weixin_43113679/article/details/141690402  浏览:    关键词:Flink 1.14.*中flatMap,filter等基本转换函数源码

这里以flatMap,filter为例,介绍Flink如果要实现这些基本转换需要实现哪些接口,Flink运行时调用这些实现类的入口,这些基本转换函数之间的类关系

  • 一、创建基本转换函数需要实现类继承AbstractRichFunction并实现特性接口
    • 1、RichFlatMapFunction
    • 2、RichFilterFunction
  • 二、Flink把实现了flatMap,filter功能的类加入到作业中
  • 三、Flink运行时如何调用flatMap和filter的实现类的
  • 四、类关系图

一、创建基本转换函数需要实现类继承AbstractRichFunction并实现特性接口

1、RichFlatMapFunction

@Public
public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {private static final long serialVersionUID = 1L;public RichFlatMapFunction() {}//需要实现下面这个方法public abstract void flatMap(IN var1, Collector<OUT> var2) throws Exception;
}

只需要实现类继承了RichFlatMapFunction,实现了flatMap方法就可以

2、RichFilterFunction

@Public
public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {private static final long serialVersionUID = 1L;public RichFilterFunction() {}//需要实现下面这个类public abstract boolean filter(T var1) throws Exception;
}

只需要实现类继承了RichFilterFunction,实现了filter方法就可以

二、Flink把实现了flatMap,filter功能的类加入到作业中

一般是通过如下代码

DataStream<Row>  dateStream = 来自source的数据流
dateStream.flatMap(extend RichFlatMapFunction的子类);
dateStream.filter(extend RichFilterFunction的子类);

三、Flink运行时如何调用flatMap和filter的实现类的

那就看一下dateStream.flatMap方法

@Public
public class DataStream<T> {protected final Transformation<T> transformation;public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);return this.flatMap(flatMapper, outType);}public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {return this.transform("Flat Map", outputType, (OneInputStreamOperator)(new StreamFlatMap((FlatMapFunction)this.clean(flatMapper))));}
}

StreamFlatMap构造时会把实现类当成入参构建OneInputStreamOperator

@Internal
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> {private static final long serialVersionUID = 1L;private transient TimestampedCollector<OUT> collector;public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {super(flatMapper);this.chainingStrategy = ChainingStrategy.ALWAYS;}public void open() throws Exception {super.open();this.collector = new TimestampedCollector(this.output);}public void processElement(StreamRecord<IN> element) throws Exception {this.collector.setTimestamp(element);//这里就是调用的父类的userFunction,即构造函数传入的flatMapper((FlatMapFunction)this.userFunction).flatMap(element.getValue(), this.collector);}
}

下面会把userFunction赋值给AbstractUdfStreamOperator的字段,这样子类在调用userFunction时就是调用的这个

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {private static final long serialVersionUID = 1L;protected final F userFunction;public AbstractUdfStreamOperator(F userFunction) {this.userFunction = (Function)Objects.requireNonNull(userFunction);this.checkUdfCheckpointingPreconditions();}
}

这样StreamFlatMapuserFunction的操作,就是对实现了RichFlatMapFunction的子类的操作

像filter也类似,如下

@Internal
public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {private static final long serialVersionUID = 1L;public StreamFilter(FilterFunction<IN> filterFunction) {super(filterFunction);this.chainingStrategy = ChainingStrategy.ALWAYS;}public void processElement(StreamRecord<IN> element) throws Exception {if (((FilterFunction)this.userFunction).filter(element.getValue())) {this.output.collect(element);}}
}

StreamFilterStreamFlatMap都是继承了AbstractUdfStreamOperator 实现了OneInputStreamOperator接口,
你可以理解StreamFilterStreamFlatMap有共同的父类和接口,

四、类关系图

RichFlatMapFunction
在这里插入图片描述
RichFilterFunction
在这里插入图片描述

通过上面两张图就知道RichFlatMapFunctionRichFilterFunction都是相同的父类扩展下来的

StreamFlatMap
在这里插入图片描述
StreamFilter
在这里插入图片描述
通过上面的也清楚,StreamFlatMapStreamFilter都是相同的父类和接口,只是processElement方法的实现不一样

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com