lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1946722032


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = 
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        future.whenComplete((value, exception) -> {
+            if (exception != null)
+                event.future().completeExceptionally(exception);
+            else {
+                
requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+                    
commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));

Review Comment:
   well I was thinking of it but done in the background, but you got me 
thinking, is it safe to simply get it in the app thread like your snippet? 
Seems so really (and it's simpler). When there is a call to `commitAsync` we 
can expect there is no active fetching or update positions going on, so we 
could just get the all consumed in the app thread right away. That will ensure 
that if fetching starts right after (which would be common on apps doing 
poll/commit in a loop), that async commit will not be affected by the next 
fetch (our initial issue here)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to