lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1947077467


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = 
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        future.whenComplete((value, exception) -> {
+            if (exception != null)
+                event.future().completeExceptionally(exception);
+            else {
+                
requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+                    
commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));

Review Comment:
   could you elaborate on the callback alternative you mentioned? Seems 
interesting if we could end with just adding a future  in the 
`AsyncCommitEvent` to know when allConsumedRetrieved. The app thread 
commitAsync would block on that right after sending the commitAsync to the 
background, not waiting for any request, just for the confirmation that the 
async got the allConsumed that need to be committed. 
   
   It's a similar approach we have in other api calls, ex. seek that blocks 
until it has confirmation that the action on the subscription state happened in 
the background, no requests. Diff here is that you need to know 2 things about 
the async event right? (when it read the allConsumed, and when it got a 
response for the request)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to