
Flink 1.14.* kafkaSource Source Code

2025/2/25 14:22:05  Source: https://blog.csdn.net/weixin_43113679/article/details/141288861

As is well known, a Flink job is made up of three parts: a Source, various operators (map, flatMap, etc.), and a Sink.
This article walks through the internal source code of the Kafka source connector (kafkaSource) that ships with Flink.

    • 1. Adding the Kafka source
    • 2. SourceFunction (the top-level parent class of all sources)
      • (1) FlinkKafkaConsumerBase class hierarchy
      • (2) FlinkKafkaConsumerBase internal execution logic
      • (3) A dedicated consumer thread writes data from the Kafka broker into a staging area
      • (4) Saving a snapshot (snapshotState) and the callback after the snapshot succeeds (notifyCheckpointComplete)

1. Adding the Kafka source

To add a source in Flink, application code typically does it with code like the following:

// initialize the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// initialize the deserializer: a user-defined object implementing the DeserializationSchema<Row[]> interface
DeserializationSchema deserializationSchema = ...;
// create the Kafka source
// constructor: FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
SourceFunction kafkaSource = new FlinkKafkaConsumer<Row>(topics, deserializer, props);
// add the Kafka source to the environment
// signature: addSource(SourceFunction<OUT> function, String sourceName)
env.addSource(kafkaSource, sourceName)
.........
// start the job
env.execute()
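
For reference, a complete minimal job wired up the same way might look like the sketch below. The topic name, bootstrap servers, group id, and the use of the built-in SimpleStringSchema are placeholder assumptions for illustration, not part of the original snippet.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection properties (placeholder values)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");

        // SimpleStringSchema is a built-in DeserializationSchema<String>
        FlinkKafkaConsumer<String> kafkaSource =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        env.addSource(kafkaSource, "kafka-source")
                .print();

        env.execute("kafka-source-demo");
    }
}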

2. SourceFunction (the top-level parent class of all sources)

(1) FlinkKafkaConsumerBase class hierarchy

@PublicEvolving
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
    // ..........
}

@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

    // irrelevant code omitted

    public void run(SourceContext<T> sourceContext) throws Exception {
        if (this.subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        } else {
            this.successfulCommits =
                    this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
            this.failedCommits =
                    this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
            final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
            this.offsetCommitCallback = new KafkaCommitCallback() {
                public void onSuccess() {
                    FlinkKafkaConsumerBase.this.successfulCommits.inc();
                }

                public void onException(Throwable cause) {
                    FlinkKafkaConsumerBase.LOG.warn(
                            String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex),
                            cause);
                    FlinkKafkaConsumerBase.this.failedCommits.inc();
                }
            };
            if (this.subscribedPartitionsToStartOffsets.isEmpty()) {
                sourceContext.markAsTemporarilyIdle();
            }

            LOG.info(
                    "Consumer subtask {} creating fetcher with offsets {}.",
                    this.getRuntimeContext().getIndexOfThisSubtask(),
                    this.subscribedPartitionsToStartOffsets);
            this.kafkaFetcher =
                    this.createFetcher(
                            sourceContext,
                            this.subscribedPartitionsToStartOffsets,
                            this.watermarkStrategy,
                            (StreamingRuntimeContext) this.getRuntimeContext(),
                            this.offsetCommitMode,
                            this.getRuntimeContext().getMetricGroup().addGroup("KafkaConsumer"),
                            this.useMetrics);
            if (this.running) {
                // -9223372036854775808L is Long.MIN_VALUE, i.e. partition discovery disabled
                if (this.discoveryIntervalMillis == -9223372036854775808L) {
                    this.kafkaFetcher.runFetchLoop();
                } else {
                    this.runWithPartitionDiscovery();
                }
            }
        }
    }

    private void runWithPartitionDiscovery() throws Exception {
        AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference();
        this.createAndStartDiscoveryLoop(discoveryLoopErrorRef);
        this.kafkaFetcher.runFetchLoop();
        this.partitionDiscoverer.wakeup();
        this.joinDiscoveryLoopThread();
        Exception discoveryLoopError = (Exception) discoveryLoopErrorRef.get();
        if (discoveryLoopError != null) {
            throw new RuntimeException(discoveryLoopError);
        }
    }

    // irrelevant code omitted
}

FlinkKafkaConsumer extends the parent class FlinkKafkaConsumerBase.
The method that starts the Kafka consumption task is public void run(SourceContext<T> sourceContext).

@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
        implements ParallelSourceFunction<OUT> {

    private static final long serialVersionUID = 1L;

    public RichParallelSourceFunction() {}
}

@Public
public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
}

public interface SourceFunction<T> extends Function, Serializable {

    void run(SourceFunction.SourceContext<T> var1) throws Exception;

    void cancel();

    @Public
    public interface SourceContext<T> {

        void collect(T var1);

        @PublicEvolving
        void collectWithTimestamp(T var1, long var2);

        @PublicEvolving
        void emitWatermark(Watermark var1);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}

FlinkKafkaConsumerBase's run method implements the run method declared in the top-level interface SourceFunction, so when the framework invokes SourceFunction's run method, it is FlinkKafkaConsumerBase's run method that actually executes.
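
To make that contract concrete, here is a minimal, hypothetical SourceFunction (not Flink source code): the framework calls run() exactly once, and the function keeps emitting until cancel() flips its flag, which is the same loop structure FlinkKafkaConsumerBase.run follows.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// hypothetical example for illustration only
public class CountingSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (running) {
            // emit under the checkpoint lock, mirroring what
            // AbstractFetcher.emitRecordsWithTimestamps does further below
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}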

(2) FlinkKafkaConsumerBase internal execution logic

@Internal
public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {

    // the thread that consumes data from the Kafka brokers
    final KafkaConsumerThread consumerThread;

    // constructor
    public KafkaFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            String taskNameWithSubtasks,
            KafkaDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long pollTimeout,
            MetricGroup subtaskMetricGroup,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        super(sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy,
                processingTimeProvider, autoWatermarkInterval, userCodeClassLoader,
                consumerMetricGroup, useMetrics);
        this.deserializer = deserializer;
        this.handover = new Handover();
        this.consumerThread = new KafkaConsumerThread(
                LOG, this.handover, kafkaProperties, this.unassignedPartitionsQueue,
                this.getFetcherName() + " for " + taskNameWithSubtasks, pollTimeout,
                useMetrics, consumerMetricGroup, subtaskMetricGroup);
        this.kafkaCollector = new KafkaFetcher.KafkaCollector();
    }

    public void runFetchLoop() throws Exception {
        try {
            // start the dedicated consumer thread that pulls data from the Kafka broker into the handover
            this.consumerThread.start();

            // take data out of the handover and deserialize it
            while (this.running) {
                ConsumerRecords<byte[], byte[]> records = this.handover.pollNext();
                Iterator var2 = this.subscribedPartitionStates().iterator();

                while (var2.hasNext()) {
                    KafkaTopicPartitionState<T, TopicPartition> partition =
                            (KafkaTopicPartitionState) var2.next();
                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records((TopicPartition) partition.getKafkaPartitionHandle());
                    this.partitionConsumerRecordsHandler(partitionRecords, partition);
                }
            }
        } finally {
            this.consumerThread.shutdown();
        }

        try {
            this.consumerThread.join();
        } catch (InterruptedException var8) {
            Thread.currentThread().interrupt();
        }
    }

    protected void partitionConsumerRecordsHandler(
            List<ConsumerRecord<byte[], byte[]>> partitionRecords,
            KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
        Iterator var3 = partitionRecords.iterator();

        while (var3.hasNext()) {
            ConsumerRecord<byte[], byte[]> record = (ConsumerRecord) var3.next();
            // deserialization; 'deserializer' is user-defined, i.e. the
            // DeserializationSchema deserializationSchema from the beginning of the article
            this.deserializer.deserialize(record, this.kafkaCollector);
            this.emitRecordsWithTimestamps(
                    this.kafkaCollector.getRecords(), partition, record.offset(), record.timestamp());
            if (this.kafkaCollector.isEndOfStreamSignalled()) {
                this.running = false;
                break;
            }
        }
    }
}

@Internal
public abstract class AbstractFetcher<T, KPH> {

    protected void emitRecordsWithTimestamps(
            Queue<T> records,
            KafkaTopicPartitionState<T, KPH> partitionState,
            long offset,
            long kafkaEventTimestamp) {
        synchronized (this.checkpointLock) {
            Object record;
            while ((record = records.poll()) != null) {
                // extract the timestamp
                long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
                // emit downstream; this is where the source stage ends
                this.sourceContext.collectWithTimestamp(record, timestamp);
                // update the partition state with the event
                partitionState.onEvent(record, timestamp);
            }
            // record the offset in the partition state
            partitionState.setOffset(offset);
        }
    }
}

Note that runFetchLoop pulls data from the Handover staging area, not directly from the Kafka broker; fetching from the Kafka broker is done by the consumerThread.

(3) A dedicated consumer thread writes data from the Kafka broker into the staging area

this.consumerThread.start()

actually invokes the run method below, starting the thread that pulls data from Kafka:

@Internal
public class KafkaConsumerThread<T> extends Thread {

    public void run() {
        // irrelevant code omitted
        if (this.running) {
            ConsumerRecords records = null;
            // irrelevant code omitted
            while (true) {
                while (true) {
                    // irrelevant code omitted
                    try {
                        records = this.consumer.poll(this.pollTimeout);
                        break;
                    } catch (WakeupException var21) {
                    }
                }

                // put the fetched records into the handover
                try {
                    handover.produce(records);
                    records = null;
                } catch (org.apache.flink.streaming.connectors.kafka.internals.Handover.WakeupException var18) {
                }
            }
        }
    }
}

Here the records pulled from the Kafka broker are placed into the handover.
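
Conceptually the Handover is a blocking exchange point of capacity one between the consumer thread (which calls handover.produce) and the fetch loop (which calls handover.pollNext). A simplified, hypothetical sketch of that pattern is shown below; the real org.apache.flink.streaming.connectors.kafka.internals.Handover additionally propagates errors and wake-ups.

import org.apache.kafka.clients.consumer.ConsumerRecords;

// hypothetical, simplified hand-off between two threads; for illustration only
public final class SimpleHandover {

    private final Object lock = new Object();
    private ConsumerRecords<byte[], byte[]> next;

    // called by the consumer thread: wait until the previous batch was taken, then publish
    public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = records;
            lock.notifyAll();
        }
    }

    // called by the fetch loop: wait until a batch is available, then take it
    public ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            ConsumerRecords<byte[], byte[]> records = next;
            next = null;
            lock.notifyAll();
            return records;
        }
    }
}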

(4) Saving a snapshot (snapshotState) and the callback after the snapshot succeeds (notifyCheckpointComplete)

FlinkKafkaConsumerBase has two relevant methods:

  1. snapshotState (implementing the CheckpointedFunction interface), invoked when a snapshot is being taken
  2. notifyCheckpointComplete (implementing the CheckpointListener interface), invoked as a callback after the snapshot has been saved successfully
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            this.unionOffsetStates.clear();
            AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher != null) {
                // take the offsets already recorded in the partition states for each
                // consumed partition and stash them in pendingOffsetsToCommit
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
                if (this.offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    this.pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }
                // irrelevant code omitted
            }
            // irrelevant code omitted
        }
    }

    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!this.running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
        } else {
            AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            } else {
                if (this.offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
                                this.getRuntimeContext().getIndexOfThisSubtask(),
                                checkpointId);
                    }

                    try {
                        int posInMap = this.pendingOffsetsToCommit.indexOf(checkpointId);
                        // irrelevant code omitted
                        // commit the offsets to the Kafka broker
                        fetcher.commitInternalOffsetsToKafka(offsets, this.offsetCommitCallback);
                    } catch (Exception var7) {
                        if (this.running) {
                            throw var7;
                        }
                    }
                }
            }
        }
    }
}

In snapshotState, the offsets of each partition are collected; those per-partition offsets are the ones set earlier by the emitRecordsWithTimestamps method:

public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
    assert Thread.holdsLock(this.checkpointLock);

    HashMap<KafkaTopicPartition, Long> state = new HashMap(this.subscribedPartitionStates.size());
    Iterator var2 = this.subscribedPartitionStates.iterator();

    while (var2.hasNext()) {
        KafkaTopicPartitionState<T, KPH> partition = (KafkaTopicPartitionState) var2.next();
        // partition.getOffset() returns the value stored by partitionState.setOffset above
        state.put(partition.getKafkaTopicPartition(), partition.getOffset());
    }

    return state;
}

So for data consumed from Kafka, the offsets are only committed back to Kafka after the checkpoint has been saved and the notifyCheckpointComplete callback has fired.
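
Assuming checkpointing is enabled on the job, the relevant settings look roughly like this (values are illustrative; env and kafkaSource refer to the objects from the example in section 1):

// enable checkpointing so the offset commit mode becomes ON_CHECKPOINTS
env.enableCheckpointing(60_000L);                // checkpoint every 60 s (illustrative interval)
kafkaSource.setCommitOffsetsOnCheckpoints(true); // default; ties offset commits to checkpoint completion
// without checkpointing, offsets are instead committed periodically by the Kafka client itself
// (enable.auto.commit / auto.commit.interval.ms in the consumer properties)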

Once you understand kafkaSource, the other sources are easy to follow; the basic logic is the same.
