kirktrue commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1932701792


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,21 +3443,330 @@ public void testPollWithRedundantCreateFetchRequests() 
{
 
     }
 
-    private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
-            TopicPartition topicPartition,
-            Errors error,
-            int leaderEpoch,
-            long endOffset
-    ) {
-        OffsetForLeaderEpochResponseData data = new 
OffsetForLeaderEpochResponseData();
-        data.topics().add(new OffsetForLeaderTopicResult()
-                .setTopic(topicPartition.topic())
-                .setPartitions(Collections.singletonList(new EpochEndOffset()
-                        .setPartition(topicPartition.partition())
-                        .setErrorCode(error.code())
-                        .setLeaderEpoch(leaderEpoch)
-                        .setEndOffset(endOffset))));
-        return new OffsetsForLeaderEpochResponse(data);
+    /**
+     * This test makes several calls to {@link #sendFetches()}, and after 
each, the buffered partitions are
+     * modified to either cause (or prevent) a fetch from being requested.
+     */
+    @Test
+    public void testFetchRequestWithBufferedPartitions() {
+        buildFetcher();
+
+        // The test requires that there are multiple nodes as the fetch 
request logic is based in part off of the
+        // partition-to-node relationship.
+        int numNodes = 2;
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions, numNodes);
+
+        // Get all the nodes serving as the leader for these partitions, and 
then split them into the separate
+        // nodes and set of partitions to make things easier to keep track of 
later.
+        List<Node> nodes = nodesForPartitionLeaders(partitions);
+        Node node0 = nodes.get(0);
+        Node node1 = nodes.get(1);
+        List<TopicPartition> node0Partitions = partitionsForNode(node0, 
partitions);
+        List<TopicPartition> node1Partitions = partitionsForNode(node1, 
partitions);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        // sendFetches() call #1 should issue requests to node 0 or node 1 
since neither has buffered data.
+        assertEquals(2, sendFetches());
+        prepareFetchResponses(node0, node0Partitions, 0);
+        prepareFetchResponses(node1, node1Partitions, 0);
+        networkClientDelegate.poll(time.timer(0));
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+        collectSelectedPartition(node0Partitions.remove(0), partitions);
+        assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #2 shouldn't issue requests to either node 0 or 
node 1 since they both have buffered data.
+        assertEquals(0, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        collectSelectedPartition(node1Partitions.remove(0), partitions);
+        assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #3 shouldn't issue requests to either node 0 or 
node 1 since they both have buffered data.
+        assertEquals(0, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        collectSelectedPartition(node0Partitions.remove(0), partitions);
+        assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Node 0's partitions have all been collected, so validate that and 
then reset the list of partitions
+        // from which to fetch data so the next pass should request can fetch 
more data.

Review Comment:
   Now reads:
   
   ```java
   // Validate that all of node 0's partitions have all been collected.
   assertTrue(node0Partitions.isEmpty());
   
   // Reset the list of partitions for node 0 so the next fetch pass requests 
data.
   node0Partitions = partitionsForNode(node0, partitions);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to