frankvicky commented on PR #18737: URL: https://github.com/apache/kafka/pull/18737#issuecomment-2621642766
Hi @lianetm I am currently moving the invocation from the background thread to the application thread. This ensures there will be no gap between committed offsets and actually consumed records. However, this change raises some considerations. If we want to ensure `SubscriptionState#allConsumed()` is invoked by the application thread, we need to rely on events or event processor helper methods to deliver the offsets. One last consideration is regarding `AsyncKafkaConsumer#commitSync(final Duration timeout)`. Currently, it always passes `Optional#empty()` as an argument, which causes the background thread to invoke `SubscriptionState#allConsumed()`. Since this patch prevents invoking `SubscriptionState#allConsumed()` from the background thread, I think we should update `AsyncKafkaConsumer#commitSync(final Duration timeout)` to pass `SubscriptionState#allConsumed()` as an argument instead. WDYT? https://github.com/apache/kafka/blob/f960e2064778a441a3a955408e45472ff9b51422/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L390 https://github.com/apache/kafka/blob/f960e2064778a441a3a955408e45472ff9b51422/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1448 https://github.com/apache/kafka/blob/f960e2064778a441a3a955408e45472ff9b51422/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org