junrao commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1945291682


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = 
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        future.whenComplete((value, exception) -> {
+            if (exception != null)
+                event.future().completeExceptionally(exception);
+            else {
+                
requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+                    
commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));

Review Comment:
   It seems that we still have an issue with `commitAsync()` . When the user 
calls `commitAsync()` with no offsets, we append a `SyncCommitEvent` to the 
background thread without waiting for the processing to complete. The 
background will eventually call `subscriptions.allConsumed` to grab the offsets 
to commit. However, since this is done while the application thread is running, 
it's possible that some of the committed offsets have not been actually fully 
consumed by the application.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to