vvcephei commented on a change in pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#discussion_r552938808



##########
File path: build.gradle
##########
@@ -1070,6 +1070,7 @@ project(':clients') {
     testCompile libs.junitJupiterApi
     testCompile libs.junitVintageEngine
     testCompile libs.mockitoCore
+    testCompile libs.hamcrest

Review comment:
       Adding the Hamcrest dependency to get access to a richer set of Matchers in the clients tests.
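       For example, a sketch of the assertion style this enables (the matchers are standard Hamcrest; `records` and `partitionMetadata` are illustrative placeholders):

           import static java.util.Collections.emptyMap;
           import static org.hamcrest.MatcherAssert.assertThat;
           import static org.hamcrest.Matchers.equalTo;
           import static org.hamcrest.Matchers.greaterThanOrEqualTo;

           // richer matching and more descriptive failure messages than bare assertEquals:
           assertThat(records.metadata(), equalTo(emptyMap()));
           assertThat(partitionMetadata.lag(), greaterThanOrEqualTo(0L));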

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
##########
@@ -32,14 +34,99 @@
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-
-    @SuppressWarnings("unchecked")
-    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP);
+    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, Metadata> metadata;
+
+    public static final class Metadata {

Review comment:
       I made this a static inner class, since "metadata" is such an abstract concept. This way, the scope is clear.
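       For example, at a call site the enclosing class disambiguates the otherwise-generic name (a hedged sketch; `records` and `partition` are placeholders):

           final Map<TopicPartition, ConsumerRecords.Metadata> metadata = records.metadata();
           final ConsumerRecords.Metadata partitionMetadata = metadata.get(partition);
           final long lag = partitionMetadata.lag();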

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -229,6 +244,7 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
             throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
         List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
         recs.add(record);
+        endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset()));

Review comment:
       Just adding this computation in for convenience.
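       A hedged usage sketch (the setup here is illustrative, not from the PR's tests):

           // hypothetical usage: the mock now tracks the end offset as records are added
           final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
           final TopicPartition tp = new TopicPartition("topic", 0);
           consumer.assign(Collections.singleton(tp));
           consumer.addRecord(new ConsumerRecord<>("topic", 0, 5L, "key", "value"));
           // the tracked end offset for tp is now at least 5, without a separate updateEndOffsets call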

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +637,42 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                 } else {
                     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
 
-                    if (!records.isEmpty()) {
-                        TopicPartition partition = nextInLineFetch.partition;
-                        List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-                        if (currentRecords == null) {
-                            fetched.put(partition, records);
-                        } else {
-                            // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                            // but it might conceivably happen in some rare cases (such as partition leader changes).
-                            // we have to copy to a new list because the old one may be immutable
-                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                            newRecords.addAll(currentRecords);
-                            newRecords.addAll(records);
-                            fetched.put(partition, newRecords);
+                    TopicPartition partition = nextInLineFetch.partition;
+
+                    if (subscriptions.isAssigned(partition)) {
+                        final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
+
+                        final long startOffset = nextInLineFetch.partitionData.logStartOffset();
+
+                        // read_uncommitted:
+                        // that is, the offset of the last successfully replicated message plus one
+                        final long hwm = nextInLineFetch.partitionData.highWatermark();
+                        // read_committed:
+                        // the minimum of the high watermark and the smallest offset of any open transaction
+                        final long lso = nextInLineFetch.partitionData.lastStableOffset();
+
+                        // end offset is:
+                        final long endOffset =
+                            isolationLevel == IsolationLevel.READ_UNCOMMITTED ? hwm : lso;
+
+                        final FetchPosition fetchPosition = subscriptions.position(partition);
+
+                        final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition);
+
+                        if (fetchMetadata == null
+                            || !fetchMetadata.position().offsetEpoch.isPresent()
+                            || fetchPosition.offsetEpoch.isPresent()
+                            && fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) {
+
+                            fetched.addMetadata(
+                                partition,
+                                new FetchedRecords.FetchMetadata(receivedTimestamp, fetchPosition, startOffset, endOffset)
+                            );
                         }
+                    }
+
+                    if (!records.isEmpty()) {
+                        fetched.addRecords(partition, records);
                         recordsRemaining -= records.size();
                     }

Review comment:
       Then, we get the records (note the main logic is now internal to FetchedRecords).

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -524,7 +524,7 @@ public void testParseCorruptedRecord() throws Exception {
         consumerClient.poll(time.timer(0));
 
         // the first fetchedRecords() should return the first valid message
-        assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
+        assertEquals(1, fetcher.fetchedRecords().records().get(tp0).size());

Review comment:
       Just a consequence of moving the records inside a response struct.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
##########
@@ -32,14 +34,99 @@
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-
-    @SuppressWarnings("unchecked")
-    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP);
+    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, Metadata> metadata;
+
+    public static final class Metadata {
+
+        private final long receivedTimestamp;
+        private final long position;
+        private final long beginningOffset;
+        private final long endOffset;
+
+        public Metadata(final long receivedTimestamp,
+                        final long position,
+                        final long beginningOffset,
+                        final long endOffset) {
+            this.receivedTimestamp = receivedTimestamp;
+            this.position = position;
+            this.beginningOffset = beginningOffset;
+            this.endOffset = endOffset;
+        }
+
+        /**
+         * @return The timestamp of the broker response that contained this metadata
+         */
+        public long receivedTimestamp() {
+            return receivedTimestamp;
+        }
+
+        /**
+         * @return The next position the consumer will fetch
+         */
+        public long position() {
+            return position;
+        }
+
+        /**
+         * @return The lag between the next position to fetch and the current end of the partition
+         */
+        public long lag() {
+            return endOffset - position;
+        }
+
+        /**
+         * @return The current first offset in the partition.
+         */
+        public long beginningOffset() {
+            return beginningOffset;
+        }
+
+        /**
+         * @return The current last offset in the partition. The determination of the "last" offset
+         * depends on the Consumer's isolation level. Under "read_uncommitted", this is the last successfully
+         * replicated offset plus one. Under "read_committed", this is the minimum of the last successfully
+         * replicated offset plus one and the smallest offset of any open transaction.
+         */
+        public long endOffset() {
+            return endOffset;
+        }
+    }
+
+    private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {

Review comment:
       This is where we translate from the internal result container to the public one. It could be moved to a utility class, but this seemed fine.
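       Roughly, a sketch of the translation (the exact body may differ slightly; the FetchMetadata accessors are the ones shown in FetchedRecords below, and FetchPosition's public offset field flattens to the plain long exposed in the public Metadata):

           private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {
               final Map<TopicPartition, Metadata> metadata = new HashMap<>();
               for (final Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> entry : fetchedRecords.metadata().entrySet()) {
                   final FetchedRecords.FetchMetadata fetchMetadata = entry.getValue();
                   metadata.put(
                       entry.getKey(),
                       new Metadata(
                           fetchMetadata.receivedTimestamp(),
                           fetchMetadata.position().offset, // internal FetchPosition flattens to a plain offset
                           fetchMetadata.beginningOffset(),
                           fetchMetadata.endOffset()
                       )
                   );
               }
               return metadata;
           }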

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1268,12 +1269,12 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitFo
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
+    private FetchedRecords<K, V> pollForFetches(Timer timer) {

Review comment:
       Added an internal struct so that the Fetcher can also return the desired metadata.

##########
File path: checkstyle/suppressions.xml
##########
@@ -100,7 +100,10 @@
               files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
+              files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark|MockConsumer"/>
+
+    <suppress checks="CyclomaticComplexity"
+              files="MockConsumer"/>

Review comment:
       I've added a couple more branches to MockConsumer, which pushed it over the checkstyle complexity thresholds.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -218,7 +218,22 @@ public synchronized void unsubscribe() {
         }
 
         toClear.forEach(p -> this.records.remove(p));
-        return new ConsumerRecords<>(results);
+
+        final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
+        for (final TopicPartition partition : subscriptions.assignedPartitions()) {
+            if (subscriptions.hasValidPosition(partition) && beginningOffsets.containsKey(partition) && endOffsets.containsKey(partition)) {
+                final SubscriptionState.FetchPosition position = subscriptions.position(partition);
+                final long offset = position.offset;
+                final long beginningOffset = beginningOffsets.get(partition);
+                final long endOffset = endOffsets.get(partition);
+                metadata.put(
+                    partition,
+                    new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, beginningOffset, endOffset)

Review comment:
       Just making the MockConsumer return its metadata as well. There are some extra checks in here because the mock doesn't guarantee a valid position or beginning/end offsets while calling poll, whereas the real consumer does.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FetchedRecords<K, V> {
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, FetchMetadata> metadata;
+
+    public static final class FetchMetadata {
+
+        private final long receivedTimestamp;
+        private final long beginningOffset;
+        private final SubscriptionState.FetchPosition position;
+        private final long endOffset;
+
+        public FetchMetadata(final long receivedTimestamp,
+                             final SubscriptionState.FetchPosition position,
+                             final long beginningOffset,
+                             final long endOffset) {
+            this.receivedTimestamp = receivedTimestamp;
+            this.position = position;
+            this.beginningOffset = beginningOffset;
+            this.endOffset = endOffset;
+        }
+
+        public long receivedTimestamp() {
+            return receivedTimestamp;
+        }
+
+        public SubscriptionState.FetchPosition position() {
+            return position;
+        }
+
+        public long beginningOffset() {
+            return beginningOffset;
+        }
+
+        public long endOffset() {
+            return endOffset;
+        }
+    }
+
+    public FetchedRecords() {
+        records = new HashMap<>();
+        metadata = new HashMap<>();
+    }
+
+    public void addRecords(final TopicPartition topicPartition, final List<ConsumerRecord<K, V>> records) {

Review comment:
       Moved this logic from the Fetcher, since it seemed natural to internalize it here.
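       For reference, a sketch of the merge logic as it presumably lands here, mirroring the branch removed from the Fetcher above (the parameter is renamed from `records` only to avoid shadowing the field):

           public void addRecords(final TopicPartition topicPartition, final List<ConsumerRecord<K, V>> newRecords) {
               final List<ConsumerRecord<K, V>> currentRecords = records.get(topicPartition);
               if (currentRecords == null) {
                   records.put(topicPartition, newRecords);
               } else {
                   // this case shouldn't usually happen because we only send one fetch at a time per partition,
                   // but it might conceivably happen in some rare cases (such as partition leader changes).
                   // we have to copy to a new list because the old one may be immutable.
                   final List<ConsumerRecord<K, V>> merged = new ArrayList<>(currentRecords.size() + newRecords.size());
                   merged.addAll(currentRecords);
                   merged.addAll(newRecords);
                   records.put(topicPartition, merged);
               }
           }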

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +637,42 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                 } else {
                     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
 
-                    if (!records.isEmpty()) {
-                        TopicPartition partition = nextInLineFetch.partition;
-                        List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-                        if (currentRecords == null) {
-                            fetched.put(partition, records);
-                        } else {
-                            // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                            // but it might conceivably happen in some rare cases (such as partition leader changes).
-                            // we have to copy to a new list because the old one may be immutable
-                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                            newRecords.addAll(currentRecords);
-                            newRecords.addAll(records);
-                            fetched.put(partition, newRecords);
+                    TopicPartition partition = nextInLineFetch.partition;
+
+                    if (subscriptions.isAssigned(partition)) {
+                        final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
+
+                        final long startOffset = nextInLineFetch.partitionData.logStartOffset();
+
+                        // read_uncommitted:
+                        // that is, the offset of the last successfully replicated message plus one
+                        final long hwm = nextInLineFetch.partitionData.highWatermark();
+                        // read_committed:
+                        // the minimum of the high watermark and the smallest offset of any open transaction
+                        final long lso = nextInLineFetch.partitionData.lastStableOffset();
+
+                        // end offset is:
+                        final long endOffset =
+                            isolationLevel == IsolationLevel.READ_UNCOMMITTED ? hwm : lso;
+
+                        final FetchPosition fetchPosition = subscriptions.position(partition);
+
+                        final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition);
+
+                        if (fetchMetadata == null
+                            || !fetchMetadata.position().offsetEpoch.isPresent()
+                            || fetchPosition.offsetEpoch.isPresent()
+                            && fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) {
+
+                            fetched.addMetadata(
+                                partition,
+                                new FetchedRecords.FetchMetadata(receivedTimestamp, fetchPosition, startOffset, endOffset)
+                            );

Review comment:
       First, we capture the metadata (the epoch check keeps us from overwriting metadata recorded for a newer fetch position with metadata from an older one).

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -596,7 +603,7 @@ public void testFetchProgressWithMissingPartitionPosition() {
         initMetadata(client, Collections.singletonMap(topic, 2));
 
         KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, subscription, metadata);
-        consumer.assign(Arrays.asList(tp0, tp1));
+        consumer.assign(asList(tp0, tp1));

Review comment:
       A side effect of adding static imports.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -1045,6 +1052,7 @@ public void fetchResponseWithUnexpectedPartitionIsIgnored() {
 
         ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
         assertEquals(0, records.count());
+        assertThat(records.metadata(), equalTo(emptyMap()));

Review comment:
       I dropped in a couple of extra assertions where it seemed appropriate. There are also some new tests farther down to really exercise the new code.
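       For example, once a poll has returned data, the new tests can assert on the per-partition metadata (illustrative, not a verbatim quote of the new tests):

           // given a poll that returned data for tp0 (hypothetical):
           final ConsumerRecords.Metadata partitionMetadata = records.metadata().get(tp0);
           assertThat(partitionMetadata.beginningOffset(), equalTo(0L));
           assertThat(partitionMetadata.lag(), equalTo(partitionMetadata.endOffset() - partitionMetadata.position()));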

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -319,7 +319,7 @@ public void onSuccess(ClientResponse resp) {
                                     short responseVersion = resp.requestHeader().apiVersion();
 
                                     completedFetches.add(new CompletedFetch(partition, partitionData,
-                                            metricAggregator, batches, fetchOffset, responseVersion));
+                                            metricAggregator, batches, fetchOffset, responseVersion, resp.receivedTimeMs()));

Review comment:
       Capture the received timestamp.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2374,10 +2504,20 @@ private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> partit
     }
 
     private static class FetchInfo {
+        long logFirstOffset;
+        long logLastOffset;

Review comment:
       I considered inferring these from the sequence of other mock interactions, but it seemed more sensible to just have a way to specify them explicitly.
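       Something along these lines (a sketch; the two-arg constructor's defaults are my assumption about the backwards-compatible path for existing tests):

           private static class FetchInfo {
               long logFirstOffset;
               long logLastOffset;
               long offset;
               int count;

               FetchInfo(long offset, int count) {
                   // assume a log that starts at 0 and ends right after the fetched batch
                   this(0L, offset + count, offset, count);
               }

               FetchInfo(long logFirstOffset, long logLastOffset, long offset, int count) {
                   this.logFirstOffset = logFirstOffset;
                   this.logLastOffset = logLastOffset;
                   this.offset = offset;
                   this.count = count;
               }
           }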



