junrao commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1945291682


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        future.whenComplete((value, exception) -> {
+            if (exception != null)
+                event.future().completeExceptionally(exception);
+            else {
+                requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+                    commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));

Review Comment:
   When the user calls `commitAsync()` with no offsets, we hand a 
`SyncCommitEvent` to the background thread without waiting for it to be 
processed. The background thread eventually calls `subscriptions.allConsumed()` 
to grab the offsets to commit. However, since this happens while the 
application thread keeps running, some of the committed offsets may not yet 
have been fully consumed by the application.
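
   To make the interleaving concrete, here is a minimal sketch of the race. It
is not Kafka code: `CommitSnapshotRace`, `runScenario`, `position`, and
`appMovedOn` are hypothetical names, a single `AtomicLong` stands in for the
consumed position of one partition, and a latch forces the unlucky ordering
that would otherwise only happen occasionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the race; none of these names are Kafka APIs.
final class CommitSnapshotRace {

    /** Returns {position when the commit was requested, position the background thread snapshotted}. */
    static long[] runScenario() {
        // Assumption: the application has consumed offsets 0..4, so the position is 5.
        AtomicLong position = new AtomicLong(5);
        CountDownLatch appMovedOn = new CountDownLatch(1);
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            long requestedAt = position.get();

            // Fire-and-forget commit: the snapshot is taken later, on the background thread.
            CompletableFuture<Long> committed = CompletableFuture.supplyAsync(() -> {
                try {
                    // Force the problematic ordering: wait until the application has moved on.
                    appMovedOn.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                return position.get();
            }, background);

            // Application thread: the next poll hands out a new batch and advances
            // the position, although those records are not processed yet.
            position.set(10);
            appMovedOn.countDown();

            return new long[] {requestedAt, committed.join()};
        } finally {
            background.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] result = runScenario();
        // prints "requested at 5, committed 10"
        System.out.println("requested at " + result[0] + ", committed " + result[1]);
    }
}
```

   In this model the commit was requested at position 5, but the snapshot that
would be committed is 10, which is the kind of gap described above.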



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +638,15 @@ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAnd
         });
     }
 
+    public Map<TopicPartition, OffsetAndMetadata> latestPartitionOffsets() {

Review Comment:
   This seems unused?



