一、上下文
《Kafka-Consumer理论知识》中队Kafka-Consumer的理论知识进行了阐述。理解了它的基本思想,我们下面进入源码环节看看。
二、初始化KafkaConsumer
private final static ConsumerDelegateCreator CREATOR = new ConsumerDelegateCreator();KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {// 初始化 consumer 时会创建一个 delegate 代表delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);}
ConsumerDelegateCreator实现了一个准工厂模式,这里需要参考一个配置:group.protocol
它的取值有classic、consumer,默认值为classic。
当group.protocol = classic ,则返回LegacyKafkaConsumer
当group.protocol = consumer,则返回AsyncKafkaConsumer
GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
//如果不设置group.protocol,则使用默认值:classic,默认返回 LegacyKafkaConsumer
if (groupProtocol == GroupProtocol.CONSUMER)return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
elsereturn new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
三、调用poll拉取数据
默认使用的consumer是LegacyKafkaConsumer,下面我们看下它的poll()
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {// 确保消费者没有关闭acquireAndEnsureOpen();try {// 度量系统开始记录this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {//consumer未订阅任何topic或分配任何partitionthrow new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}//一个循环,我们看下满足什么调节会一直循环 timer.notExpired()// Expire 到期 如果没有超时将一直循环do {// Trigger 触发// Wakeup 醒醒//client 是 ConsumerNetworkClient// 目的是唤醒... 可能是 网络连接client.maybeTriggerWakeup();// 上面调用时 给的 trueif (includeMetadataInTimeout) {// 尝试更新分配元数据,但不需要在加入组的计时器上阻塞updateAssignmentMetadataIfNeeded(timer, false);} else {while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {// 仍然等待元数据log.warn("Still waiting for metadata");}}// 核心方法,看看怎么拉取的数据final Fetch<K, V> fetch = pollForFetches(timer);if (!fetch.isEmpty()) {// 在返回提取的记录之前,我们可以发送下一轮提取,避免在用户处理提取的记录时等待其响应以启用流水线。// ***难道是变相的流水线技术吗?一次启动背后其实是两次 拉取数据,***// 注意:由于消耗的位置已经更新,在返回提取的记录之前,我们不能允许触发唤醒或任何其他错误。if (sendFetches() > 0 || client.hasPendingRequests()) {client.transmitSends();}if (fetch.records().isEmpty()) {// 从`poll()`返回空记录,因为消费者的位置已经前进了至少一个主题分区log.trace("Returning empty records from `poll()` "+ "since the consumer's position has advanced for at least one topic partition");}// Map<TopicPartition, List<ConsumerRecord<K, V>>> records// 返回的是 每个 TopicPartition 以及对应拉取的数据listreturn this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));}} while (timer.notExpired());return ConsumerRecords.empty();} finally {release();this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());}}
在拉取数据之前必须等待元数据的更新,因为有了最新的元数据,才知道去哪个节点拉取数据。拉取数据调用的是pollForFetches(timer) 下面我们继续往下看(方法上看到Fetche一般就是采用异步的方式去拉取数据了。)
四、pollForFetches()异步拉取数据
private final Fetcher<K, V> fetcher;private Fetch<K, V> pollForFetches(Timer timer) {// 拉取数据超时时间// coordinator 协调员long pollTimeout = coordinator == null ? timer.remainingMs() :Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());// if data is available already, return it immediately// 如果数据已经可用,请立即返回// max.partition.fetch.bytes 是拉取每个分区的数据量// max.poll.records 是consumer一次拉取的数据量,// 也就是说 拉取次数 和 网络真实拉取次数不是相等的, 拉取的数据放缓存 ,consumer 慢慢消费,消费完再触发拉// 上面猜想的缓存就是 FetchBufferfinal Fetch<K, V> fetch = fetcher.collectFetch();if (!fetch.isEmpty()) {return fetch;}// 发送任何新的提取(不会重新发送待处理的提取)// 重点看下这里sendFetches();// 如果我们错过了一些位置,我们不想在poll()中被阻塞,因为偏移查找可能会在失败后退缩// 注意:使用cachedSubscriptionHasAllFetchPositions意味着我们必须在此方法之前调用updateAssignmentMetadataIfNeed。if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {pollTimeout = retryBackoffMs;}// 轮询超时获取log.trace("Polling for fetches with timeout {}", pollTimeout);Timer pollTimer = time.timer(pollTimeout);client.poll(pollTimer, () -> {// 由于获取可能由后台线程完成,我们需要这个轮询条件来确保我们不会在poll()中不必要地阻塞// 后台线程去拉取数据的return !fetcher.hasAvailableFetches();});timer.update(pollTimer.currentTimeMs());return fetcher.collectFetch();}
可以看到,第一次获取是先去 Fetcher<K, V> fetcher 中获取,如果没有获取到才触发数据的拉取操作:sendFetches(),然后在从Fetcher<K, V> fetcher 中获取。
我们看下拉取数据的代码sendFetches()
1、sendFetches()异步拉取数据
private int sendFetches() {offsetFetcher.validatePositionsOnMetadataChange();return fetcher.sendFetches();}
1、检测元数据是否变化
再拉取数据前必须进行元数据的再次校验,以确保拉取到最新的数据。比如leader的位置发生了变化,
public void validatePositionsOnMetadataChange() {offsetFetcherUtils.validatePositionsOnMetadataChange();}
void validatePositionsOnMetadataChange() {//获取元数据的最新版本int newMetadataUpdateVersion = metadata.updateVersion();//比较手中的元数据版本和最新的版本是否一致,如果一致就不需要调整if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) {//如果不一致,需要对每个topicPartition 进行校验subscriptionState.assignedPartitions().forEach(topicPartition -> {ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch);});}}
2、拉取数据
public synchronized int sendFetches() {// 一个Node 一个拉取请求final Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = prepareFetchRequests();// Internal 内部的// sendFetchesInternal 方法有三个参数 ,fetchRequests 、处理成功的方法、处理失败的方法// 和 scala 的语法越来越像了sendFetchesInternal(fetchRequests,(fetchTarget, data, clientResponse) -> {synchronized (Fetcher.this) {//看看成功获取到数据的处理handleFetchSuccess(fetchTarget, data, clientResponse);}},(fetchTarget, data, error) -> {synchronized (Fetcher.this) {handleFetchFailure(fetchTarget, data, error);}});return fetchRequests.size();}
我们先来看看prepareFetchRequests()中队目标节点的选择和队request的封装
封装FetchRequest
protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {// Update metrics in case there was an assignment changemetricsManager.maybeUpdateAssignment(subscriptions);//下面会填充这个 map : fetchableMap<Node, FetchSessionHandler.Builder> fetchable = new HashMap<>();long currentTimeMs = time.milliseconds();Map<String, Uuid> topicIds = metadata.topicIds();// 返回了没有在缓存中的 Set<TopicPartition>for (TopicPartition partition : fetchablePartitions()) {// 该分区最后消费位置SubscriptionState.FetchPosition position = subscriptions.position(partition);if (position == null)throw new IllegalStateException("Missing position for fetchable partition " + partition);Optional<Node> leaderOpt = position.currentLeader.leader;if (!leaderOpt.isPresent()) {log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);metadata.requestUpdate(false);continue;}// 如果已设置,则使用首选的读取副本,否则使用分区的 leader// 也就是可以设置 你指定的 副本节点,如果没有指定 就从 leader 拉取// 满足 3个条件,会走副本节点拉取数据,否则返回的就是leader节点Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);// 检查节点是否已断开连接,并且无法立即重新连接(即断开连接后是否处于重新连接退避窗口)。if (isUnavailable(node)) {// 检查给定节点上的身份验证错误,如果存在则抛出异常。maybeThrowAuthFailure(node);// 如果我们试图在重新连接退避窗口期间发送,那么请求在发送之前无论如何都会失败,所以现在跳过发送请求// 跳过分区{}的获取,因为节点{}正在等待重新连接回退log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);} else if (nodesWithPendingFetchRequests.contains(node.id())) {// 跳过分区{}的获取,因为之前对{}的请求尚未处理log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);} else {// if there is a leader and no in-flight requests, issue a new fetch// 如果有leader并且没有正在进行的请求,则发出新的fetch// 也就是说,如果两个 consumer 同时消费一个分区的数据,是排队进行拉取的。// Absent 缺席的 不存在// 这里是返回一个 builder// fetchable 最开始一个一个空的map ,需要调用 k 来填充它// 再调用 sessionHandlers 这个map ,如果是空 再调用 n 来填充,// 最终 builder 是 FetchSessionHandler (维护用于连接到 broker 的fetch会话状态。)FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n));return fetchSessionHandler.newBuilder();});Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);// 封装了一个 PartitionData 想 拉取请求 ,// 里面抱哈了 offset 、拉取的大小(max.partition.fetch.bytes )、当前leader的纪元// max.partition.fetch.bytes 默认 : 1 * 1024 * 1024 1M// 服务器将返回的每个分区的最大数据量。记录由消费者分批提取。// 如果获取的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批以确保消费者可以取得进展。// 代理接受的最大记录批大小是通过<code>message.max.bytes</code>(代理配置)// 或<code>max.message.bytes</code](主题配置)定义的// 。请参阅FETCH_MAX_BYTES_CONFIG+“以限制消费者请求大小。”;// 里面封装了 ApiKeys.FETCH ,下面可以看下 ApiKeys.FETCH 中的操作// max.partition.fetch.bytes 是拉取每个分区的数据量// max.poll.records 是consumer一次拉取的数据量,// 也就是说 拉取次数 和 网络真实拉取次数不是相等的, 拉取的数据放缓存 ,consumer 慢慢消费,消费完再触发拉FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId,position.offset,FetchRequest.INVALID_LOG_START_OFFSET,fetchConfig.fetchSize,position.currentLeader.epoch,Optional.empty());builder.add(partition, partitionData);// 将位置{}处的分区{}的{}获取请求添加到节点{}log.debug("Added {} fetch request for partition {} at position {} to node {}", fetchConfig.isolationLevel,partition, position, node);}}// entrySet() 返回 Set<Map.Entry<K, V>>// stream() 返回 Stream<E> 可以像 scala 一样进行流式操作// collect 收集, 将流中的元素 收集到一起// e.getValue().build() 返回 FetchRequestData// 因此这里将返回 拉取的数据// 返回的是 topicPartition 对应的 FetchRequestDatareturn fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));}
这里总结下
1、获取待拉取的TopicPartition
2、获取每个TopicPartition的目标节点(默认是该分区的leader节点)
如果满足以下三个条件,则不从leader节点拉取
1、之前已设置了首选副本
2、仍然在首选副本的租赁期内
3、副本仍然在线/可用
3、检测每个TopicPartition与目标节点的连接状态
4、为每个TopicPartition封装FetchRequest
需要特殊之处的是FetchRequest中封装了ApiKeys.FETCH
发送FetchRequest
这里调用了ConsumerNetworkClient的send()来队每个目标节点发送FetchRequest
它内部和其他分布式框架一样,并不是立即发送请求,而是放入队列中进行统一的发送
public class ConsumerNetworkClient implements Closeable {//交给底层通信,最终由NetworkClient处理//NetworkClient 是队NIO的封装private final KafkaClient client;//存放所有的请求的ConcurrentHashMap<Node -> ConcurrentLinkedQueue<ClientRequest>
>rivate final UnsentRequests unsent = new UnsentRequests();public RequestFuture<ClientResponse> send(...){RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();ClientRequest clientRequest = client.newClientRequest(...);//将该请求放入对应节点的队列中unsent.put(node, clientRequest);//这里最终会调到 this.nioSelector.wakeup();client.wakeup();return completionHandler.future;}long trySend(long now) {long pollDelayMs = maxPollTimeoutMs;// 发送任何可以立即发送的请求for (Node node : unsent.nodes()) {//获取每个节点对应的请求迭代器,依次进行发送Iterator<ClientRequest> iterator = unsent.requestIterator(node);if (iterator.hasNext())pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));while (iterator.hasNext()) {ClientRequest request = iterator.next();if (client.ready(node, now)) {client.send(request, now);iterator.remove();} else {// 当前节点未就绪时尝试下一个节点break;}}}return pollDelayMs;}
}
处理FetchRequest
下面我们看下KafkaApis中对应的处理方法,这个在《Kafka-follower同步leader数据》中也有涉及,也就是说follower同步leader数据和consumer拉取数据走的api是同一个,因为他们的动作及其相似。这里就不再详细分析了
处理拉回的数据
这里我们看下成功返回后的回调函数:handleFetchSuccess(fetchTarget, data, clientResponse)
protected void handleFetchSuccess(...){//获取响应体final FetchResponse response = (FetchResponse) resp.responseBody();//获取这个节点上的所有TopicPartitionfinal Map<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), requestVersion);for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {TopicPartition partition = entry.getKey();FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);//......省略.......CompletedFetch completedFetch = new CompletedFetch(...)//把数据放 fetchBuffer 中 ,下次或者首次直接从缓存拿fetchBuffer.add(completedFetch);}}
可以看到,拉取的数据都放到了FetchBuffer中
2、从缓存中获取结果
public Fetch<K, V> collectFetch() {return fetchCollector.collectFetch(fetchBuffer);}public Fetch<K, V> collectFetch(final FetchBuffer fetchBuffer) {final Fetch<K, V> fetch = Fetch.empty();final Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();// max.poll.records 默认 500// 单次调用poll()时返回的最大记录数注意,max.poll.records 不会影响底层的获取(fetching)行为。// 消费者将缓存每个获取请求中的记录,并从每个轮询中递增地返回它们int recordsRemaining = fetchConfig.maxPollRecords;try {while (recordsRemaining > 0) {// ConcurrentLinkedQueue<CompletedFetch> completedFetches;//private CompletedFetch nextInLineFetch;// 因此 nextInLineFetch 是一个完成拉取的数据final CompletedFetch nextInLineFetch = fetchBuffer.nextInLineFetch();// 如果 fetchBuffer 中没有数据了 或者 nextInLineFetch 是被消费的if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {final CompletedFetch completedFetch = fetchBuffer.peek();// 如果等于null 是不是代表了一种情况:consumer客户端从各个节点拉取了 1300 条数据// 消费者每次最多了拉 500 条 ,最后一次组装500时不够了,的处理情况 ,也就是这次只返回 300 条// 下一次poll时 触发重新拉取,// 如果一行记录特别大时,就会造成 每次都需要重新拉。if (completedFetch == null)break;if (!completedFetch.isInitialized()) {try {fetchBuffer.setNextInLineFetch(initialize(completedFetch));} catch (Exception e) {// 在解析时删除一个completedExch,但以下情况除外:// (1)它不包含completedExtch,并且// (2)在这个异常之前没有提取到具有实际内容的completedExche。// 第一个条件确保在TopicAuthorizationException等情况下,completedExches不会与相同的completedExch保持一致,// 第二个条件确保不会因以下记录中的异常而导致潜在的数据丢失。if (fetch.isEmpty() && FetchResponse.recordsOrFail(completedFetch.partitionData).sizeInBytes() == 0)fetchBuffer.poll();throw e;}} else {fetchBuffer.setNextInLineFetch(completedFetch);}// 如果 是空的 ,那么就从队列中弹出它,把它从新进行拉取fetchBuffer.poll();} else if (subscriptions.isPaused(nextInLineFetch.partition)) {// 当分区暂停时,我们将记录添加回completedExches队列,而不是将其耗尽,以便在分区恢复时可以在后续轮询中返回// paused 暂停log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);pausedCompletedFetches.add(nextInLineFetch);fetchBuffer.setNextInLineFetch(null);} else {final Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch, recordsRemaining);//这里真正的对 recordsRemaining 做减法 ,放入待返回的数据中recordsRemaining -= nextFetch.numRecords();fetch.add(nextFetch);}}} catch (KafkaException e) {//.....} finally {//.....}return fetch;}
FetchBuffer中存放了所有拉取回来的数据,其本身是对队列的一个封装。consumer每次从这个缓存队列中获取最多max.poll.records 条数据进行返回
我们详细看下fetchRecords()中的逻辑
private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch, int maxRecords) {final TopicPartition tp = nextInLineFetch.partition;if (!subscriptions.isAssigned(tp)) {// 当在将提取的记录返回给消费者的投票呼叫之前进行重新平衡时,可能会发生这种情况// 如果数据已经拉取回来了,但是发生了分区从分配,就会给与提醒,数据不返回//由于分区{}不再被分配,因此不返回其已获取的记录log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);} else if (!subscriptions.isFetchable(tp)) {// 当在将获取的记录返回给消费者的轮询调用之前暂停分区,或者重置偏移量时,可能会发生这种情况// 由于分配的分区{}不再可获取,因此不返回已获取的记录“log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);} else {SubscriptionState.FetchPosition position = subscriptions.position(tp);if (position == null)throw new IllegalStateException("Missing position for fetchable partition " + tp);if (nextInLineFetch.nextFetchOffset() == position.offset) {//这里用到了解码器List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig,deserializers,maxRecords);log.trace("Returning {} fetched records at offset {} for assigned partition {}",partRecords.size(), position, tp);boolean positionAdvanced = false;//kafka 自身做的 更新offsetif (nextInLineFetch.nextFetchOffset() > position.offset) {SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(nextInLineFetch.nextFetchOffset(),nextInLineFetch.lastEpoch(),position.currentLeader);// 将分区{}的提取位置从{}更新到{},并从`poll()返回{}条记录`log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",position, nextPosition, tp, partRecords.size());subscriptions.position(tp, nextPosition);positionAdvanced = true;}Long partitionLag = subscriptions.partitionLag(tp, fetchConfig.isolationLevel);if (partitionLag != null)metricsManager.recordPartitionLag(tp, partitionLag);Long lead = subscriptions.partitionLead(tp);if (lead != null) {metricsManager.recordPartitionLead(tp, lead);}return Fetch.forPartition(tp, partRecords, positionAdvanced);} else {//这些记录不是最后一个消耗位置的下一个记录,忽略它们,它们必须来自过时的请求log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",tp, nextInLineFetch.nextFetchOffset(), position);}}log.trace("Draining fetched records for partition {}", tp);nextInLineFetch.drain();return Fetch.empty();}
五、总结
1、构建并初始化KafkaConsumer
2、订阅topic列表或指定分区拉取
3、调用KafkaConsumer的poll()拉取数据
4、调用LegacyKafkaConsumer的poll()拉取数据
5、判断FetchBuffer中是否由数据,如果有直接返回
6、如果FetchBuffer中没有数据,则开始拉取数据
7、更新并校验元数据
8、获取每个TopicPartition的目标节点(默认是该分区的leader节点)
如果满足以下三个条件,则不从leader节点拉取
1、之前已设置了首选副本
2、仍然在首选副本的租赁期内
3、副本仍然在线/可用
9、检测每个TopicPartition与目标节点的连接状态
10、为每个TopicPartition封装FetchRequest
11、将这些FetchRequest按照目标节点存放
12、向每个目标节点的ApiKeys.FETCH请求
13、目标节点处理ApiKeys.FETCH请求,如果是最近的数据,直接从pagecache中读取数据发到网卡返回。
14、将异步返回的数据按照TopicPartition纬度放入FetchBuffer中
15、从FetchBuffer读取数据还要判断此时分配器是否将该分区分配给了其他consumer,如果被分配走也不会读取
16、调用解码器队数据进行解码
17、Kafka此时根据返回的数据更新offset
18、我们自己写的Consumer获取到数据