junrao commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1938065222


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -324,7 +323,7 @@ public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final long de
 
         CompletableFuture<Void> result = new CompletableFuture<>();
         OffsetCommitRequestState requestState =
-            createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
+            createOffsetCommitRequest(latestPartitionOffsets, deadlineMs);

Review Comment:
   There is a subtle issue here. `AbstractMembershipManager` performs the following steps when revoking a partition.
   
   ```
           // Mark partitions as pending revocation to stop fetching from the partitions (no new
           // fetches sent out, and no in-flight fetches responses processed).
           markPendingRevocationToPauseFetching(revokedPartitions);
   
           // Commit offsets if auto-commit enabled before reconciling a new assignment. Request will
           // be retried until it succeeds, fails with non-retriable error, or timer expires.
           CompletableFuture<Void> commitResult;
   
           commitResult = signalReconciliationStarted();
   ```
   The first step marks the revoked partitions as pending revocation, which prevents their data from being returned in future `consumer.poll()` calls. However, by the time we get here, a batch of records may have been returned to the application thread just before that first step and not yet been processed. In that case `latestPartitionOffsets` is not yet up to date. We need to wait for the next `setLatestPartitionOffsets()` call. At that point, we know that every record returned to the application has been processed and that no more records can be handed to it, so it is safe to commit the offsets.
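
   To make the ordering concrete, here is a minimal, self-contained sketch of the idea. It is not the Kafka implementation: the class `DeferredRevocationCommit`, its two `offsetsToCommit...` methods, and the string partition keys are hypothetical, and it assumes `setLatestPartitionOffsets()` is invoked by the application thread on its next `poll()`, once the previously returned records have been processed.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical model of the ordering problem; not the actual CommitRequestManager. */
   class DeferredRevocationCommit {
       // Offsets the application is known to have processed, keyed by partition name.
       private Map<String, Long> latestPartitionOffsets = new HashMap<>();
       // Non-null while a revocation is waiting for the next offsets update.
       private CompletableFuture<Map<String, Long>> pendingSnapshot;

       /** Stand-in for setLatestPartitionOffsets(); assumed to run on the next poll(). */
       synchronized void setLatestPartitionOffsets(Map<String, Long> offsets) {
           latestPartitionOffsets = new HashMap<>(offsets);
           if (pendingSnapshot != null) {
               CompletableFuture<Map<String, Long>> waiting = pendingSnapshot;
               pendingSnapshot = null;
               // Hand the fresh offsets to the revocation that was waiting for them.
               waiting.complete(new HashMap<>(offsets));
           }
       }

       /** Naive variant: snapshot immediately, which may lag behind records already handed out. */
       synchronized Map<String, Long> offsetsToCommitNow() {
           return new HashMap<>(latestPartitionOffsets);
       }

       /** Deferred variant: completes only on the next setLatestPartitionOffsets() call. */
       synchronized CompletableFuture<Map<String, Long>> offsetsToCommitAfterNextUpdate() {
           if (pendingSnapshot == null) {
               pendingSnapshot = new CompletableFuture<>();
           }
           return pendingSnapshot;
       }
   }
   ```

   In this model, if the last update recorded offset 100 for a partition and a batch has since been handed to the application, `offsetsToCommitNow()` still returns 100, whereas the future from `offsetsToCommitAfterNextUpdate()` stays incomplete until the application's next update (say, 110) arrives and then carries that value. A real change would also have to handle the deadline expiring before the next update, which this sketch leaves out.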


