kirktrue commented on code in PR #17700: URL: https://github.com/apache/kafka/pull/17700#discussion_r1932698069
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -3437,21 +3443,330 @@ public void testPollWithRedundantCreateFetchRequests() { } - private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse( - TopicPartition topicPartition, - Errors error, - int leaderEpoch, - long endOffset - ) { - OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData(); - data.topics().add(new OffsetForLeaderTopicResult() - .setTopic(topicPartition.topic()) - .setPartitions(Collections.singletonList(new EpochEndOffset() - .setPartition(topicPartition.partition()) - .setErrorCode(error.code()) - .setLeaderEpoch(leaderEpoch) - .setEndOffset(endOffset)))); - return new OffsetsForLeaderEpochResponse(data); + /** + * This test makes several calls to {@link #sendFetches()}, and after each, the buffered partitions are + * modified to either cause (or prevent) a fetch from being requested. + */ + @Test + public void testFetchRequestWithBufferedPartitions() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. + int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. + partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partitions.remove(0), partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #2 shouldn't issue requests to either node 0 or node 1 since they both have buffered data. + assertEquals(0, sendFetches()); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node1Partitions.remove(0), partitions); + assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #3 shouldn't issue requests to either node 0 or node 1 since they both have buffered data. + assertEquals(0, sendFetches()); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node0Partitions.remove(0), partitions); + assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Node 0's partitions have all been collected, so validate that and then reset the list of partitions + // from which to fetch data so the next pass should request can fetch more data. + assertTrue(node0Partitions.isEmpty()); + node0Partitions = partitionsForNode(node0, partitions); + + // sendFetches() call #4 should issue a request to node 0 since its buffered data was collected. + assertEquals(1, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 10); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node1Partitions.remove(0), partitions); + assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Node 1's partitions have likewise all been collected, so validate that and reset. + assertTrue(node1Partitions.isEmpty()); + node1Partitions = partitionsForNode(node1, partitions); + + // sendFetches() call #5 should issue a request to node 1 since its buffered data was collected. + assertEquals(1, sendFetches()); + prepareFetchResponses(node1, node1Partitions, 10); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Collect all the records and make sure they include all the partitions, and validate that there is no data + // remaining in the fetch buffer. + assertEquals(partitions, fetchRecords().keySet()); + assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #6 should issue a request to nodes 0 and 1 since its buffered data was collected. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 20); + prepareFetchResponses(node1, node1Partitions, 20); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Just for completeness, collect all the records and make sure they include all the partitions, and validate + // that there is no data remaining in the fetch buffer. + assertEquals(partitions, fetchRecords().keySet()); + assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size()); + } + + @Test + public void testFetchRequestWithBufferedPartitionNotAssigned() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. + int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. + partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + + // Grab both partitions for node 0. The first partition will be collected so that it doesn't have anything + // in the fetch buffer. The second node will be left in the buffer, but will be unassigned in a bit. + TopicPartition node0Partition1 = node0Partitions.remove(0); + TopicPartition node0Partition2 = node0Partitions.remove(0); + + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partition1, partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Change the set of assigned partitions to exclude the remaining buffered partition for node 0, which means + // that partition is unassigned. + Set<TopicPartition> topicsWithoutUnassignedPartition = new HashSet<>(partitions); + topicsWithoutUnassignedPartition.remove(node0Partition2); Review Comment: No, because `topicsWithoutUnassignedPartition` hold the original four partitions _minus_ `node0Partition2`. Regardless, I reworked how this is done in the test to make it clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org