frankvicky commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1938274308


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -389,7 +388,7 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
      * future will be completed with a {@link RetriableCommitFailedException}.
      */
     public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(this::latestPartitionOffsets);

Review Comment:
   Makes sense.
   For `commitSync()` / `commitAsync()` calls without arguments, we need a way to get the consumed offsets.

   To ensure we always have the most up-to-date offset information at the time of commit, we should read it directly via `subscriptions.allConsumed()`. That returns the state as it is when the commit is issued, rather than a potentially stale snapshot.
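
To illustrate the difference, here is a minimal, self-contained sketch. It is not the actual `CommitRequestManager` code: `SubscriptionStateSketch` and `CommitOffsetsSketch` are hypothetical stand-ins, and partitions and offsets are reduced to plain `String`/`Long` values. It only shows that `Optional.orElseGet` invokes the supplier at call time, so the fallback reflects the positions at the moment of the commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the commit path; not the real CommitRequestManager.
class CommitOffsetsSketch {
    private final SubscriptionStateSketch subscriptions;

    CommitOffsetsSketch(SubscriptionStateSketch subscriptions) {
        this.subscriptions = subscriptions;
    }

    // Explicit offsets win. Otherwise the supplier runs now, so the result
    // reflects the consumed positions at commit time.
    Map<String, Long> resolve(Optional<Map<String, Long>> offsets) {
        return offsets.orElseGet(subscriptions::allConsumed);
    }

    public static void main(String[] args) {
        SubscriptionStateSketch state = new SubscriptionStateSketch();
        state.updatePosition("topic-0", 5L);
        Map<String, Long> earlySnapshot = state.allConsumed(); // taken before more polling

        state.updatePosition("topic-0", 9L); // the consumer keeps making progress

        CommitOffsetsSketch manager = new CommitOffsetsSketch(state);
        System.out.println("early snapshot: " + earlySnapshot);
        System.out.println("at commit time: " + manager.resolve(Optional.empty()));
    }
}

// Hypothetical stand-in for the subscription state: partition name -> position.
class SubscriptionStateSketch {
    private final Map<String, Long> positions = new HashMap<>();

    void updatePosition(String partition, long offset) {
        positions.put(partition, offset);
    }

    // Returns a copy of the positions as they are at the moment of the call.
    Map<String, Long> allConsumed() {
        return new HashMap<>(positions);
    }
}
```

The early snapshot still holds the old position, while the no-argument path picks up the newer one because the lookup is deferred until `resolve` is called.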



