mjsax commented on code in PR #19188:
URL: https://github.com/apache/kafka/pull/19188#discussion_r2015184978

##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
+        final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> nextPoll = new HashMap<>();
         final List<TopicPartition> toClear = new ArrayList<>();
-
+        long numPollRecords = 0L;
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
             if (!subscriptions.isPaused(entry.getKey())) {
                 final List<ConsumerRecord<K, V>> recs = entry.getValue();
+
+                List<ConsumerRecord<K, V>> remaining = new ArrayList<>();

Review Comment:
Not sure why we need `remaining` (similar question as for `nextPoll`)? (cf. other comments)

##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -294,13 +306,19 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
                                 rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
                         subscriptions.position(entry.getKey(), newPosition);
                         nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
+                        numPollRecords++;
                     }
                 }
-                toClear.add(entry.getKey());
+
+                if (remaining.isEmpty()) {
+                    toClear.add(entry.getKey());
+                } else {
+                    nextPoll.put(entry.getKey(), remaining);
+                }
             }
         }
-        toClear.forEach(records::remove);
+        records.putAll(nextPoll);

Review Comment:
It seems clumsy to add stuff back; it seems much more straightforward to remove individual records from `this.records` one-by-one.

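The one-by-one removal suggested above could look roughly like the following. This is only a sketch on plain collections, not a patch against `MockConsumer`: the class `PollSketch`, the `String` partition keys, and the `Integer` records are hypothetical stand-ins, and the pause check and position/offset bookkeeping from the real `poll` are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class PollSketch {

    // Hypothetical stand-in for MockConsumer#poll: returns at most maxPollRecords
    // records and removes each returned record from `pending` in place, so no
    // `remaining` / `nextPoll` / `toClear` helper collections are needed.
    static Map<String, List<Integer>> poll(final Map<String, List<Integer>> pending,
                                           final int maxPollRecords) {
        final Map<String, List<Integer>> results = new LinkedHashMap<>();
        int numPollRecords = 0;

        final Iterator<Map.Entry<String, List<Integer>>> partitions = pending.entrySet().iterator();
        // Stop iterating as soon as the limit is reached (the "break" variant).
        while (partitions.hasNext() && numPollRecords < maxPollRecords) {
            final Map.Entry<String, List<Integer>> entry = partitions.next();
            final Iterator<Integer> recs = entry.getValue().iterator();
            while (recs.hasNext() && numPollRecords < maxPollRecords) {
                final Integer rec = recs.next();
                results.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(rec);
                recs.remove();          // drop the consumed record
                numPollRecords++;
            }
            if (entry.getValue().isEmpty()) {
                partitions.remove();    // drop the partition once it is drained
            }
        }
        return results;
    }

    public static void main(final String[] args) {
        final Map<String, List<Integer>> pending = new LinkedHashMap<>();
        pending.put("t-0", new LinkedList<>(List.of(0, 1, 2)));
        pending.put("t-1", new LinkedList<>(List.of(0, 1)));

        System.out.println(poll(pending, 4)); // first poll, limited to 4 records
        System.out.println(pending);          // what is left for the next poll
    }
}
```

Removing through the iterators (`Iterator.remove()`) rather than through the map or list directly is what keeps this from throwing a `ConcurrentModificationException` while the collections are being traversed.
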
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -294,13 +306,19 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
                                 rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
                         subscriptions.position(entry.getKey(), newPosition);
                         nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
+                        numPollRecords++;
                     }
                 }
-                toClear.add(entry.getKey());

Review Comment:
Should we remove `toClear` altogether, and instead call `recs.remove(0)` (we might want to switch from `ArrayList` to `LinkedList`)? Plus add a check for whether a partition becomes empty, and remove it entirely.

If we cannot do this here (`ConcurrentModificationException`?), we could change `toClear` to type `Map<TopicPartition, Integer>` and just count how many records we removed.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
+        final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> nextPoll = new HashMap<>();

Review Comment:
Not sure why we need `nextPoll`?
(cf. other comments)

##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
+

Review Comment:
nit: avoid unnecessary diffs

##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
+        final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> nextPoll = new HashMap<>();
         final List<TopicPartition> toClear = new ArrayList<>();
-
+        long numPollRecords = 0L;
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
             if (!subscriptions.isPaused(entry.getKey())) {
                 final List<ConsumerRecord<K, V>> recs = entry.getValue();
+
+                List<ConsumerRecord<K, V>> remaining = new ArrayList<>();
                 for (final ConsumerRecord<K, V> rec : recs) {
+
+                    if (numPollRecords >= this.maxPollRecords) {
+                        remaining.add(rec);
+                        continue;

Review Comment:
Why do we `continue` here, instead of just breaking out of the `for` loop and no longer consuming from `this.records`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org