kirktrue commented on code in PR #18795: URL: https://github.com/apache/kafka/pull/18795#discussion_r1940329129
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,22 +407,52 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
 
-        for (TopicPartition partition : fetchablePartitions()) {
-            SubscriptionState.FetchPosition position = subscriptions.position(partition);
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
 
-            if (position == null)
-                throw new IllegalStateException("Missing position for fetchable partition " + partition);
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
 
-            Optional<Node> leaderOpt = position.currentLeader.leader;
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
+        }
 
-            if (leaderOpt.isEmpty()) {
-                log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
-                metadata.requestUpdate(false);
+        Set<Integer> bufferedNodes = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            // It's possible that at the time of the fetcher creating new fetch requests, a partition with buffered
+            // data from a *previous* request is no longer assigned. So before attempting to retrieve the node
+            // information, check that the partition is still assigned and fetchable; an unassigned/invalid partition
+            // will throw an IllegalStateException in positionForPartition.
+            //
+            // Note: this check is not needed for the unbuffered partitions as the logic in
+            // SubscriptionState.fetchablePartitions() only includes partitions currently assigned.
+            if (!subscriptions.hasValidPosition(partition))
+                continue;
+
+            // A paused partition remains in the fetch buffer even though it's not returned to the user. Unless
+            // this case is handled, a paused partition will inadvertently "block" a request from being created to
+            // fetch data for other partitions, leading users to poll indefinitely waiting for more data.
+            //
+            // See FetchCollector.collectFetch().
+            if (subscriptions.isPaused(partition))
+                continue;

Review Comment:
   FYI: I updated the comment after adding my reviewers’ notes.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org