junrao commented on code in PR #18737: URL: https://github.com/apache/kafka/pull/18737#discussion_r1938065222
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -324,7 +323,7 @@ public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final long de CompletableFuture<Void> result = new CompletableFuture<>(); OffsetCommitRequestState requestState = - createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs); + createOffsetCommitRequest(latestPartitionOffsets, deadlineMs); Review Comment: There is a subtle issue here. AbstractMembershipManager does the following step when revoking a partiton. ``` // Mark partitions as pending revocation to stop fetching from the partitions (no new // fetches sent out, and no in-flight fetches responses processed). markPendingRevocationToPauseFetching(revokedPartitions); // Commit offsets if auto-commit enabled before reconciling a new assignment. Request will // be retried until it succeeds, fails with non-retriable error, or timer expires. CompletableFuture<Void> commitResult; commitResult = signalReconciliationStarted(); ``` The first step marks the revoked partition as pendingRevocation, which prevents the partition's data from being returned in future `consumer.poll()` calls. However, when we get here, it's possible that a batch of records have just been returned to the application thread before the first step, but those records haven't been processed yet. So latestPartitionOffsets is not up to date yet. We need to wait for the next `setLatestPartitionOffsets()` call to happen. At that point, we know any record returned to the application will have been processed and no more records can be given to the application. So, it's safe to commit the offset at that point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org