junrao commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1945291682

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        future.whenComplete((value, exception) -> {
+            if (exception != null)
+                event.future().completeExceptionally(exception);
+            else {
+                requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+                    commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));

Review Comment:
   When the user calls `commitAsync()` with no offsets, we append an `AsyncCommitEvent` to the background thread's queue without waiting for the processing to complete. The background thread will eventually call `subscriptions.allConsumed()` to grab the offsets to commit. However, since this happens while the application thread is still running, it's possible that some of the committed offsets have not actually been fully consumed by the application.


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +638,15 @@ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAnd
         });
     }
 
+    public Map<TopicPartition, OffsetAndMetadata> latestPartitionOffsets() {

Review Comment:
   This seems unused?
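
To illustrate the `commitAsync()` timing concern from the first comment, here is a minimal, single-threaded sketch. It is not the Kafka code: `CommitSnapshotSketch`, `positions`, `events`, `commitAsyncLazy`, `commitAsyncEager`, and `runBackgroundOnce` are all hypothetical names, and the "background thread" is simulated by draining a queue later. It assumes that positions advance when records are handed to the application, before the application has processed them.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Supplier;

public class CommitSnapshotSketch {
    // Hypothetical stand-in for subscription state: partition -> next offset to hand out.
    public static final Map<String, Long> positions = new HashMap<>();
    // Hypothetical stand-in for the queue of events sent to the background thread.
    public static final Queue<Supplier<Map<String, Long>>> events = new ArrayDeque<>();

    // Lazy variant: the offsets are read only when the background side processes the event.
    public static void commitAsyncLazy() {
        events.add(() -> new HashMap<>(positions));
    }

    // Eager variant: the offsets are copied on the calling side before the event is enqueued.
    public static void commitAsyncEager() {
        Map<String, Long> snapshot = new HashMap<>(positions);
        events.add(() -> snapshot);
    }

    // Simulates the background side handling one event; returns the offsets it would commit.
    public static Map<String, Long> runBackgroundOnce() {
        return events.remove().get();
    }

    public static void main(String[] args) {
        positions.put("topic-0", 10L);  // application has processed everything below offset 10
        commitAsyncLazy();
        commitAsyncEager();
        positions.put("topic-0", 15L);  // a later poll hands out 5 more records, not yet processed
        System.out.println("lazy  commits " + runBackgroundOnce());  // {topic-0=15}
        System.out.println("eager commits " + runBackgroundOnce());  // {topic-0=10}
    }
}
```

In this model the lazy variant commits offset 15 even though the application had only finished processing up to 10 when it asked for the commit, while the eager variant commits the offsets as they were at call time. Whether an eager snapshot is the right fix for this PR is a separate question; the sketch only shows why reading the offsets later can overshoot.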