lucasbru commented on code in PR #17414:
URL: https://github.com/apache/kafka/pull/17414#discussion_r1793530163


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -53,6 +63,13 @@ public List<ConsumerRecord<K, V>> records(TopicPartition 
partition) {
             return Collections.unmodifiableList(recs);
     }
 
+    /**
+     * Get just the next offsets that the consumer will consume
+     */
+    public Map<TopicPartition, OffsetAndMetadata> nextOffsets() {
+        return nextOffsets;

Review Comment:
   Other methods in this class return an unmodifiable collection, should we do 
the same?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -53,6 +63,13 @@ public List<ConsumerRecord<K, V>> records(TopicPartition 
partition) {
             return Collections.unmodifiableList(recs);
     }
 
+    /**
+     * Get just the next offsets that the consumer will consume

Review Comment:
   I think this comment needs to be more precise. Which topic partitions are 
included, all the partitions that are assigned to the consumer, all partitions 
that I fetched from in the last batch? I suppose it's the latter



##########
clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java:
##########
@@ -64,41 +64,11 @@ public void testSimpleMock() {
         assertFalse(iter.hasNext());
         final TopicPartition tp = new TopicPartition("test", 0);
         assertEquals(2L, consumer.position(tp));
+        assertEquals(new OffsetAndMetadata(2, Optional.empty(), ""), 
recs.nextOffsets().get(tp));
         consumer.commitSync();
         assertEquals(2L, 
consumer.committed(Collections.singleton(tp)).get(tp).offset());
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void testSimpleMockDeprecated() {

Review Comment:
   why did you remove this test?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java:
##########
@@ -125,6 +95,7 @@ public void shouldNotClearRecordsForPausedPartitions() {
         consumer.resume(testPartitionList);
         ConsumerRecords<String, String> recordsSecondPoll = 
consumer.poll(Duration.ofMillis(1));
         assertEquals(1, recordsSecondPoll.count());
+        assertEquals(new OffsetAndMetadata(1, Optional.empty(), ""), 
recordsSecondPoll.nextOffsets().get(new TopicPartition("test", 0)));

Review Comment:
   We may want to compare all elements of `recordsSecondPoll.nextOffsets`, or 
at least check the size of the collection.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##########
@@ -178,6 +179,7 @@ private ConsumerRecords<Integer, String> 
buildTopicTestRecords(int recordSize,
                                                                    int 
emptyPartitionIndex,
                                                                    
Collection<String> topics) {
         Map<TopicPartition, List<ConsumerRecord<Integer, String>>> 
partitionToRecords = new LinkedHashMap<>();
+        Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();

Review Comment:
   In `ConsumerRecordsTest`, you insert data but never check anything on it. 
Don't you want to verify something?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -21,23 +21,33 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+

Review Comment:
   extra new line



##########
clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java:
##########
@@ -64,41 +64,11 @@ public void testSimpleMock() {
         assertFalse(iter.hasNext());
         final TopicPartition tp = new TopicPartition("test", 0);
         assertEquals(2L, consumer.position(tp));
+        assertEquals(new OffsetAndMetadata(2, Optional.empty(), ""), 
recs.nextOffsets().get(tp));

Review Comment:
   We may want to compare all elements of `recs.nextOffsets`, or at least check 
the size of the collection.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java:
##########
@@ -126,9 +131,15 @@ public void testOnConsumeChain() {
         list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), 
filterTopicPart2.partition(), 0, 0L, TimestampType.CREATE_TIME,
             0, 0, 1, 1, new RecordHeaders(), Optional.empty()));
         records.put(tp, list1);
+        nextOffsets.put(tp, new OffsetAndMetadata(1, Optional.empty(), ""));
+
         records.put(filterTopicPart1, list2);
+        nextOffsets.put(filterTopicPart1, new OffsetAndMetadata(1, 
Optional.empty(), ""));
+
         records.put(filterTopicPart2, list3);
-        ConsumerRecords<Integer, Integer> consumerRecords = new 
ConsumerRecords<>(records);
+        nextOffsets.put(filterTopicPart2, new OffsetAndMetadata(1, 
Optional.empty(), ""));
+
+        ConsumerRecords<Integer, Integer> consumerRecords = new 
ConsumerRecords<>(records, nextOffsets);
         ConsumerRecords<Integer, Integer> interceptedRecords = 
interceptors.onConsume(consumerRecords);
         assertEquals(1, interceptedRecords.count());
         assertTrue(interceptedRecords.partitions().contains(tp));

Review Comment:
   I think we need to assert here something about the result of nextOffsets 
right? That those topic partitions are filtered out?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -195,7 +196,7 @@ private Fetch<K, V> fetchRecords(final CompletedFetch 
nextInLineFetch, int maxRe
                     metricsManager.recordPartitionLead(tp, lead);
                 }
 
-                return Fetch.forPartition(tp, partRecords, positionAdvanced);
+                return Fetch.forPartition(tp, partRecords, positionAdvanced, 
new OffsetAndMetadata(nextInLineFetch.nextFetchOffset(), 
nextInLineFetch.lastEpoch(), ""));

Review Comment:
   We _definitely_ need to update / add tests for this change in 
`FetchCollectorTest`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -583,7 +583,7 @@ public synchronized ConsumerRecords<K, V> poll(final 
Duration timeout) {
                 final ShareFetch<K, V> fetch = pollForFetches(timer);
                 if (!fetch.isEmpty()) {
                     currentFetch = fetch;
-                    return new ConsumerRecords<>(fetch.records());
+                    return new ConsumerRecords<>(fetch.records(), 
Collections.emptyMap());

Review Comment:
   Trivial, but please test this in `ShareConsumerImplTest`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -480,7 +481,7 @@ public ConsumerRecords<byte[], byte[]> consume(int n, long 
maxDuration, Map<Stri
                     consumedRecords += r.size();
                 }
                 if (consumedRecords >= n) {
-                    return new ConsumerRecords<>(records);
+                    return new ConsumerRecords<>(records, rec.nextOffsets());

Review Comment:
   Not sure if this is correct. Aren't you dropping all the `nextOffsets` for 
the previous polls? Like you are aggregating `rec.records` in `records`, you 
should aggregate `rec.nextOffsets` somewhere.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java:
##########
@@ -91,7 +91,7 @@ public synchronized ConsumerRecords<K, V> poll(Duration 
timeout) {
         }
 
         records.clear();
-        return new ConsumerRecords<>(results);
+        return new ConsumerRecords<>(results, Collections.emptyMap());

Review Comment:
   Could you check the result of `nextOffsets` in `MockShareConsumerTest` ? 
Generally, try to accompany each change with a change in the unit tests, if 
possible.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java:
##########
@@ -95,6 +102,13 @@ public int numRecords() {
         return numRecords;
     }
 
+    /**
+     * @return the next offsets and metadata (last epochs is included)
+     */
+    public Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata() {
+        return nextOffsetAndMetadata;

Review Comment:
   `records` returns an unmodifiableMap. Should we do the same here?



##########
clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java:
##########
@@ -73,19 +73,24 @@ public ConsumerRecords<String, String> 
onConsume(ConsumerRecords<String, String>
         CLUSTER_ID_BEFORE_ON_CONSUME.compareAndSet(NO_CLUSTER_ID, 
CLUSTER_META.get());
 
         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = 
new HashMap<>();
+        Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
+
         for (TopicPartition tp : records.partitions()) {
             List<ConsumerRecord<String, String>> lst = new ArrayList<>();
+            long nextOffset = 0;

Review Comment:
   Have you considered the case here where the records for the topic partition 
is empty? Won't you return "next offset 0" in this case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to