lianetm commented on code in PR #18737: URL: https://github.com/apache/kafka/pull/18737#discussion_r1946722032
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        future.whenComplete((value, exception) -> {
+            if (exception != null)
+                event.future().completeExceptionally(exception);
+            else {
+                requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+                    commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));

Review Comment:
Well, I was thinking of doing it in the background, but you got me thinking: is it safe to simply get it in the app thread, like in your snippet? It seems so (and it's simpler). When `commitAsync` is called, we can expect that no fetch or position update is in progress, so we could just take the `allConsumed` snapshot in the app thread right away. That ensures that if fetching starts right after (which would be common in apps doing poll/commit in a loop), the async commit is not affected by the next fetch (our initial issue here).
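
To make the idea concrete, here is a minimal sketch of "snapshot in the app thread, send in the background". It is not the consumer's actual code: `SnapshotCommitSketch`, `advance`, and the string partition keys are hypothetical stand-ins, and the background send is reduced to a future that completes with the captured offsets.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the consumer; none of these names are Kafka APIs.
public class SnapshotCommitSketch {
    // Consumed position per partition; only touched by the app thread here.
    private final Map<String, Long> positions = new HashMap<>();

    // Simulates a fetch/poll moving the consumed position of a partition.
    public void advance(String partition, long newPosition) {
        positions.put(partition, newPosition);
    }

    // Copies the positions in the calling (app) thread before returning,
    // so a fetch that starts afterwards cannot change what gets committed.
    public CompletableFuture<Map<String, Long>> commitAsync() {
        Map<String, Long> snapshot = Map.copyOf(positions); // immutable copy
        // Assumption: the real commit would be sent by a background thread.
        // Here the future simply completes with the captured offsets.
        return CompletableFuture.supplyAsync(() -> snapshot);
    }

    public static void main(String[] args) {
        SnapshotCommitSketch consumer = new SnapshotCommitSketch();
        consumer.advance("topic-0", 10L);

        CompletableFuture<Map<String, Long>> commit = consumer.commitAsync();
        consumer.advance("topic-0", 25L); // the next poll moves the position

        // The commit still carries position 10, not 25.
        System.out.println(commit.join().get("topic-0"));
    }
}
```

If the copy were taken inside the background task instead, the second `advance` could run first and the commit would pick up the later position, which is the race described above.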