junrao commented on code in PR #17700: URL: https://github.com/apache/kafka/pull/17700#discussion_r1925757185
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ########## @@ -456,6 +484,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())); } + /** + * Simple utility method that returns a {@link SubscriptionState.FetchPosition position} for the partition. If + * no position exists, an {@link IllegalStateException} is thrown. + */ + private SubscriptionState.FetchPosition positionForPartition(TopicPartition partition) { + SubscriptionState.FetchPosition position = subscriptions.position(partition); + + if (position == null) + throw new IllegalStateException("Missing position for fetchable partition " + partition); + + return position; + } + + /** + * Retrieves the node from which to fetch the partition data. If the given + * {@link SubscriptionState.FetchPosition position} does not have a current + * {@link Metadata.LeaderAndEpoch#leader leader} defined the method will return {@link Optional#empty()}. + * + * @return Three options: 1) {@link Optional#empty()} if the position's leader is empty, 2) the + * {@link #selectReadReplica(TopicPartition, Node, long) read replica, if defined}, or 3) the position's + * {@link Metadata.LeaderAndEpoch#leader leader} + */ + private Optional<Node> maybeNodeForPosition(TopicPartition partition, + SubscriptionState.FetchPosition position, + long currentTimeMs) { + Optional<Node> leaderOpt = position.currentLeader.leader; + + if (leaderOpt.isEmpty()) { + log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position); + metadata.requestUpdate(false); Review Comment: `MetadataResponse.createTopicMetadata()` converts a leaderId of -1 to empty. The existing behavior is to not act on it immediately, but trigger a metadata update when building the fetch request. So, this part of the change is fine since it's an existing behavior. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -3437,21 +3443,318 @@ public void testPollWithRedundantCreateFetchRequests() { } - private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse( - TopicPartition topicPartition, - Errors error, - int leaderEpoch, - long endOffset - ) { - OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData(); - data.topics().add(new OffsetForLeaderTopicResult() - .setTopic(topicPartition.topic()) - .setPartitions(Collections.singletonList(new EpochEndOffset() - .setPartition(topicPartition.partition()) - .setErrorCode(error.code()) - .setLeaderEpoch(leaderEpoch) - .setEndOffset(endOffset)))); - return new OffsetsForLeaderEpochResponse(data); + /** + * This test makes several calls to {@link #sendFetches()}, and after each, the buffered partitions are + * modified to either cause (or prevent) a fetch from being requested. + */ + @Test + public void testFetchRequestWithBufferedPartitions() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. 
+ int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. + partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 and node 1 since neither has buffered data. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partitions.remove(0), partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #2 shouldn't issue requests to either node 0 or node 1 since they both have buffered data. + assertEquals(0, sendFetches()); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node1Partitions.remove(0), partitions); + assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #3 shouldn't issue requests to either node 0 or node 1 since they both have buffered data. + assertEquals(0, sendFetches()); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node0Partitions.remove(0), partitions); + assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Node 0's partitions have all been collected, so validate that and then reset the list of partitions + // from which to fetch data so that the next request can fetch more data. + assertTrue(node0Partitions.isEmpty()); + node0Partitions = partitionsForNode(node0, partitions); + + // sendFetches() call #4 should issue a request to node 0 since its buffered data was collected. + assertEquals(1, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 10); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node1Partitions.remove(0), partitions); + assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Node 1's partitions have likewise all been collected, so validate that and reset. + assertTrue(node1Partitions.isEmpty()); + node1Partitions = partitionsForNode(node1, partitions); + + // sendFetches() call #5 should issue a request to node 1 since its buffered data was collected. + assertEquals(1, sendFetches()); + prepareFetchResponses(node1, node1Partitions, 10); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Collect all the records and make sure they include all the partitions, and validate that there is no data + // remaining in the fetch buffer. + assertEquals(partitions, fetchRecords().keySet()); + assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #6 should issue requests to nodes 0 and 1 since their buffered data was collected.
+ assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 20); + prepareFetchResponses(node1, node1Partitions, 20); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Just for completeness, collect all the records and make sure they include all the partitions, and validate + // that there is no data remaining in the fetch buffer. + assertEquals(partitions, fetchRecords().keySet()); + assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size()); + } + + @Test + public void testFetchRequestWithBufferedPartitionNotAssigned() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. + int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. + partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + + // Grab both partitions for node 0. The first partition will be collected so that it doesn't have anything + // in the fetch buffer. The second node will be left in the buffer, but will be unassigned in a bit. + TopicPartition node0Partition1 = node0Partitions.remove(0); + TopicPartition node0Partition2 = node0Partitions.remove(0); + + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partition1, partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Change the set of assigned partitions to exclude the remaining buffered partition for node 0, which means + // that partition is unassigned. + Set<TopicPartition> topicsWithoutUnassignedPartition = new HashSet<>(partitions); + topicsWithoutUnassignedPartition.remove(node0Partition2); + subscriptions.assignFromUser(topicsWithoutUnassignedPartition); + + // The collected partition should have a retrievable position, but the unassigned position should throw + // an error when attempting to retrieve its position. + assertDoesNotThrow(() -> subscriptions.position(node0Partition1)); + assertThrows(IllegalStateException.class, () -> subscriptions.position(node0Partition2)); + + // sendFetches() call #2 should issue a request to node 0 because the first partition in node 0 was collected + // (and its buffer removed) and the second partition for node 0 was unassigned. As a result, there are now no + // *assigned* partitions for node 0 that are buffered. 
+ assertEquals(1, sendFetches()); + } + + @Test + public void testFetchRequestWithBufferedPartitionMissingLeader() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. + int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. + partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + + // Grab both partitions for node 0. The first partition will be collected so that it doesn't have anything + // in the fetch buffer. The second node will be left in the buffer, but will clear out its position's leader + // node. + TopicPartition node0Partition1 = node0Partitions.remove(0); + TopicPartition node0Partition2 = node0Partitions.remove(0); + + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partition1, partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Overwrite the position with an empty leader to trigger the test case. + SubscriptionState.FetchPosition leaderlessPosition = new SubscriptionState.FetchPosition( + 0, + Optional.empty(), + Metadata.LeaderAndEpoch.noLeaderOrEpoch() + ); + subscriptions.position(node0Partition2, leaderlessPosition); + + // Both the collected partition and the position without a partition leader should have a retrievable position. Review Comment: Both the collected partition and the position without a partition leader should have a retrievable position. => Both collected partitions should have a retrievable position ? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ########## @@ -408,22 +407,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() long currentTimeMs = time.milliseconds(); Map<String, Uuid> topicIds = metadata.topicIds(); - for (TopicPartition partition : fetchablePartitions()) { - SubscriptionState.FetchPosition position = subscriptions.position(partition); + // This is the set of partitions that have buffered data + Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions()); - if (position == null) - throw new IllegalStateException("Missing position for fetchable partition " + partition); + // This is the set of partitions that do not have buffered data + Set<TopicPartition> unbuffered = fetchablePartitions(buffered); - Optional<Node> leaderOpt = position.currentLeader.leader; + if (unbuffered.isEmpty()) { + // If there are no partitions that don't already have data locally buffered, there's no need to issue + // any fetch requests at the present time. 
+ return Collections.emptyMap(); + } - if (leaderOpt.isEmpty()) { - log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position); - metadata.requestUpdate(false); + Set<Integer> bufferedNodes = new HashSet<>(); + + for (TopicPartition partition : buffered) { + // It's possible that at the time of the fetcher creating new fetch requests, a partition with buffered + // data from a *previous* request is no longer assigned. So before attempting to retrieve the node + // information, check that the partition is still assigned as calling SubscriptionState.position() on an + // unassigned partition will throw an IllegalStateException. + // + // Note: this check is not needed for the unbuffered partitions as the logic in + // SubscriptionState.fetchablePartitions() only includes partitions currently assigned. + if (!subscriptions.isAssigned(partition)) Review Comment: An assigned partition doesn't necessarily have a valid position. So, we need to do a stricter check here. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -3437,21 +3443,318 @@ public void testPollWithRedundantCreateFetchRequests() { } - private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse( - TopicPartition topicPartition, - Errors error, - int leaderEpoch, - long endOffset - ) { - OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData(); - data.topics().add(new OffsetForLeaderTopicResult() - .setTopic(topicPartition.topic()) - .setPartitions(Collections.singletonList(new EpochEndOffset() - .setPartition(topicPartition.partition()) - .setErrorCode(error.code()) - .setLeaderEpoch(leaderEpoch) - .setEndOffset(endOffset)))); - return new OffsetsForLeaderEpochResponse(data); + /** + * This test makes several calls to {@link #sendFetches()}, and after each, the buffered partitions are + * modified to either cause (or prevent) a fetch from being requested. + */ + @Test + public void testFetchRequestWithBufferedPartitions() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. + int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. + partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data. 
+ assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partitions.remove(0), partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #2 shouldn't issue requests to either node 0 or node 1 since they both have buffered data. + assertEquals(0, sendFetches()); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node1Partitions.remove(0), partitions); + assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #3 shouldn't issue requests to either node 0 or node 1 since they both have buffered data. + assertEquals(0, sendFetches()); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node0Partitions.remove(0), partitions); + assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Node 0's partitions have all been collected, so validate that and then reset the list of partitions + // from which to fetch data so the next pass should request can fetch more data. + assertTrue(node0Partitions.isEmpty()); + node0Partitions = partitionsForNode(node0, partitions); + + // sendFetches() call #4 should issue a request to node 0 since its buffered data was collected. + assertEquals(1, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 10); + networkClientDelegate.poll(time.timer(0)); + collectSelectedPartition(node1Partitions.remove(0), partitions); + assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Node 1's partitions have likewise all been collected, so validate that and reset. + assertTrue(node1Partitions.isEmpty()); + node1Partitions = partitionsForNode(node1, partitions); + + // sendFetches() call #5 should issue a request to node 1 since its buffered data was collected. + assertEquals(1, sendFetches()); + prepareFetchResponses(node1, node1Partitions, 10); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Collect all the records and make sure they include all the partitions, and validate that there is no data + // remaining in the fetch buffer. + assertEquals(partitions, fetchRecords().keySet()); + assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size()); + + // sendFetches() call #6 should issue a request to nodes 0 and 1 since its buffered data was collected. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 20); + prepareFetchResponses(node1, node1Partitions, 20); + networkClientDelegate.poll(time.timer(0)); + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Just for completeness, collect all the records and make sure they include all the partitions, and validate + // that there is no data remaining in the fetch buffer. + assertEquals(partitions, fetchRecords().keySet()); + assertEquals(0, fetcher.fetchBuffer.bufferedPartitions().size()); + } + + @Test + public void testFetchRequestWithBufferedPartitionNotAssigned() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. 
+ int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. + partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + + // Grab both partitions for node 0. The first partition will be collected so that it doesn't have anything + // in the fetch buffer. The second node will be left in the buffer, but will be unassigned in a bit. + TopicPartition node0Partition1 = node0Partitions.remove(0); + TopicPartition node0Partition2 = node0Partitions.remove(0); + + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partition1, partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Change the set of assigned partitions to exclude the remaining buffered partition for node 0, which means + // that partition is unassigned. + Set<TopicPartition> topicsWithoutUnassignedPartition = new HashSet<>(partitions); + topicsWithoutUnassignedPartition.remove(node0Partition2); + subscriptions.assignFromUser(topicsWithoutUnassignedPartition); + + // The collected partition should have a retrievable position, but the unassigned position should throw + // an error when attempting to retrieve its position. + assertDoesNotThrow(() -> subscriptions.position(node0Partition1)); + assertThrows(IllegalStateException.class, () -> subscriptions.position(node0Partition2)); + + // sendFetches() call #2 should issue a request to node 0 because the first partition in node 0 was collected + // (and its buffer removed) and the second partition for node 0 was unassigned. As a result, there are now no + // *assigned* partitions for node 0 that are buffered. + assertEquals(1, sendFetches()); + } + + @Test + public void testFetchRequestWithBufferedPartitionMissingLeader() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. + int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. 
+ partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + + // Grab both partitions for node 0. The first partition will be collected so that it doesn't have anything + // in the fetch buffer. The second node will be left in the buffer, but will clear out its position's leader + // node. + TopicPartition node0Partition1 = node0Partitions.remove(0); + TopicPartition node0Partition2 = node0Partitions.remove(0); + + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partition1, partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Overwrite the position with an empty leader to trigger the test case. + SubscriptionState.FetchPosition leaderlessPosition = new SubscriptionState.FetchPosition( + 0, + Optional.empty(), + Metadata.LeaderAndEpoch.noLeaderOrEpoch() + ); + subscriptions.position(node0Partition2, leaderlessPosition); + + // Both the collected partition and the position without a partition leader should have a retrievable position. + // Confirm that position() doesn't throw an exception and that the leader for the second partition is missing. + assertNotEquals(Optional.empty(), subscriptions.position(node0Partition1).currentLeader.leader); + assertEquals(Optional.empty(), subscriptions.position(node0Partition2).currentLeader.leader); + + // sendFetches() call #2 should issue a fetch request to node 0 because its first partition was collected + // (and its buffer removed) and the second partition had its leader node cleared. As a result, there are now + // effectively no topic partitions for node 0. + assertEquals(1, sendFetches()); + } + + @Test + public void testFetchRequestWithBufferedPartitionMissingPosition() { + buildFetcher(); + + // The test requires that there are multiple nodes as the fetch request logic is based in part off of the + // partition-to-node relationship. + int numNodes = 2; + Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3); + assignFromUser(partitions, numNodes); + + // Get all the nodes serving as the leader for these partitions, and then split them into the separate + // nodes and set of partitions to make things easier to keep track of later. + List<Node> nodes = nodesForPartitionLeaders(partitions); + Node node0 = nodes.get(0); + Node node1 = nodes.get(1); + List<TopicPartition> node0Partitions = partitionsForNode(node0, partitions); + List<TopicPartition> node1Partitions = partitionsForNode(node1, partitions); + + // Seek each partition so that it becomes eligible to fetch. + partitions.forEach(tp -> subscriptions.seek(tp, 0)); + + // sendFetches() call #1 should issue requests to node 0 or node 1 since neither has buffered data. + assertEquals(2, sendFetches()); + prepareFetchResponses(node0, node0Partitions, 0); + prepareFetchResponses(node1, node1Partitions, 0); + networkClientDelegate.poll(time.timer(0)); + + // Grab both partitions for node 0. The first partition will be collected so that it doesn't have anything + // in the fetch buffer. The second partition will be left in the buffer, but will have its position cleared + // (set to null).
+ TopicPartition node0Partition1 = node0Partitions.remove(0); + TopicPartition node0Partition2 = node0Partitions.remove(0); + + assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size()); + collectSelectedPartition(node0Partition1, partitions); + assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size()); + + // Clear the position (i.e. set it to null) to trigger the test case. + subscriptions.position(node0Partition2, null); + + // The collected partition should still have a retrievable position with a current leader, while the second + // partition's position should now be null. Confirm that position() doesn't throw an exception for either. + assertNotEquals(Optional.empty(), subscriptions.position(node0Partition1).currentLeader.leader); + assertNull(subscriptions.position(node0Partition2)); + + // sendFetches() call #2 will now fail to send any requests as we have an invalid position in the assignment. + // The Consumer.poll() API will throw an IllegalStateException to the user. + Future<Void> future = fetcher.createFetchRequests(); + assertEquals(0, sendFetches()); + assertFutureThrows(future, IllegalStateException.class); Review Comment: Is this the right behavior? The original code only calls `subscriptions.position` for partitions that are fetchable, which implies that the position is available. So, it will never throw IllegalStateException in this case.
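To illustrate the stricter check suggested in the comments above, here is a minimal sketch (not the PR's actual change) of how the buffered-partition loop in `prepareFetchRequests()` could skip buffered partitions that are no longer assigned, or that are assigned but have no valid position, so that `subscriptions.position()` is never relied on in a state where the consumer would surface an `IllegalStateException`. The class name `BufferedNodeCheckSketch` and helper `bufferedNodeIds` are hypothetical; only the `SubscriptionState` calls already exercised in the diff and tests above are used.

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only: the class and method names here are hypothetical, not part of the PR.
final class BufferedNodeCheckSketch {

    /**
     * Collects the IDs of the nodes hosting buffered partitions that are still assigned,
     * still have a valid position, and whose position still knows its current leader.
     */
    static Set<Integer> bufferedNodeIds(Set<TopicPartition> buffered, SubscriptionState subscriptions) {
        Set<Integer> bufferedNodes = new HashSet<>();

        for (TopicPartition partition : buffered) {
            // A buffered partition may have been unassigned since the previous fetch;
            // position() would throw IllegalStateException for it, so check assignment first.
            if (!subscriptions.isAssigned(partition))
                continue;

            // Stricter check: an assigned partition doesn't necessarily have a valid position yet.
            SubscriptionState.FetchPosition position = subscriptions.position(partition);

            if (position == null)
                continue;

            // Only count the node if the position still has a current leader.
            position.currentLeader.leader.ifPresent(node -> bufferedNodes.add(node.id()));
        }

        return bufferedNodes;
    }
}
```

With a guard along these lines, a stale buffered entry is simply ignored when deciding which nodes already have buffered data, mirroring what `SubscriptionState.fetchablePartitions()` already guarantees for the unbuffered set, rather than propagating an `IllegalStateException` out of `Consumer.poll()`.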