frankvicky commented on PR #18737:
URL: https://github.com/apache/kafka/pull/18737#issuecomment-2621642766

   Hi @lianetm
   
   I am currently moving the invocation of `SubscriptionState#allConsumed()` 
from the background thread to the application thread. This ensures there is no 
gap between the committed offsets and the records actually consumed.
   
   However, this change raises some considerations. If we want to ensure 
`SubscriptionState#allConsumed()` is invoked by the application thread, we need 
to rely on events or event processor helper methods to deliver the offsets.
   
   One last consideration is regarding `AsyncKafkaConsumer#commitSync(final 
Duration timeout)`. Currently, it always passes `Optional#empty()` as an 
argument, which causes the background thread to invoke 
`SubscriptionState#allConsumed()`. Since this patch prevents invoking 
`SubscriptionState#allConsumed()` from the background thread, I think we should 
update `AsyncKafkaConsumer#commitSync(final Duration timeout)` to pass 
`SubscriptionState#allConsumed()` as an argument instead. WDYT?
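
To make the idea concrete, here is a minimal, self-contained sketch of what I have in mind. It is not the actual Kafka code: `PositionTracker`, `CommitEvent`, and `CommitSketch` are hypothetical stand-ins for `SubscriptionState`, the commit event, and the consumer plus background event processor, and partitions/offsets are simplified to `String`/`Long`. The point is only that the snapshot is taken on the application thread and travels inside the event, so the background side never has to call `allConsumed()` itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for SubscriptionState: tracks the next offset per partition.
class PositionTracker {
    private final Map<String, Long> positions = new HashMap<>();

    synchronized void advance(String partition, long nextOffset) {
        positions.put(partition, nextOffset);
    }

    // Plays the role of allConsumed(): returns a snapshot copy of the positions.
    synchronized Map<String, Long> allConsumed() {
        return new HashMap<>(positions);
    }
}

// Hypothetical event handed from the application thread to the background thread.
final class CommitEvent {
    final Optional<Map<String, Long>> offsets;

    CommitEvent(Optional<Map<String, Long>> offsets) {
        this.offsets = offsets;
    }
}

// Hypothetical stand-in for the consumer and its background event processor.
class CommitSketch {
    final PositionTracker tracker = new PositionTracker();
    final BlockingQueue<CommitEvent> queue = new LinkedBlockingQueue<>();

    // Application thread: snapshot the consumed positions at call time and
    // ship them in the event, instead of sending an empty Optional.
    void commitSync() {
        queue.add(new CommitEvent(Optional.of(tracker.allConsumed())));
    }

    // Background thread: commit exactly what the event carries and never
    // read the tracker directly.
    Map<String, Long> processNext() {
        CommitEvent event = queue.poll();
        if (event == null) {
            throw new IllegalStateException("no pending commit event");
        }
        return event.offsets.orElseThrow(
            () -> new IllegalStateException("offsets must be supplied by the application thread"));
    }
}
```

With this shape, any position update that happens after `commitSync()` is called but before the background side processes the event does not leak into the commit, which is the gap this patch is trying to close.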
   
https://github.com/apache/kafka/blob/f960e2064778a441a3a955408e45472ff9b51422/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L390
   
https://github.com/apache/kafka/blob/f960e2064778a441a3a955408e45472ff9b51422/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1448
   
https://github.com/apache/kafka/blob/f960e2064778a441a3a955408e45472ff9b51422/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466

