vvcephei commented on a change in pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#discussion_r561967937

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -218,7 +218,21 @@ public synchronized void unsubscribe() {
         }
         toClear.forEach(p -> this.records.remove(p));
-        return new ConsumerRecords<>(results);
+
+        final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
+        for (final TopicPartition partition : subscriptions.assignedPartitions()) {
+            if (subscriptions.hasValidPosition(partition) && beginningOffsets.containsKey(partition) && endOffsets.containsKey(partition)) {

Review comment:
Ah, this is from before I removed it.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                 } else {
                     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
-                    if (!records.isEmpty()) {
-                        TopicPartition partition = nextInLineFetch.partition;
-                        List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-                        if (currentRecords == null) {
-                            fetched.put(partition, records);
-                        } else {
-                            // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                            // but it might conceivably happen in some rare cases (such as partition leader changes).
-                            // we have to copy to a new list because the old one may be immutable
-                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                            newRecords.addAll(currentRecords);
-                            newRecords.addAll(records);
-                            fetched.put(partition, newRecords);
+                    TopicPartition partition = nextInLineFetch.partition;
+
+                    if (subscriptions.isAssigned(partition)) {

Review comment:
I copied this check from `fetchRecords`, whose comment says "this can happen when a rebalance happened before fetched records are returned to the consumer's poll call". In other words, it seems this can actually happen, but it at least calls for a comment. I'll add one.
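The rebalance race described in that comment can be illustrated with a simplified, self-contained model. This is a hypothetical sketch, not Kafka's `Fetcher`: `AssignmentGuardSketch`, `CompletedFetch`, and `collect` are made-up names, and partitions are plain strings. It drops records for any partition that is no longer assigned when records are collected, and it appends to an existing per-partition list by copying, mirroring the removed code's note that the old list may be immutable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssignmentGuardSketch {
    // Hypothetical stand-in for a completed fetch: a partition name and the offsets it returned.
    static final class CompletedFetch {
        final String partition;
        final List<Long> offsets;

        CompletedFetch(String partition, List<Long> offsets) {
            this.partition = partition;
            this.offsets = offsets;
        }
    }

    // Collects records only for partitions that are still assigned. A rebalance may have
    // revoked a partition after its fetch completed but before poll() returns.
    static Map<String, List<Long>> collect(List<CompletedFetch> fetches, Set<String> assigned) {
        Map<String, List<Long>> fetched = new LinkedHashMap<>();
        for (CompletedFetch fetch : fetches) {
            if (!assigned.contains(fetch.partition)) {
                continue; // revoked partition: drop its records
            }
            // Copy before appending, since an existing list might be immutable.
            List<Long> merged = new ArrayList<>(fetched.getOrDefault(fetch.partition, List.of()));
            merged.addAll(fetch.offsets);
            fetched.put(fetch.partition, merged);
        }
        return fetched;
    }

    public static void main(String[] args) {
        List<CompletedFetch> fetches = List.of(
                new CompletedFetch("topic-0", List.of(50L, 51L)),
                new CompletedFetch("topic-1", List.of(7L)),
                new CompletedFetch("topic-0", List.of(52L)));
        // Suppose a rebalance revoked topic-1 before poll() returned.
        Set<String> assigned = Set.of("topic-0");
        System.out.println(collect(fetches, assigned)); // prints {topic-0=[50, 51, 52]}
    }
}
```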

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                 } else {
                     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
-                    if (!records.isEmpty()) {
-                        TopicPartition partition = nextInLineFetch.partition;
-                        List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-                        if (currentRecords == null) {
-                            fetched.put(partition, records);
-                        } else {
-                            // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                            // but it might conceivably happen in some rare cases (such as partition leader changes).
-                            // we have to copy to a new list because the old one may be immutable
-                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                            newRecords.addAll(currentRecords);
-                            newRecords.addAll(records);
-                            fetched.put(partition, newRecords);
+                    TopicPartition partition = nextInLineFetch.partition;
+
+                    if (subscriptions.isAssigned(partition)) {
+                        // initializeCompletedFetch, above, has already persisted the metadata from the fetch in the
+                        // SubscriptionState, so we can just read it out, which in particular lets us re-use the logic
+                        // for determining the end offset
+                        final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
+                        final Long beginningOffset = subscriptions.logStartOffset(partition);
+                        final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel);
+                        final FetchPosition fetchPosition = subscriptions.position(partition);
+
+                        final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition);
+                        if (fetchMetadata == null
+                            || !fetchMetadata.position().offsetEpoch.isPresent()
+                            || fetchPosition.offsetEpoch.isPresent()
+                            && fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) {

Review comment:
Ah, good catch.
It looks like this was also left over from a previous version. I used to populate the returned metadata directly from the fetch response, but now I populate it from the subscription state, which `initializeCompletedFetch` has already updated. The benefit is that we don't have to worry about cases like this one, since they've already been checked.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2041,6 +2052,118 @@ public void testInvalidGroupMetadata() throws InterruptedException {
         assertThrows(IllegalStateException.class, consumer::groupMetadata);
     }
+    @Test
+    public void testPollMetadata() {
+        final Time time = new MockTime();
+        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer =
+            newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+
+        consumer.assign(singleton(tp0));
+        consumer.seek(tp0, 50L);
+
+        final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
+        client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo)));
+
+        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+        assertEquals(5, records.count());
+        assertEquals(55L, consumer.position(tp0));
+
+        // verify that the consumer computes the correct metadata based on the fetch response
+        final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0);
+        assertEquals(100L, (long) actualMetadata.endOffset());
+        assertEquals(55L, (long) actualMetadata.position());
+        assertEquals(45L, (long) actualMetadata.lag());
+        consumer.close(Duration.ZERO);
+    }
+
+
+    @Test
+    public void testPollMetadataWithExtraPartitions() {

Review comment:
Good question. Thanks to your other comment, I've removed the stale epoch check. By the "no prev value" case, are you referring to what happens when we get a fetch response for a partition for the first time? That is actually _all_ we're testing here.
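Reading the per-partition metadata from already-updated state, as described above, reduces the computation to a lookup plus a subtraction. The sketch below is hypothetical: `PollMetadataSketch`, `PartitionState`, `Metadata`, and `metadataFor` are illustrative names, not Kafka's `SubscriptionState` or `ConsumerRecords.Metadata` API. It assumes lag is the end offset minus the position, which is consistent with the values asserted in `testPollMetadata` (end offset 100, position 55, lag 45), and it skips any partition whose position or end offset is not yet known.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PollMetadataSketch {
    // Hypothetical per-partition state, standing in for what a subscription state might hold
    // after a completed fetch has been processed. A null field means "not known yet".
    static final class PartitionState {
        final Long position;
        final Long endOffset;

        PartitionState(Long position, Long endOffset) {
            this.position = position;
            this.endOffset = endOffset;
        }
    }

    // Hypothetical metadata returned alongside a poll.
    static final class Metadata {
        final long position;
        final long endOffset;

        Metadata(long position, long endOffset) {
            this.position = position;
            this.endOffset = endOffset;
        }

        // Assumed definition: how far the consumer's position trails the end offset.
        long lag() {
            return endOffset - position;
        }
    }

    // Builds metadata only for partitions whose position and end offset are both known.
    static Map<String, Metadata> metadataFor(Map<String, PartitionState> states) {
        Map<String, Metadata> result = new LinkedHashMap<>();
        for (Map.Entry<String, PartitionState> entry : states.entrySet()) {
            PartitionState state = entry.getValue();
            if (state.position == null || state.endOffset == null) {
                continue; // nothing reliable to report for this partition yet
            }
            result.put(entry.getKey(), new Metadata(state.position, state.endOffset));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, PartitionState> states = new LinkedHashMap<>();
        states.put("tp0", new PartitionState(55L, 100L));
        states.put("tp1", new PartitionState(10L, null)); // end offset not yet known
        Metadata m = metadataFor(states).get("tp0");
        System.out.println(m.position + " " + m.endOffset + " " + m.lag()); // prints 55 100 45
    }
}
```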