junrao commented on code in PR #18795:
URL: https://github.com/apache/kafka/pull/18795#discussion_r1943876199
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,22 +407,54 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
 
-        for (TopicPartition partition : fetchablePartitions()) {
-            SubscriptionState.FetchPosition position = subscriptions.position(partition);
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
 
-            if (position == null)
-                throw new IllegalStateException("Missing position for fetchable partition " + partition);
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
 
-            Optional<Node> leaderOpt = position.currentLeader.leader;
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
+        }
 
-            if (leaderOpt.isEmpty()) {
-                log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
-                metadata.requestUpdate(false);
+        Set<Integer> bufferedNodes = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            // It's possible that at the time of the fetcher creating new fetch requests, a partition with buffered
+            // data from a *previous* request is no longer assigned. So before attempting to retrieve the node
+            // information, check that the partition is still assigned and fetchable; an unassigned/invalid partition
+            // will throw an IllegalStateException in positionForPartition.
+            //
+            // Note: this check is not needed for the unbuffered partitions as the logic in
+            // SubscriptionState.fetchablePartitions() only includes partitions currently assigned.
+            if (!subscriptions.hasValidPosition(partition))
+                continue;
+
+            // A paused partition remains in the fetch buffer and is not returned to the user (unless the partition
+            // is later resumed). Therefore, buffered partitions which are also paused should be ignored when
+            // determining which nodes to skip when fetching. Otherwise, the node's *other* eligible (unbuffered
+            // and/or un-paused) partitions would never be fetched, leading users to poll indefinitely waiting for
+            // data to be returned.
+            //
+            // See FetchCollector.collectFetch().
+            if (subscriptions.isPaused(partition))

Review Comment:
   Should we just use `subscriptions.isFetchable`? Intuitively, an un-assigned/revoking partition shouldn't block the fetching of other partitions.
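   For illustration only, the suggestion might look roughly like the sketch below. This is not the PR's actual code; it assumes `SubscriptionState.isFetchable(TopicPartition)` returns `false` (rather than throwing) for a partition that is unassigned, has no valid position, or is paused, so that the two separate checks collapse into one:

   ```java
   // Sketch of the reviewer's suggestion, inside the loop over buffered partitions in
   // AbstractFetch.prepareFetchRequests(). Assumption: isFetchable() covers the unassigned,
   // missing-position, and paused cases without throwing.
   for (TopicPartition partition : buffered) {
       // Skip partitions that are no longer fetchable so that an unassigned/revoking or
       // paused partition does not cause its node's other partitions to be skipped.
       if (!subscriptions.isFetchable(partition))
           continue;

       // Record the node hosting this still-fetchable buffered partition so that node is
       // excluded from the next round of fetch requests.
       SubscriptionState.FetchPosition position = subscriptions.position(partition);
       position.currentLeader.leader.ifPresent(node -> bufferedNodes.add(node.id()));
   }
   ```

   Whether `isFetchable` can safely be called for a partition that has just been revoked (i.e. without the `IllegalStateException` mentioned in the in-code comment) is the open question the suggestion hinges on.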
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,21 +3443,373 @@ public void testPollWithRedundantCreateFetchRequests() {
     }
 
-    private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
-        TopicPartition topicPartition,
-        Errors error,
-        int leaderEpoch,
-        long endOffset
-    ) {
-        OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData();
-        data.topics().add(new OffsetForLeaderTopicResult()
-            .setTopic(topicPartition.topic())
-            .setPartitions(Collections.singletonList(new EpochEndOffset()
-                .setPartition(topicPartition.partition())
-                .setErrorCode(error.code())
-                .setLeaderEpoch(leaderEpoch)
-                .setEndOffset(endOffset))));
-        return new OffsetsForLeaderEpochResponse(data);
+    /**
+     * This test makes several calls to {@link #sendFetches()}, and after each, the buffered partitions are
+     * modified to either cause (or prevent) a fetch from being requested.
+     */
+    @Test
+    public void testFetchRequestWithBufferedPartitions() {
+        buildFetcher();
+
+        // The partitions are spread across multiple nodes to ensure the fetcher's logic correctly handles the
+        // partition-to-node mapping.
+        int numNodes = 2;
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions, numNodes);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        // Get all the nodes serving as the leader for these partitions.
+        List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+        // Extract the nodes and their respective set of partitions to make things easier to keep track of later.
+        assertEquals(2, nodes.size());
+        Node node0 = nodes.get(0);
+        Node node1 = nodes.get(1);
+        List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions);
+        List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions);
+        assertEquals(2, node0Partitions.size());
+        assertEquals(2, node1Partitions.size());
+        TopicPartition node0Partition1 = node0Partitions.get(0);
+        TopicPartition node0Partition2 = node0Partitions.get(1);
+        TopicPartition node1Partition1 = node1Partitions.get(0);
+        TopicPartition node1Partition2 = node1Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to both node 0 and node 1 since neither has buffered data.
+        assertEquals(2, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 0);
+        prepareFetchResponses(node1, node1Partitions, 0);
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+        collectSelectedPartition(node0Partition1, partitions);
+        node0Partitions.remove(node0Partition1);
+        assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #2 shouldn't issue requests to either node 0 or node 1 since they both have buffered data.
+        assertEquals(0, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        collectSelectedPartition(node1Partition1, partitions);
+        node1Partitions.remove(node1Partition1);
+        assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #3 shouldn't issue requests to either node 0 or node 1 since they both have buffered data.
+        assertEquals(0, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        collectSelectedPartition(node0Partition2, partitions);
+        node0Partitions.remove(node0Partition2);
+        assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Validate that all of node 0's partitions have been collected.
+        assertTrue(node0Partitions.isEmpty());
+
+        // Reset the list of partitions for node 0 so the next fetch pass requests data.
+        node0Partitions = partitionsForNode(node0, partitions);
+
+        // sendFetches() call #4 should issue a request to node 0 since its buffered data was collected.
+        assertEquals(1, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 10);
+        networkClientDelegate.poll(time.timer(0));
+
+        collectSelectedPartition(node1Partition2, partitions);
+        node1Partitions.remove(node1Partition2);
+        assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Node 1's partitions have likewise all been collected, so validate that.
+        assertTrue(node1Partitions.isEmpty());
+
+        // Again, reset the list of partitions, this time for node 1, so the next fetch pass requests data.
+        node1Partitions = partitionsForNode(node1, partitions);
+
+        // sendFetches() call #5 should issue a request to node 1 since its buffered data was collected.
+        assertEquals(1, sendFetches());
+        prepareFetchResponses(node1, node1Partitions, 10);
+        networkClientDelegate.poll(time.timer(0));
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Collect all the records and make sure they include all the partitions, and validate that there is no data
+        // remaining in the fetch buffer.
+        assertEquals(partitions, fetchRecords().keySet());
+        assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #6 should issue requests to nodes 0 and 1 since their buffered data was collected.
+        assertEquals(2, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 20);
+        prepareFetchResponses(node1, node1Partitions, 20);
+        networkClientDelegate.poll(time.timer(0));
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Just for completeness, collect all the records and make sure they include all the partitions, and validate
+        // that there is no data remaining in the fetch buffer.
+        assertEquals(partitions, fetchRecords().keySet());
+        assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size());
+    }
+
+    @Test
+    public void testFetchRequestWithBufferedPartitionNotAssigned() {
+        buildFetcher();
+
+        // The partitions are spread across multiple nodes to ensure the fetcher's logic correctly handles the
+        // partition-to-node mapping.
+        int numNodes = 2;
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions, numNodes);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        // Get all the nodes serving as the leader for these partitions.
+        List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+        // Extract the nodes and their respective set of partitions to make things easier to keep track of later.
+        assertEquals(2, nodes.size());
+        Node node0 = nodes.get(0);
+        Node node1 = nodes.get(1);
+        List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions);
+        List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions);
+        assertEquals(2, node0Partitions.size());
+        assertEquals(2, node1Partitions.size());
+        TopicPartition node0Partition1 = node0Partitions.get(0);
+        TopicPartition node0Partition2 = node0Partitions.get(1);
+        TopicPartition node1Partition1 = node1Partitions.get(0);
+        TopicPartition node1Partition2 = node1Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to both node 0 and node 1 since neither has buffered data.
+        assertEquals(2, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 0);
+        prepareFetchResponses(node1, node1Partitions, 0);
+        networkClientDelegate.poll(time.timer(0));
+
+        // Collect node0Partition1 so that it doesn't have anything in the fetch buffer.
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+        collectSelectedPartition(node0Partition1, partitions);
+        assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Exclude node0Partition2 (the remaining buffered partition for node 0) when updating the assigned partitions
+        // to cause it to become unassigned.
+        subscriptions.assignFromUser(Set.of(
+            node0Partition1,
+            // node0Partition2, // Intentionally omit this partition so that it is unassigned
+            node1Partition1,
+            node1Partition2
+        ));
+
+        // node0Partition1 (the collected partition) should have a retrievable position, but node0Partition2
+        // (the unassigned partition) should throw an error when attempting to retrieve its position.
+        assertDoesNotThrow(() -> subscriptions.position(node0Partition1));
+        assertThrows(IllegalStateException.class, () -> subscriptions.position(node0Partition2));
+
+        // sendFetches() call #2 should issue a request to node 0 because the first partition in node 0 was collected
+        // (and its buffer removed) and the second partition for node 0 was unassigned. As a result, there are now no
+        // *assigned* partitions for node 0 that are buffered.
+        assertEquals(1, sendFetches());
+    }
+
+    @Test
+    public void testFetchRequestWithBufferedPartitionMissingLeader() {
+        buildFetcher();
+
+        Set<TopicPartition> partitions = Set.of(tp0, tp1);
+        assignFromUser(partitions);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        Node leader = metadata.fetch().leaderFor(tp0);
+
+        // sendFetches() call #1 should issue a request since there's no buffered data.
+        assertEquals(1, sendFetches());
+        prepareFetchResponses(leader, Set.of(tp0, tp1), 0);
+        networkClientDelegate.poll(time.timer(0));
+
+        // Per the fetch response, data for both of the partitions is in the fetch buffer.
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+        // Collect the first partition (tp0) which will remove it from the fetch buffer.
+        collectSelectedPartition(tp0, partitions);
+
+        // Since tp0 was collected, it's not in the fetch buffer, but tp1 remains in the fetch buffer.
+        assertFalse(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+        // Overwrite tp1's position with an empty leader, but verify that it is still buffered. Having a leaderless,
+        // buffered partition is key to triggering the test case.
+        subscriptions.position(tp1, new SubscriptionState.FetchPosition(
+            0,
+            Optional.empty(),
+            Metadata.LeaderAndEpoch.noLeaderOrEpoch()
+        ));
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+        // Validate the state of the collected partition (tp0) and leaderless partition (tp1) before sending the
+        // fetch request.
+        assertTrue(subscriptions.position(tp0).currentLeader.leader.isPresent());
+        assertFalse(subscriptions.position(tp1).currentLeader.leader.isPresent());
+
+        // sendFetches() call #2 should issue a fetch request because it has no buffered partitions:
+        //
+        // - tp0 was collected and thus not in the fetch buffer
+        // - tp1, while still in the fetch buffer, is leaderless
+        //
+        // As a result, there are now effectively no buffered partitions for which there is a leader.
+        assertEquals(1, sendFetches());
+    }
+
+    @Test
+    public void testFetchRequestWithBufferedPartitionMissingPosition() {
+        buildFetcher();
+
+        // The partitions are spread across multiple nodes to ensure the fetcher's logic correctly handles the
+        // partition-to-node mapping.
+        int numNodes = 2;
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions, numNodes);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        // Get all the nodes serving as the leader for these partitions.
+        List<Node> nodes = nodesForPartitionLeaders(partitions);
+
+        // Extract the nodes and their respective set of partitions to make things easier to keep track of later.
+        assertEquals(2, nodes.size());
+        Node node0 = nodes.get(0);
+        Node node1 = nodes.get(1);
+        List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions);
+        List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions);
+        assertEquals(2, node0Partitions.size());
+        assertEquals(2, node1Partitions.size());
+        TopicPartition node0Partition1 = node0Partitions.get(0);
+        TopicPartition node0Partition2 = node0Partitions.get(1);
+
+        // sendFetches() call #1 should issue requests to both node 0 and node 1 since neither has buffered data.
+        assertEquals(2, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 0);
+        prepareFetchResponses(node1, node1Partitions, 0);
+        networkClientDelegate.poll(time.timer(0));
+
+        // Collect node 0's first partition (node0Partition1) which will remove it from the fetch buffer.
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+        collectSelectedPartition(node0Partition1, partitions);
+        assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Clear node0Partition2's position (i.e. set it to null) to trigger the test case.
+        subscriptions.position(node0Partition2, null);
+
+        // Confirm that calling SubscriptionState.position() succeeds for a partition with a missing position. While
+        // it shouldn't throw an exception, it should return a null position.
+        SubscriptionState.FetchPosition position = assertDoesNotThrow(() -> subscriptions.position(node0Partition2));
+        assertNull(position);
+
+        // sendFetches() call #2 will now fail to send any requests as we have an invalid position in the assignment.
+        // The Consumer.poll() API will throw an IllegalStateException to the user.
+        Future<Void> future = fetcher.createFetchRequests();
+        assertEquals(0, sendFetches());
+        assertFutureThrows(future, IllegalStateException.class);
+    }
+
+    @Test
+    public void testFetchRequestWithBufferedAndPausedPartition() {
+        buildFetcher();
+
+        Set<TopicPartition> partitions = Set.of(tp0, tp1);
+        assignFromUser(partitions);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        // sendFetches() call #1 should issue a request since there's no buffered data.
+        assertEquals(1, sendFetches());
+        prepareFetchResponses(metadata.fetch().leaderFor(tp0), Set.of(tp0, tp1), 0);
+        networkClientDelegate.poll(time.timer(0));
+
+        // Per the fetch response, data for both of the partitions is in the fetch buffer.
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+        // Collect the first partition (tp0) which will remove it from the fetch buffer.
+        collectSelectedPartition(tp0, partitions);
+        assertFalse(fetcher.fetchBuffer.bufferedPartitions().contains(tp0));
+
+        // Pause tp1, but verify that it is still buffered. Having a paused, buffered partition is key to triggering
+        // the test case.
+        subscriptions.pause(tp1);
+        assertTrue(fetcher.fetchBuffer.bufferedPartitions().contains(tp1));
+
+        // sendFetches() call #2 should issue a fetch request because it has no buffered partitions:
+        //
+        // - tp0 was collected and thus not in the fetch buffer
+        // - tp1, while still in the fetch buffer, is paused and its node should be ignored
+        assertEquals(1, sendFetches());

Review Comment:
   Is there an easy way to verify the partitions in the fetch request?
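   One possible way, sketched below as a snippet for the paused-partition test above (imports such as `AtomicReference` omitted). The snippet assumes the test can reach its underlying `MockClient` (called `client` here) and a topic-id-to-name map (called `topicNames` here); those names, and the `fetchResponse` placeholder, are illustrative rather than this test class's actual fields. The idea is to register a `MockClient.RequestMatcher` that records which partitions end up in the outgoing `FetchRequest`:

   ```java
   // Sketch only: capture the partition set of the next FetchRequest so the test can assert
   // that tp0 is fetched while the paused-but-buffered tp1 is excluded.
   AtomicReference<Set<TopicPartition>> requested = new AtomicReference<>();

   client.prepareResponse(body -> {
       FetchRequest request = (FetchRequest) body;
       // fetchData() needs the topic-id-to-name mapping to resolve TopicPartitions.
       requested.set(request.fetchData(topicNames).keySet());
       return true;
   }, fetchResponse); // fetchResponse: placeholder for whatever prepareFetchResponses() would normally enqueue

   // sendFetches() call #2, as in the test above: only tp0 should appear in the request.
   assertEquals(1, sendFetches());
   networkClientDelegate.poll(time.timer(0));
   assertEquals(Set.of(tp0), requested.get());
   ```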