mjsax commented on code in PR #19188:
URL: https://github.com/apache/kafka/pull/19188#discussion_r2015184978


##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
 
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
+
         final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> nextPoll = new HashMap<>();
         final List<TopicPartition> toClear = new ArrayList<>();
-
+        long numPollRecords = 0L;
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
             if (!subscriptions.isPaused(entry.getKey())) {
                 final List<ConsumerRecord<K, V>> recs = entry.getValue();
+
+                List<ConsumerRecord<K, V>> remaining = new ArrayList<>();

Review Comment:
   Not sure why we need `remaining` (similar question as for `nextPoll`)? (cf. other comments)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -294,13 +306,19 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
                                 rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
                         subscriptions.position(entry.getKey(), newPosition);
                         nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
+                        numPollRecords++;
                     }
                 }
-                toClear.add(entry.getKey());
+
+                if (remaining.isEmpty()) {
+                    toClear.add(entry.getKey());
+                } else {
+                    nextPoll.put(entry.getKey(), remaining);
+                }
             }
         }
-
         toClear.forEach(records::remove);
+        records.putAll(nextPoll);

Review Comment:
   It seems clumsy to add stuff back -- it seems much more straightforward to remove the consumed records from `this.records` one by one.
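   
   Something along these lines would avoid adding anything back (rough, untested sketch; it assumes the per-partition lists in `this.records` are mutable, reuses the names from the current patch, and needs a `java.util.Iterator` import):
   
   ```java
   // consume from the partition's list directly, so nothing needs
   // to be put back into `this.records` afterwards
   final Iterator<ConsumerRecord<K, V>> recIterator = recs.iterator();
   while (recIterator.hasNext() && numPollRecords < this.maxPollRecords) {
       final ConsumerRecord<K, V> rec = recIterator.next();
       // ... existing position / offset bookkeeping for `rec` ...
       recIterator.remove();
       numPollRecords++;
   }
   if (recs.isEmpty()) {
       toClear.add(entry.getKey());
   }
   ```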



##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -294,13 +306,19 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
                                 rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
                         subscriptions.position(entry.getKey(), newPosition);
                         nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
+                        numPollRecords++;
                     }
                 }
-                toClear.add(entry.getKey());

Review Comment:
   Should we remove `toClear` altogether, and instead call `recs.remove(0)` (we might want to switch from `ArrayList` to `LinkedList`)? Plus add a check whether a partition becomes empty, and remove it entirely if so.
   
   If we cannot do this here (`ConcurrentModificationException`?), we could change `toClear` to type `Map<TopicPartition, Integer>` and just count how many records we did remove.
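   
   For the counting variant, a rough sketch of what I have in mind (untested; `consumedCount` and `partitionRecords` are placeholder names):
   
   ```java
   final Map<TopicPartition, Integer> consumedCount = new HashMap<>();
   
   // inside the record loop, after the position/offset bookkeeping:
   consumedCount.merge(entry.getKey(), 1, Integer::sum);
   
   // after the loop over `this.records`, drop the consumed prefix of each list:
   consumedCount.forEach((tp, count) -> {
       final List<ConsumerRecord<K, V>> partitionRecords = records.get(tp);
       partitionRecords.subList(0, count).clear();
       if (partitionRecords.isEmpty()) {
           records.remove(tp);
       }
   });
   ```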



##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
 
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
+
         final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> nextPoll = new HashMap<>();

Review Comment:
   Not sure why we need `nextPoll`? (cf. other comments)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
 
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
+

Review Comment:
   nit: avoid unnecessary diffs



##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
 
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
+
         final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> nextPoll = new HashMap<>();
         final List<TopicPartition> toClear = new ArrayList<>();
-
+        long numPollRecords = 0L;
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
             if (!subscriptions.isPaused(entry.getKey())) {
                 final List<ConsumerRecord<K, V>> recs = entry.getValue();
+
+                List<ConsumerRecord<K, V>> remaining = new ArrayList<>();
                 for (final ConsumerRecord<K, V> rec : recs) {
+
+                    if (numPollRecords >= this.maxPollRecords) {
+                        remaining.add(rec);
+                        continue;

Review Comment:
   Why do we `continue` here, instead of just breaking out of the `for` loop and stopping to consume from `this.records`?
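   
   I.e., once the limit is hit, no further record can be returned from this poll, so something like this should do (sketch; it assumes the in-place removal from the other comments, so leftover records simply stay in `this.records`):
   
   ```java
   if (numPollRecords >= this.maxPollRecords) {
       break;  // stop consuming for this poll; leftovers remain buffered
   }
   ```
   
   The outer loop over `this.records` could short-circuit on the same condition.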


