lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1941427147


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -324,7 +323,7 @@ public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final long de
         CompletableFuture<Void> result = new CompletableFuture<>();
         OffsetCommitRequestState requestState =
-            createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
+            createOffsetCommitRequest(latestPartitionOffsets, deadlineMs);

Review Comment:
   If we take `subscriptions.allConsumed()` in `signalReconciliationStarted`, I'm afraid we could pick up positions that the app thread has already advanced but whose records the app has not processed yet. Committing those would mark unprocessed records as consumed. I believe this is what @junrao was referring to with:
   
   > We need to wait for the next setLatestPartitionOffsets() call to happen. At that point, we know any record returned to the application will have been processed and no more records can be given to the application. So, it's safe to commit the offset at that point.
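
   To make the concern concrete, here is a minimal sketch of the difference between reading live positions and reading a snapshot. It is not Kafka's code: `CommitSnapshotSketch`, `returnRecords`, `snapshot`, and the string partition keys are hypothetical, and `allConsumed` / `setLatestPartitionOffsets` only borrow their names from this discussion. It also assumes the snapshot is refreshed at a point where the app has finished with the previously returned records.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the consumer's position tracking (not Kafka's classes).
class CommitSnapshotSketch {
    // Live positions: advanced as soon as records are handed to the application.
    private final Map<String, Long> livePositions = new HashMap<>();
    // Snapshot of positions, refreshed only when earlier records are assumed processed.
    private Map<String, Long> latestPartitionOffsets = new HashMap<>();

    // Assumed to run when the app asks for more records, i.e. it is done with the last batch.
    void setLatestPartitionOffsets() {
        latestPartitionOffsets = new HashMap<>(livePositions);
    }

    // Handing `count` records to the app moves the live position past them immediately.
    void returnRecords(String partition, int count) {
        livePositions.merge(partition, (long) count, Long::sum);
    }

    // Live view: may include records the app has received but not processed yet.
    Map<String, Long> allConsumed() {
        return new HashMap<>(livePositions);
    }

    // Snapshot view: only positions captured by the last setLatestPartitionOffsets() call.
    Map<String, Long> snapshot() {
        return new HashMap<>(latestPartitionOffsets);
    }

    public static void main(String[] args) {
        CommitSnapshotSketch s = new CommitSnapshotSketch();
        s.returnRecords("t-0", 5);     // first batch handed to the app
        s.setLatestPartitionOffsets(); // app comes back: first batch is processed
        s.returnRecords("t-0", 3);     // second batch handed out, still being processed

        // A revocation that starts now sees different offsets depending on the source.
        System.out.println("live     = " + s.allConsumed()); // {t-0=8}
        System.out.println("snapshot = " + s.snapshot());    // {t-0=5}
    }
}
```

   In this sketch, committing the live view would commit offset 8 while the app is still working on records 5 through 7. The snapshot stays at 5 until the next `setLatestPartitionOffsets()` call.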