lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1947077467

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        future.whenComplete((value, exception) -> {
+            if (exception != null)
+                event.future().completeExceptionally(exception);
+            else {
+                requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+                    commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));

Review Comment:
   Could you elaborate on the callback alternative you mentioned? It seems interesting if we could end up just adding a future to the `AsyncCommitEvent` that tells us when the allConsumed offsets have been retrieved. The app thread's commitAsync would block on that future right after sending the event to the background. It would not wait for any request, only for confirmation that the async commit has read the allConsumed offsets that need to be committed. This is similar to the approach we take in other API calls, e.g. seek, which blocks until it has confirmation that the action on the subscription state happened in the background, with no requests involved. The difference here is that you need to know two things about the async event, right? (When it read the allConsumed offsets, and when it got a response for the request.)
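
   To make the two-signal idea above concrete, here is a minimal sketch, not the actual Kafka implementation. `SketchAsyncCommitEvent`, `CommitAsyncSketch`, `offsetsCaptured`, `commitResult`, `processInBackground` and `onCommitResponse` are all hypothetical names, and the offsets are modelled as a plain `Map<String, Long>` instead of the real consumer types. It assumes the event carries one future for "allConsumed has been read" and a second one for "the commit response arrived", and that the app thread only blocks on the first.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for an async commit event; not the real Kafka class. */
class SketchAsyncCommitEvent {
    // Signal 1: completed once the background thread has read the offsets to commit.
    final CompletableFuture<Map<String, Long>> offsetsCaptured = new CompletableFuture<>();
    // Signal 2: completed later, when the commit response arrives.
    final CompletableFuture<Void> commitResult = new CompletableFuture<>();
}

class CommitAsyncSketch {
    /** Runs on the background thread: snapshot the consumed offsets, then confirm. */
    static void processInBackground(SketchAsyncCommitEvent event, Map<String, Long> allConsumed) {
        Map<String, Long> snapshot = Collections.unmodifiableMap(new HashMap<>(allConsumed));
        event.offsetsCaptured.complete(snapshot);
        // A real implementation would enqueue the commit request for `snapshot` here.
    }

    /** Simulates the commit response arriving at some later point. */
    static void onCommitResponse(SketchAsyncCommitEvent event) {
        event.commitResult.complete(null);
    }

    /** Runs on the app thread: waits only for the snapshot, never for the request. */
    static SketchAsyncCommitEvent commitAsync(Map<String, Long> positions, ExecutorService background) {
        SketchAsyncCommitEvent event = new SketchAsyncCommitEvent();
        background.execute(() -> processInBackground(event, positions));
        try {
            // Assumed 5 second bound; the wait involves no network round trip.
            event.offsetsCaptured.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("offsets were not captured in time", e);
        }
        return event;
    }
}
```

   In this sketch, a position update made after `commitAsync` returns cannot leak into the committed offsets, because the snapshot was already taken, while `commitResult` stays pending until the response is simulated.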