vvcephei commented on a change in pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#discussion_r552938808
##########
File path: build.gradle
##########
@@ -1070,6 +1070,7 @@ project(':clients') {
     testCompile libs.junitJupiterApi
     testCompile libs.junitVintageEngine
     testCompile libs.mockitoCore
+    testCompile libs.hamcrest

Review comment:
   To get access to more Matchers.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
##########
@@ -32,14 +34,99 @@
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-
-    @SuppressWarnings("unchecked")
-    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP);
+    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, Metadata> metadata;
+
+    public static final class Metadata {

Review comment:
   I made this a static inner class, since "metadata" is such an abstract concept. This way, the scope is clear.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -229,6 +244,7 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
             throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
         List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
         recs.add(record);
+        endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset()));

Review comment:
   Just adding this computation in for convenience.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +637,42 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
             } else {
                 List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
 
-                if (!records.isEmpty()) {
-                    TopicPartition partition = nextInLineFetch.partition;
-                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-                    if (currentRecords == null) {
-                        fetched.put(partition, records);
-                    } else {
-                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                        // but it might conceivably happen in some rare cases (such as partition leader changes).
-                        // we have to copy to a new list because the old one may be immutable
-                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                        newRecords.addAll(currentRecords);
-                        newRecords.addAll(records);
-                        fetched.put(partition, newRecords);
+                TopicPartition partition = nextInLineFetch.partition;
+
+                if (subscriptions.isAssigned(partition)) {
+                    final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
+
+                    final long startOffset = nextInLineFetch.partitionData.logStartOffset();
+
+                    // read_uncommitted:
+                    // that is, the offset of the last successfully replicated message plus one
+                    final long hwm = nextInLineFetch.partitionData.highWatermark();
+                    // read_committed:
+                    // the minimum of the high watermark and the smallest offset of any open transaction
+                    final long lso = nextInLineFetch.partitionData.lastStableOffset();
+
+                    // end offset is:
+                    final long endOffset =
+                        isolationLevel == IsolationLevel.READ_UNCOMMITTED ? hwm : lso;
+
+                    final FetchPosition fetchPosition = subscriptions.position(partition);
+
+                    final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition);
+
+                    if (fetchMetadata == null
+                        || !fetchMetadata.position().offsetEpoch.isPresent()
+                        || fetchPosition.offsetEpoch.isPresent()
+                            && fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) {
+
+                        fetched.addMetadata(
+                            partition,
+                            new FetchedRecords.FetchMetadata(receivedTimestamp, fetchPosition, startOffset, endOffset)
+                        );
                     }
+                }
+
+                if (!records.isEmpty()) {
+                    fetched.addRecords(partition, records);
                     recordsRemaining -= records.size();
                 }

Review comment:
   Then, we get the records (note the main logic is now internal to FetchedRecords).
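   For reference, the merging that used to happen inline here is what FetchedRecords#addRecords now owns. A minimal sketch of that method, reconstructed from the removed lines above (not necessarily the exact implementation in this PR):

    public void addRecords(final TopicPartition topicPartition, final List<ConsumerRecord<K, V>> newRecords) {
        final List<ConsumerRecord<K, V>> currentRecords = records.get(topicPartition);
        if (currentRecords == null) {
            records.put(topicPartition, newRecords);
        } else {
            // this case shouldn't usually happen because we only send one fetch at a time per partition,
            // but it might conceivably happen in some rare cases (such as partition leader changes).
            // we have to copy to a new list because the old one may be immutable
            final List<ConsumerRecord<K, V>> merged = new ArrayList<>(currentRecords.size() + newRecords.size());
            merged.addAll(currentRecords);
            merged.addAll(newRecords);
            records.put(topicPartition, merged);
        }
    }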
##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -524,7 +524,7 @@ public void testParseCorruptedRecord() throws Exception {
         consumerClient.poll(time.timer(0));
 
         // the first fetchedRecords() should return the first valid message
-        assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
+        assertEquals(1, fetcher.fetchedRecords().records().get(tp0).size());

Review comment:
   Just a consequence of moving the records inside of a response struct.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
##########
@@ -32,14 +34,99 @@
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-
-    @SuppressWarnings("unchecked")
-    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP);
+    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, Metadata> metadata;
+
+    public static final class Metadata {
+
+        private final long receivedTimestamp;
+        private final long position;
+        private final long beginningOffset;
+        private final long endOffset;
+
+        public Metadata(final long receivedTimestamp,
+                        final long position,
+                        final long beginningOffset,
+                        final long endOffset) {
+            this.receivedTimestamp = receivedTimestamp;
+            this.position = position;
+            this.beginningOffset = beginningOffset;
+            this.endOffset = endOffset;
+        }
+
+        /**
+         * @return The timestamp of the broker response that contained this metadata
+         */
+        public long receivedTimestamp() {
+            return receivedTimestamp;
+        }
+
+        /**
+         * @return The next position the consumer will fetch
+         */
+        public long position() {
+            return position;
+        }
+
+        /**
+         * @return The lag between the next position to fetch and the current end of the partition
+         */
+        public long lag() {
+            return endOffset - position;
+        }
+
+        /**
+         * @return The current first offset in the partition.
+         */
+        public long beginningOffset() {
+            return beginningOffset;
+        }
+
+        /**
+         * @return The current last offset in the partition. The determination of the "last" offset
+         * depends on the Consumer's isolation level. Under "read_uncommitted," this is the last
+         * successfully replicated offset plus one. Under "read_committed," this is the minimum of
+         * the last successfully replicated offset plus one and the smallest offset of any open
+         * transaction.
+         */
+        public long endOffset() {
+            return endOffset;
+        }
+    }
+
+    private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {

Review comment:
   This is where we translate from the internal result container to the public one. It could be moved to a utility class, but this seemed fine.
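   To illustrate the intended use from the application side, here is a sketch of polling and reporting progress (assuming ConsumerRecords#metadata() is the public accessor, as the new tests suggest; the logging is just for illustration):

    import java.time.Duration;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    public class LagLoggingExample {
        // Poll once and report the per-partition progress metadata that
        // now comes back alongside the records themselves.
        static <K, V> void pollAndLogLag(final Consumer<K, V> consumer) {
            final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
            for (final Map.Entry<TopicPartition, ConsumerRecords.Metadata> entry : records.metadata().entrySet()) {
                final ConsumerRecords.Metadata meta = entry.getValue();
                // lag() is endOffset() - position(); zero means this consumer was fully
                // caught up as of the broker response at meta.receivedTimestamp()
                System.out.printf("%s: position=%d lag=%d%n", entry.getKey(), meta.position(), meta.lag());
            }
        }
    }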
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1268,12 +1269,12 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitFo
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
+    private FetchedRecords<K, V> pollForFetches(Timer timer) {

Review comment:
   Added an internal struct so that the Fetcher can also return the desired metadata.

##########
File path: checkstyle/suppressions.xml
##########
@@ -100,7 +100,10 @@
               files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
+              files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark|MockConsumer"/>
+
+    <suppress checks="CyclomaticComplexity"
+              files="MockConsumer"/>

Review comment:
   I've added a couple more branches to MockConsumer, which pushed it over the line.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -218,7 +218,22 @@ public synchronized void unsubscribe() {
         }
 
         toClear.forEach(p -> this.records.remove(p));
-        return new ConsumerRecords<>(results);
+
+        final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
+        for (final TopicPartition partition : subscriptions.assignedPartitions()) {
+            if (subscriptions.hasValidPosition(partition) && beginningOffsets.containsKey(partition) && endOffsets.containsKey(partition)) {
+                final SubscriptionState.FetchPosition position = subscriptions.position(partition);
+                final long offset = position.offset;
+                final long beginningOffset = beginningOffsets.get(partition);
+                final long endOffset = endOffsets.get(partition);
+                metadata.put(
+                    partition,
+                    new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, beginningOffset, endOffset)

Review comment:
   Just making the MockConsumer return its metadata as well. There are some extra checks in here because the mock doesn't guarantee a valid position or beginning/end offsets while calling poll, whereas the real consumer does.
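   For example, a test that wants metadata back from the mock has to supply the offsets up front. A sketch using the existing MockConsumer setters (topic name and offsets are made up):

    import java.time.Duration;
    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class MockConsumerMetadataExample {
        public static void main(final String[] args) {
            final TopicPartition tp = new TopicPartition("test-topic", 0);
            final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            consumer.assign(Collections.singleton(tp));
            // without a beginning offset the mock has no valid position, and without
            // both beginning and end offsets, poll() skips the metadata for this partition
            consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
            consumer.updateEndOffsets(Collections.singletonMap(tp, 1L));
            consumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"));

            final ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
            final ConsumerRecords.Metadata metadata = records.metadata().get(tp);
            System.out.println("position=" + metadata.position() + ", lag=" + metadata.lag());
        }
    }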
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FetchedRecords<K, V> {
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, FetchMetadata> metadata;
+
+    public static final class FetchMetadata {
+
+        private final long receivedTimestamp;
+        private final long beginningOffset;
+        private final SubscriptionState.FetchPosition position;
+        private final long endOffset;
+
+        public FetchMetadata(final long receivedTimestamp,
+                             final SubscriptionState.FetchPosition position,
+                             final long beginningOffset,
+                             final long endOffset) {
+            this.receivedTimestamp = receivedTimestamp;
+            this.position = position;
+            this.beginningOffset = beginningOffset;
+            this.endOffset = endOffset;
+        }
+
+        public long receivedTimestamp() {
+            return receivedTimestamp;
+        }
+
+        public SubscriptionState.FetchPosition position() {
+            return position;
+        }
+
+        public long beginningOffset() {
+            return beginningOffset;
+        }
+
+        public long endOffset() {
+            return endOffset;
+        }
+    }
+
+    public FetchedRecords() {
+        records = new HashMap<>();
+        metadata = new HashMap<>();
+    }
+
+    public void addRecords(final TopicPartition topicPartition, final List<ConsumerRecord<K, V>> records) {

Review comment:
   Moved this logic from the Fetcher, since it seemed natural to internalize it here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +637,42 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
             } else {
                 List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
 
-                if (!records.isEmpty()) {
-                    TopicPartition partition = nextInLineFetch.partition;
-                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-                    if (currentRecords == null) {
-                        fetched.put(partition, records);
-                    } else {
-                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                        // but it might conceivably happen in some rare cases (such as partition leader changes).
-                        // we have to copy to a new list because the old one may be immutable
-                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                        newRecords.addAll(currentRecords);
-                        newRecords.addAll(records);
-                        fetched.put(partition, newRecords);
+                TopicPartition partition = nextInLineFetch.partition;
+
+                if (subscriptions.isAssigned(partition)) {
+                    final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
+
+                    final long startOffset = nextInLineFetch.partitionData.logStartOffset();
+
+                    // read_uncommitted:
+                    // that is, the offset of the last successfully replicated message plus one
+                    final long hwm = nextInLineFetch.partitionData.highWatermark();
+                    // read_committed:
+                    // the minimum of the high watermark and the smallest offset of any open transaction
+                    final long lso = nextInLineFetch.partitionData.lastStableOffset();
+
+                    // end offset is:
+                    final long endOffset =
+                        isolationLevel == IsolationLevel.READ_UNCOMMITTED ? hwm : lso;
+
+                    final FetchPosition fetchPosition = subscriptions.position(partition);
+
+                    final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition);
+
+                    if (fetchMetadata == null
+                        || !fetchMetadata.position().offsetEpoch.isPresent()
+                        || fetchPosition.offsetEpoch.isPresent()
+                            && fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) {
+
+                        fetched.addMetadata(
+                            partition,
+                            new FetchedRecords.FetchMetadata(receivedTimestamp, fetchPosition, startOffset, endOffset)
+                        );

Review comment:
   First we capture the metadata.
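   Note that we only replace previously captured metadata when the new fetch position is at least as current, by leader epoch, as the recorded one. Restating that staleness guard in isolation (a sketch of the condition above; the null check on the recorded metadata is handled by the caller):

    import java.util.Optional;

    public class MetadataUpdateGuard {
        // Replace when nothing with an epoch is recorded yet, or when the new position
        // carries an epoch that is no older than the recorded one. A new position
        // without an epoch never displaces one that has an epoch.
        static boolean shouldReplace(final Optional<Integer> recordedEpoch,
                                     final Optional<Integer> newEpoch) {
            if (!recordedEpoch.isPresent()) {
                return true;
            }
            return newEpoch.isPresent() && recordedEpoch.get() <= newEpoch.get();
        }
    }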
##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -596,7 +603,7 @@ public void testFetchProgressWithMissingPartitionPosition() {
         initMetadata(client, Collections.singletonMap(topic, 2));
         KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, subscription, metadata);
 
-        consumer.assign(Arrays.asList(tp0, tp1));
+        consumer.assign(asList(tp0, tp1));

Review comment:
   A side effect of adding static imports.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -1045,6 +1052,7 @@ public void fetchResponseWithUnexpectedPartitionIsIgnored() {
 
         ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
         assertEquals(0, records.count());
+        assertThat(records.metadata(), equalTo(emptyMap()));

Review comment:
   I dropped in a couple of extra assertions where it seemed appropriate. There are also some new tests farther down to really exercise the new code.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -319,7 +319,7 @@ public void onSuccess(ClientResponse resp) {
                 short responseVersion = resp.requestHeader().apiVersion();
 
                 completedFetches.add(new CompletedFetch(partition, partitionData,
-                        metricAggregator, batches, fetchOffset, responseVersion));
+                        metricAggregator, batches, fetchOffset, responseVersion, resp.receivedTimeMs()));

Review comment:
   Capture the received timestamp.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2374,10 +2504,20 @@ private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> partit
     private static class FetchInfo {
+        long logFirstOffset;
+        long logLastOffset;

Review comment:
   I considered inferring these from the sequence of other mock interactions, but it seemed more sensible to just have a way to specify them explicitly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org