kirktrue commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1934523073


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -88,6 +87,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
     private final boolean throwOnFetchStableOffsetUnsupported;
     final PendingRequests pendingRequests;
     private boolean closing = false;
+    private Map<TopicPartition, OffsetAndMetadata> allConsumed = Map.of();

Review Comment:
   I find the name a little confusing. (I know we get it from 
`SubscriptionState`.) What about `latestPartitionOffsets` or something? If 
nothing else, please add a comment to make it clear what this map contains.
   
   This might be a stylistic choice, but what's the advantage of this over 
using `Collections.emptyMap()`?
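A minimal sketch of both points, assuming the hypothetical name `latestPartitionOffsets` and using `String`/`Long` as stand-ins for `TopicPartition`/`OffsetAndMetadata` so it compiles without Kafka. Both idioms return an empty, unmodifiable map. One behavioral difference is that the `Map.of()` family throws `NullPointerException` on a null lookup such as `containsKey(null)`, while `Collections.emptyMap()` just returns `false`.

```java
import java.util.Collections;
import java.util.Map;

// Sketch only: String/Long stand in for TopicPartition/OffsetAndMetadata, and
// latestPartitionOffsets is the hypothetical name suggested in this review.
class EmptyMapIdioms {

    // Last consumed offset per assigned partition, as reported by the
    // application thread on its most recent poll (hypothetical wording).
    private Map<String, Long> latestPartitionOffsets = Map.of();

    // True if the map rejects writes with UnsupportedOperationException.
    static boolean rejectsWrites(Map<String, Long> map) {
        try {
            map.put("topic-0", 42L);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // True if containsKey(null) is answered instead of throwing.
    static boolean allowsNullLookup(Map<String, Long> map) {
        try {
            map.containsKey(null);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> viaMapOf = Map.of();
        Map<String, Long> viaEmptyMap = Collections.emptyMap();
        System.out.println("rejects writes: " + rejectsWrites(viaMapOf) + ", " + rejectsWrites(viaEmptyMap));
        System.out.println("null lookup ok: " + allowsNullLookup(viaMapOf) + ", " + allowsNullLookup(viaEmptyMap));
    }
}
```

For a field that is only ever replaced wholesale and never queried with a null key, the choice is mostly stylistic.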



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -744,7 +744,7 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
             do {
 
                 // Make sure to let the background thread know that we are still polling.
-                applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
+                applicationEventHandler.add(new PollEvent(timer.currentTimeMs(), subscriptions.allConsumed()));

Review Comment:
   We've done a lot of work to ensure the background thread "owns" the current 
subscription state. This seems to go against those efforts.
   
   Can we add a comment to this line that explains why we don't want the 
background thread to "own" the set of consumed partitions? This would be 
helpful since later on during the poll process we _do_ implicitly let the 
background thread determine the partitions to fetch (rather than having the 
application thread pass in the partitions).
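If the application thread does keep ownership of the consumed positions, one way to keep the hand-off thread-safe is to send an immutable, point-in-time snapshot with each event, so the background thread never reads a map the application thread is still writing. A minimal sketch, assuming hypothetical names (`SnapshotHandoff`, `PollSnapshotEvent`, `handOff`) and `String`/`Long` stand-ins for the Kafka types; it is not how `PollEvent` is implemented in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: all names are hypothetical and String/Long stand in for
// TopicPartition/OffsetAndMetadata.
class SnapshotHandoff {

    // Event carrying the poll time plus a frozen copy of the consumed offsets.
    static final class PollSnapshotEvent {
        final long pollTimeMs;
        final Map<String, Long> consumed;

        PollSnapshotEvent(long pollTimeMs, Map<String, Long> consumed) {
            this.pollTimeMs = pollTimeMs;
            // Point-in-time copy (Java 10+): later writes by the application
            // thread are not visible to the background thread.
            this.consumed = Map.copyOf(consumed);
        }
    }

    // The application thread enqueues an event and keeps consuming; the
    // background thread dequeues the event it was given.
    static PollSnapshotEvent handOff(Map<String, Long> positions, long nowMs) {
        BlockingQueue<PollSnapshotEvent> queue = new LinkedBlockingQueue<>();
        AtomicReference<PollSnapshotEvent> received = new AtomicReference<>();
        Thread background = new Thread(() -> {
            try {
                received.set(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.start();
        queue.add(new PollSnapshotEvent(nowMs, positions));
        positions.merge("topic-0", 1L, Long::sum); // application thread keeps consuming
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new HashMap<>();
        positions.put("topic-0", 10L);
        PollSnapshotEvent event = handOff(positions, 1_000L);
        System.out.println("event saw " + event.consumed + ", app thread now at " + positions);
    }
}
```

The trade-off is one map copy per poll in exchange for never sharing a mutable map across threads.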



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -203,7 +203,9 @@ public void process(ApplicationEvent event) {
 
     private void process(final PollEvent event) {
         if (requestManagers.commitRequestManager.isPresent()) {
-            requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs()));
+            CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
+            commitRequestManager.updateAutoCommitTimer(event.pollTimeMs());
+            commitRequestManager.allConsumed(event.allConsumed());

Review Comment:
   Can we consolidate the auto-commit timer and the offsets map in a single 
method call that's more clearly named? Are there other places that call 
`updateAutoCommitTimer()` other than this path?
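A sketch of what the consolidation might look like, assuming a hypothetical method name `onApplicationPoll` and a simplified manager that only records the poll time (standing in for `updateAutoCommitTimer()`) and the offsets an auto-commit would use. `String`/`Long` again stand in for the Kafka types; none of these names are the actual `CommitRequestManager` API.

```java
import java.util.Map;

// Hypothetical consolidation of the two calls made when a poll event is
// processed; names are illustrative, not the real CommitRequestManager API.
class AutoCommitState {
    private long lastPollTimeMs = -1L;
    private Map<String, Long> latestPartitionOffsets = Map.of();

    // Single entry point: records the poll time (stand-in for
    // updateAutoCommitTimer()) and an immutable copy of the consumed offsets.
    void onApplicationPoll(long pollTimeMs, Map<String, Long> consumedOffsets) {
        this.lastPollTimeMs = pollTimeMs;
        this.latestPartitionOffsets = Map.copyOf(consumedOffsets);
    }

    long lastPollTimeMs() {
        return lastPollTimeMs;
    }

    Map<String, Long> latestPartitionOffsets() {
        return latestPartitionOffsets;
    }

    public static void main(String[] args) {
        AutoCommitState state = new AutoCommitState();
        state.onApplicationPoll(5_000L, Map.of("topic-0", 42L));
        System.out.println(state.lastPollTimeMs() + " " + state.latestPartitionOffsets());
    }
}
```

A single method also makes it harder for a future caller to update the timer without refreshing the offsets, or vice versa.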



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java:
##########
@@ -16,21 +16,32 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
 public class PollEvent extends ApplicationEvent {
 
     private final long pollTimeMs;
+    private final Map<TopicPartition, OffsetAndMetadata> allConsumed;
 
-    public PollEvent(final long pollTimeMs) {
+    public PollEvent(final long pollTimeMs, Map<TopicPartition, OffsetAndMetadata> allConsumed) {
         super(Type.POLL);
         this.pollTimeMs = pollTimeMs;
+        this.allConsumed = allConsumed;

Review Comment:
   Please wrap this in a call to `Collections.unmodifiableMap()`.
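Note that `Collections.unmodifiableMap()` returns a read-only view. It blocks writes through the wrapper, but changes the caller later makes to the underlying map remain visible through it. If the event should carry a point-in-time snapshot instead, `Map.copyOf()` (Java 10+) makes an independent immutable copy, although it throws `NullPointerException` on null keys or values. A small sketch of the difference, with `String`/`Long` standing in for the Kafka types and hypothetical helper names:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch only: String/Long stand in for TopicPartition/OffsetAndMetadata.
class ReadOnlyVsCopy {

    // Read-only view: writes through it fail, but it tracks the source map.
    static Map<String, Long> readOnlyView(Map<String, Long> source) {
        return Collections.unmodifiableMap(source);
    }

    // Independent immutable snapshot (Java 10+); rejects null keys/values.
    static Map<String, Long> snapshot(Map<String, Long> source) {
        return Map.copyOf(source);
    }

    public static void main(String[] args) {
        Map<String, Long> source = new HashMap<>();
        source.put("topic-0", 10L);
        Map<String, Long> view = readOnlyView(source);
        Map<String, Long> copy = snapshot(source);

        source.put("topic-0", 20L); // whoever built the map keeps mutating it
        System.out.println(view.get("topic-0") + " " + copy.get("topic-0")); // prints "20 10"
    }
}
```

Which one fits depends on whether the map passed into `PollEvent` can still be modified by its creator after the event is enqueued.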



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -635,6 +634,14 @@ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAnd
         });
     }
 
+    public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
+        return allConsumed;
+    }
+
+    public void allConsumed(Map<TopicPartition, OffsetAndMetadata> allConsumed) {
+        this.allConsumed = allConsumed;
+    }
+

Review Comment:
   We should probably stick to the convention of using `set` for setters. Also, 
it would be prudent to make the given map immutable so that when other code 
invokes the getter they're not able to modify it. 
   
   ```suggestion
       public Map<TopicPartition, OffsetAndMetadata> latestPartitionOffsets() {
           return latestPartitionOffsets;
       }
   
       public void setLatestPartitionOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
           this.latestPartitionOffsets = Collections.unmodifiableMap(offsets);
       }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -569,6 +571,8 @@ private void process(final SeekUnvalidatedEvent event) {
                 metadata.currentLeader(event.partition())
             );
             subscriptions.seekUnvalidated(event.partition(), newPosition);
+            requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+                commitRequestManager.allConsumed(subscriptions.allConsumed()));

Review Comment:
   Why is `allConsumed()` called here again, but this time with the data from 
`SubscriptionState.allConsumed()`? Please add some comments to explain. Thanks.
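One possible reason, which is an assumption and not something the PR states: the manager's copy is otherwise refreshed only when a `PollEvent` arrives, so after a seek it would keep holding the pre-seek offset until the next poll. The hypothetical model below (class and method names invented for illustration, `String`/`Long` as stand-ins) shows that stale-snapshot window and how an immediate refresh closes it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a commit manager caching the snapshot delivered by
// the last poll. Names are illustrative only, not the Kafka code.
class SeekRefreshSketch {
    // Authoritative positions (stand-in for SubscriptionState).
    private final Map<String, Long> positions = new HashMap<>();
    // Snapshot cached for committing (stand-in for the manager's map).
    private Map<String, Long> cachedForCommit = Map.of();

    // Each poll delivers a fresh snapshot to the cache.
    void poll() {
        cachedForCommit = Map.copyOf(positions);
    }

    // A seek changes the authoritative position; optionally refresh the cache.
    void seek(String partition, long offset, boolean refreshCache) {
        positions.put(partition, offset);
        if (refreshCache) {
            // In this model, skipping the refresh means a commit before the
            // next poll would still use the pre-seek offset.
            cachedForCommit = Map.copyOf(positions);
        }
    }

    Long offsetThatWouldBeCommitted(String partition) {
        return cachedForCommit.get(partition);
    }

    public static void main(String[] args) {
        SeekRefreshSketch manager = new SeekRefreshSketch();
        manager.seek("topic-0", 10L, false);
        manager.poll();
        manager.seek("topic-0", 3L, false);
        System.out.println("without refresh: " + manager.offsetThatWouldBeCommitted("topic-0"));
        manager.seek("topic-0", 3L, true);
        System.out.println("with refresh: " + manager.offsetThatWouldBeCommitted("topic-0"));
    }
}
```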



-- 
This is an automated message from the Apache Git Service.