junrao commented on code in PR #18737: URL: https://github.com/apache/kafka/pull/18737#discussion_r1945291682
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        future.whenComplete((value, exception) -> {
+            if (exception != null)
+                event.future().completeExceptionally(exception);
+            else {
+                requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+                    commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));

Review Comment:
   It seems that we still have an issue with `commitAsync()`. When the user calls `commitAsync()` with no offsets, we enqueue an `AsyncCommitEvent` for the background thread without waiting for it to be processed. The background thread will eventually call `subscriptions.allConsumed()` to grab the offsets to commit. However, since this happens while the application thread keeps running, it's possible that some of the committed offsets have not actually been fully consumed by the application.
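
   To make the concern concrete, here is a minimal single-threaded sketch of one possible interleaving. It is not Kafka code: `Positions`, `run`, and `backgroundQueue` are hypothetical stand-ins for `SubscriptionState`, the consumer call path, and the application event queue. It only contrasts reading the positions when the background side handles the event with capturing them on the calling thread before the event is enqueued. Capturing on the caller is shown as one conceivable remedy, not as what this PR does.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Supplier;

public class CommitSnapshotSketch {

    // Hypothetical stand-in for SubscriptionState: partition -> next offset to consume.
    static final class Positions {
        private final Map<String, Long> position = new HashMap<>();

        void advance(String partition, long nextOffset) {
            position.put(partition, nextOffset);
        }

        // Returns a copy of the positions as they are at the moment of the call.
        Map<String, Long> allConsumed() {
            return new HashMap<>(position);
        }
    }

    // Simulates: the application processes offsets 0..9 and asks for an async commit,
    // then a later poll advances the position to 20 before the queued event is handled.
    public static Map<String, Long> run(boolean snapshotOnCaller) {
        Positions positions = new Positions();
        Queue<Supplier<Map<String, Long>>> backgroundQueue = new ArrayDeque<>();

        positions.advance("topic-0", 10L); // offsets 0..9 returned and processed

        if (snapshotOnCaller) {
            // Capture the offsets on the calling thread, before enqueuing.
            Map<String, Long> snapshot = positions.allConsumed();
            backgroundQueue.add(() -> snapshot);
        } else {
            // Defer the read until the background side handles the event.
            backgroundQueue.add(positions::allConsumed);
        }

        positions.advance("topic-0", 20L); // offsets 10..19 returned, not yet processed

        // The "background thread" now handles the queued commit event.
        return backgroundQueue.remove().get();
    }

    public static void main(String[] args) {
        System.out.println("deferred read: " + run(false)); // {topic-0=20}
        System.out.println("snapshot:      " + run(true));  // {topic-0=10}
    }
}
```

   In the deferred case the offset that would be committed (20) is ahead of what the application has finished processing (10), which is the situation described above. A real reproduction would depend on thread timing, so the sketch fixes the ordering by hand.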