kirktrue commented on code in PR #17700: URL: https://github.com/apache/kafka/pull/17700#discussion_r1932701792
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -3437,21 +3443,330 @@ public void testPollWithRedundantCreateFetchRequests() { } - private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse( - TopicPartition topicPartition, - Errors error, - int leaderEpoch, - long endOffset - ) { - OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData(); - data.topics().add(new OffsetForLeaderTopicResult() - .setTopic(topicPartition.topic()) - .setPartitions(Collections.singletonList(new EpochEndOffset() - .setPartition(topicPartition.partition()) - .setErrorCode(error.code()) - .setLeaderEpoch(leaderEpoch) - .setEndOffset(endOffset)))); - return new OffsetsForLeaderEpochResponse(data); + /** + * This test makes several calls to {@link #sendFetches()}, and after each, the buffered partitions are + * modified to either cause (or prevent) a fetch from being requested. + */ + @Test + public void testFetchRequestWithBufferedPartitions() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. + int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. 
+ partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partitions.remove(0), partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #2 shouldn't issue requests to either node 0 or node 1 since they both have buffered data. + assertEquals(0, sendFetches()); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node1Partitions.remove(0), partitions); + assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #3 shouldn't issue requests to either node 0 or node 1 since they both have buffered data. + assertEquals(0, sendFetches()); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node0Partitions.remove(0), partitions); + assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Node 0's partitions have all been collected, so validate that and then reset the list of partitions + // from which to fetch data so the next pass should request can fetch more data.

Review Comment:
Now reads:

```java
// Validate that all of node 0's partitions have all been collected.
assertTrue(node0Partitions.isEmpty());

// Reset the list of partitions for node 0 so the next fetch pass requests data.
node0Partitions = partitionsForNode(node0, partitions);
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org