这里以flatMap,filter为例,介绍Flink如果要实现这些基本转换需要实现哪些接口,Flink运行时调用这些实现类的入口,这些基本转换函数之间的类关系
- 一、创建基本转换函数需要实现类继承AbstractRichFunction并实现特性接口
- 1、RichFlatMapFunction
- 2、RichFilterFunction
- 二、Flink把实现了flatMap,filter功能的类加入到作业中
- 三、Flink运行时如何调用flatMap和filter的实现类的
- 四、类关系图
一、创建基本转换函数需要实现类继承AbstractRichFunction并实现特性接口
1、RichFlatMapFunction
@Public
public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {private static final long serialVersionUID = 1L;public RichFlatMapFunction() {}//需要实现下面这个方法public abstract void flatMap(IN var1, Collector<OUT> var2) throws Exception;
}
只需要实现类继承了RichFlatMapFunction
,实现了flatMap
方法就可以
2、RichFilterFunction
@Public
public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {private static final long serialVersionUID = 1L;public RichFilterFunction() {}//需要实现下面这个类public abstract boolean filter(T var1) throws Exception;
}
只需要实现类继承了RichFilterFunction
,实现了filter
方法就可以
二、Flink把实现了flatMap,filter功能的类加入到作业中
一般是通过如下代码
DataStream<Row> dateStream = 来自source的数据流
dateStream.flatMap(extend RichFlatMapFunction的子类);
dateStream.filter(extend RichFilterFunction的子类);
三、Flink运行时如何调用flatMap和filter的实现类的
那就看一下dateStream.flatMap
方法
@Public
public class DataStream<T> {protected final Transformation<T> transformation;public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);return this.flatMap(flatMapper, outType);}public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {return this.transform("Flat Map", outputType, (OneInputStreamOperator)(new StreamFlatMap((FlatMapFunction)this.clean(flatMapper))));}
}
StreamFlatMap
构造时会把实现类当成入参构建OneInputStreamOperator
@Internal
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> {private static final long serialVersionUID = 1L;private transient TimestampedCollector<OUT> collector;public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {super(flatMapper);this.chainingStrategy = ChainingStrategy.ALWAYS;}public void open() throws Exception {super.open();this.collector = new TimestampedCollector(this.output);}public void processElement(StreamRecord<IN> element) throws Exception {this.collector.setTimestamp(element);//这里就是调用的父类的userFunction,即构造函数传入的flatMapper((FlatMapFunction)this.userFunction).flatMap(element.getValue(), this.collector);}
}
下面会把userFunction
赋值给AbstractUdfStreamOperator
的字段,这样子类在调用userFunction
时就是调用的这个
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {private static final long serialVersionUID = 1L;protected final F userFunction;public AbstractUdfStreamOperator(F userFunction) {this.userFunction = (Function)Objects.requireNonNull(userFunction);this.checkUdfCheckpointingPreconditions();}
}
这样StreamFlatMap
对userFunction
的操作,就是对实现了RichFlatMapFunction
的子类的操作
像filter也类似,如下
@Internal
public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {private static final long serialVersionUID = 1L;public StreamFilter(FilterFunction<IN> filterFunction) {super(filterFunction);this.chainingStrategy = ChainingStrategy.ALWAYS;}public void processElement(StreamRecord<IN> element) throws Exception {if (((FilterFunction)this.userFunction).filter(element.getValue())) {this.output.collect(element);}}
}
StreamFilter
和StreamFlatMap
都是继承了AbstractUdfStreamOperator
实现了OneInputStreamOperator
接口,
你可以理解StreamFilter
和StreamFlatMap
有共同的父类和接口,
四、类关系图
RichFlatMapFunction
RichFilterFunction
通过上面两张图就知道RichFlatMapFunction
和RichFilterFunction
都是相同的父类扩展下来的
StreamFlatMap
StreamFilter
通过上面的也清楚,StreamFlatMap
和StreamFilter
都是相同的父类和接口,只是processElement
方法的实现不一样