都知道Flink主要分为三个模块,Source,各种算子(map,flatMap等),Sink三部分,
这里主要讲一下 Flink 自带的连接器 kafkaSource 的内部源码逻辑
- 1、添加kafkaSource
- 2、SourceFunction(各种source的顶层父类)
- (1)FlinkKafkaConsumerBase类层级结构
- (2) FlinkKafkaConsumerBase 内部执行逻辑
- (3)单独启动一个消费线程把kafka服务端数据写入临时空间
- (4)保存快照(snapshotState)和快照保存成功后的回调(notifyCheckpointComplete)
1、添加kafkaSource
Flink 要增加各种 source,业务层一般是通过下面的代码添加 source:
// Initialize the streaming execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the deserializer.
// NOTE: the right-hand side below is a placeholder, not real Java: it stands for
// "an instance of a class that implements DeserializationSchema<Row[]>".
DeserializationSchema deserializationSchema = new 实现了(DeserializationSchema<Row[]>)接口的对象
// Create the Kafka source.
// NOTE: the arguments are written as the constructor's parameter list (types + names),
// not as actual values.
SourceFunction kafkaSource = new FlinkKafkaConsumer<Row>(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
// Register the Kafka source with the environment (again shown as a signature).
env.addSource(SourceFunction<OUT> kafkaSource, String sourceName)
.........
// Start the job.
env.execute()
2、SourceFunction(各种source的顶层父类)
(1)FlinkKafkaConsumerBase类层级结构
@PublicEvolving
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {..........
}
@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {//省略干扰代码public void run(SourceContext<T> sourceContext) throws Exception {if (this.subscribedPartitionsToStartOffsets == null) {throw new Exception("The partitions were not set for the consumer");} else {this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");this.failedCommits = this.getRuntimeContext().getMetricGroup().counter("commitsFailed");final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();this.offsetCommitCallback = new KafkaCommitCallback() {public void onSuccess() {FlinkKafkaConsumerBase.this.successfulCommits.inc();}public void onException(Throwable cause) {FlinkKafkaConsumerBase.LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);FlinkKafkaConsumerBase.this.failedCommits.inc();}};if (this.subscribedPartitionsToStartOffsets.isEmpty()) {sourceContext.markAsTemporarilyIdle();}LOG.info("Consumer subtask {} creating fetcher with offsets {}.", this.getRuntimeContext().getIndexOfThisSubtask(), this.subscribedPartitionsToStartOffsets);this.kafkaFetcher = this.createFetcher(sourceContext, this.subscribedPartitionsToStartOffsets, this.watermarkStrategy, (StreamingRuntimeContext)this.getRuntimeContext(), this.offsetCommitMode, this.getRuntimeContext().getMetricGroup().addGroup("KafkaConsumer"), this.useMetrics);if (this.running) {if (this.discoveryIntervalMillis == -9223372036854775808L) {this.kafkaFetcher.runFetchLoop();} else {this.runWithPartitionDiscovery();}}}}private void runWithPartitionDiscovery() throws Exception {AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference();this.createAndStartDiscoveryLoop(discoveryLoopErrorRef);this.kafkaFetcher.runFetchLoop();this.partitionDiscoverer.wakeup();this.joinDiscoveryLoopThread();Exception discoveryLoopError = 
(Exception)discoveryLoopErrorRef.get();if (discoveryLoopError != null) {throw new RuntimeException(discoveryLoopError);}}//省略干扰代码
}
其中 FlinkKafkaConsumer 继承自父类 FlinkKafkaConsumerBase,启动 kafka 消费任务的方法是 public void run(SourceContext<T> sourceContext)。
@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction implements ParallelSourceFunction<OUT> {private static final long serialVersionUID = 1L;public RichParallelSourceFunction() {}
}@Public
public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
}public interface SourceFunction<T> extends Function, Serializable {void run(SourceFunction.SourceContext<T> var1) throws Exception;void cancel();@Publicpublic interface SourceContext<T> {void collect(T var1);@PublicEvolvingvoid collectWithTimestamp(T var1, long var2);@PublicEvolvingvoid emitWatermark(Watermark var1);@PublicEvolvingvoid markAsTemporarilyIdle();Object getCheckpointLock();void close();}
}
FlinkKafkaConsumerBase 的 run 方法实现的是顶层接口 SourceFunction 中声明的 run 方法,因此运行时触发 SourceFunction 的 run 方法,实际执行的就是 FlinkKafkaConsumerBase 的 run 方法
(2) FlinkKafkaConsumerBase 内部执行逻辑
@Internal
public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {//从kafka服务端消费数据的线程final KafkaConsumerThread consumerThread;//构造函数public KafkaFetcher(SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<WatermarkStrategy<T>> watermarkStrategy, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, String taskNameWithSubtasks, KafkaDeserializationSchema<T> deserializer, Properties kafkaProperties, long pollTimeout, MetricGroup subtaskMetricGroup, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {super(sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, consumerMetricGroup, useMetrics);this.deserializer = deserializer;this.handover = new Handover();this.consumerThread = new KafkaConsumerThread(LOG, this.handover, kafkaProperties, this.unassignedPartitionsQueue, this.getFetcherName() + " for " + taskNameWithSubtasks, pollTimeout, useMetrics, consumerMetricGroup, subtaskMetricGroup);this.kafkaCollector = new KafkaFetcher.KafkaCollector();}public void runFetchLoop() throws Exception {try {//启动单独的消费线程,从kafka服务端消费数据到handoverthis.consumerThread.start();//从handover获取数据执行反序化,和while(this.running) {ConsumerRecords<byte[], byte[]> records = this.handover.pollNext();Iterator var2 = this.subscribedPartitionStates().iterator();while(var2.hasNext()) {KafkaTopicPartitionState<T, TopicPartition> partition = (KafkaTopicPartitionState)var2.next();List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records((TopicPartition)partition.getKafkaPartitionHandle());this.partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {this.consumerThread.shutdown();}try {this.consumerThread.join();} catch (InterruptedException var8) {Thread.currentThread().interrupt();}}protected void partitionConsumerRecordsHandler(List<ConsumerRecord<byte[], 
byte[]>> partitionRecords, KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {Iterator var3 = partitionRecords.iterator();while(var3.hasNext()) {ConsumerRecord<byte[], byte[]> record = (ConsumerRecord)var3.next();//反序列化操作,deserializer这个由研发自定义的,即文章开头的DeserializationSchema deserializationSchemathis.deserializer.deserialize(record, this.kafkaCollector);this.emitRecordsWithTimestamps(this.kafkaCollector.getRecords(), partition, record.offset(), record.timestamp());if (this.kafkaCollector.isEndOfStreamSignalled()) {this.running = false;break;}}}
}@Internal
public abstract class AbstractFetcher<T, KPH> {protected void emitRecordsWithTimestamps(Queue<T> records, KafkaTopicPartitionState<T, KPH> partitionState, long offset, long kafkaEventTimestamp) {synchronized(this.checkpointLock) {Object record;while((record = records.poll()) != null) {//获取时间戳long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);//发送到下游,即source结束,this.sourceContext.collectWithTimestamp(record, timestamp);//保存分区状态partitionState.onEvent(record, timestamp);}//提交位点到分区的状态partitionState.setOffset(offset);}}
}
这里需要注意,这个 runFetchLoop 方法中是从临时空间 Handover 取数据,不是直接从 kafka 服务端取数据;从 kafka 服务端取数据是由 consumerThread 线程执行的
(3)单独启动一个消费线程把kafka服务端数据写入临时空间
this.consumerThread.start() 实际上调用的是下面的 run 方法,即启动从 kafka 拉取数据的线程
@Internal
public class KafkaConsumerThread<T> extends Thread {public void run() {//省略干扰代码if (this.running) {ConsumerRecords records = null;//省略干扰代码while(true) {while(true) {//省略干扰代码try {records = this.consumer.poll(this.pollTimeout);break;} catch (WakeupException var21) {} }//把数据放入到handover中try {handover.produce(records);records = null;} catch (org.apache.flink.streaming.connectors.kafka.internals.Handover.WakeupException var18) {}} }
}
这里是把从 kafka 服务端拉取的数据放入到 handover 中。
(4)保存快照(snapshotState)和快照保存成功后的回调(notifyCheckpointComplete)
在 FlinkKafkaConsumerBase 中有两个方法:
- 一个是 snapshotState(实现的是 CheckpointedFunction 接口),保存快照时触发
- 一个是 notifyCheckpointComplete(实现的是 CheckpointListener 接口),快照保存成功后的回调
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!this.running) {LOG.debug("snapshotState() called on closed source");} else {this.unionOffsetStates.clear();AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher != null) {//获取当前消费的各个分区,已经提交到partitionState的位点,放入到pendingOffsetsToCommit中HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (this.offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {this.pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}//忽略干扰代码} //忽略干扰代码}}public final void notifyCheckpointComplete(long checkpointId) throws Exception {if (!this.running) {LOG.debug("notifyCheckpointComplete() called on closed source");} else {AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {LOG.debug("notifyCheckpointComplete() called on uninitialized source");} else {if (this.offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {if (LOG.isDebugEnabled()) {LOG.debug("Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.", this.getRuntimeContext().getIndexOfThisSubtask(), checkpointId);}try {int posInMap = this.pendingOffsetsToCommit.indexOf(checkpointId);//忽略干扰代码//往kafka服务端提交位点fetcher.commitInternalOffsetsToKafka(offsets, this.offsetCommitCallback);} catch (Exception var7) {if (this.running) {throw var7;}}}}}}}
snapshotState 方法中,获取各个分区的位点,其中各个分区的位点是由上面的 emitRecordsWithTimestamps 方法 set 进去的:
public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {assert Thread.holdsLock(this.checkpointLock);HashMap<KafkaTopicPartition, Long> state = new HashMap(this.subscribedPartitionStates.size());Iterator var2 = this.subscribedPartitionStates.iterator();while(var2.hasNext()) {KafkaTopicPartitionState<T, KPH> partition = (KafkaTopicPartitionState)var2.next();//partition.getOffset()就是上面partitionState.setOffset塞的数据state.put(partition.getKafkaTopicPartition(), partition.getOffset());}return state;
}
所以,kafka 消费下来的数据,要等 checkpoint 保存成功后,触发回调 notifyCheckpointComplete,才会向 kafka 提交位点(前提是 offsetCommitMode 为 ON_CHECKPOINTS)
了解了 kafkaSource 之后,其他 source 的基本逻辑也是类似的