junrao commented on code in PR #18737: URL: https://github.com/apache/kafka/pull/18737#discussion_r1953524086
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -818,6 +826,8 @@ void maybeReconcile() {
             return;
         }
 
+        if (autoCommitEnabled && !canCommit) return;

Review Comment:
   Hmm, this still seems problematic. The application thread adds a `PollEvent` without waiting for the processing to complete. When we get here with `canCommit=true`, we haven't called `markPendingRevocationToPauseFetching()` yet. This means that the application thread could still fetch a chunk of records from the to-be-revoked partitions without having processed them yet. Later, we will call `signalReconciliationStarted()`, which will commit with `subscriptions.allConsumed()`. However, we can't be sure that those offsets have actually been consumed.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +634,11 @@ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAnd
         });
     }
 
+    public void updateTimerAndMaybeCommit(final long currentTimeMs) {
+        updateAutoCommitTimer(currentTimeMs);
+        maybeAutoCommitAsync();

Review Comment:
   Hmm, still not sure if this fixes the auto offset commit issue. The application thread adds the `PollEvent` without waiting for the processing of the event to complete. So, we can't be sure whether the application is still polling new records when `subscriptions.allConsumed()` is called.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java:
##########
@@ -29,6 +30,6 @@ public class SyncCommitEvent extends CommitEvent {
 
     public SyncCommitEvent(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, final long deadlineMs) {
-        super(Type.COMMIT_SYNC, offsets, deadlineMs);
+        super(Type.COMMIT_SYNC, offsets, deadlineMs, new CompletableFuture<>());

Review Comment:
   Could we create the future in the super class instead of passing it in?
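
   For illustration, the suggestion about creating the future in the super class might look roughly like the sketch below. It uses simplified stand-in classes, not the actual Kafka `CommitEvent` hierarchy: the names `SketchCommitEvent` and `SketchSyncCommitEvent`, the `CompletableFuture<Void>` result type, and the `String`/`Long` offset map are all assumptions made to keep the example self-contained.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the base commit event (not the real Kafka class).
abstract class SketchCommitEvent {
    enum Type { COMMIT_ASYNC, COMMIT_SYNC }

    private final Type type;
    // String/Long stand in for TopicPartition/OffsetAndMetadata in this sketch.
    private final Optional<Map<String, Long>> offsets;
    private final long deadlineMs;
    // The future is created here once, so subclasses do not have to pass one in.
    // The Void result type is an assumption.
    private final CompletableFuture<Void> future = new CompletableFuture<>();

    protected SketchCommitEvent(Type type, Optional<Map<String, Long>> offsets, long deadlineMs) {
        this.type = type;
        this.offsets = offsets;
        this.deadlineMs = deadlineMs;
    }

    Type type() { return type; }
    Optional<Map<String, Long>> offsets() { return offsets; }
    long deadlineMs() { return deadlineMs; }
    CompletableFuture<Void> future() { return future; }
}

// The subclass constructor keeps its original three-argument super call.
class SketchSyncCommitEvent extends SketchCommitEvent {
    SketchSyncCommitEvent(Optional<Map<String, Long>> offsets, long deadlineMs) {
        super(Type.COMMIT_SYNC, offsets, deadlineMs);
    }
}

class FutureInSuperclassSketch {
    public static void main(String[] args) {
        SketchSyncCommitEvent event = new SketchSyncCommitEvent(Optional.empty(), 1_000L);
        // Whoever processes the event completes the future; the creator only waits on it.
        event.future().complete(null);
        System.out.println("type=" + event.type() + " done=" + event.future().isDone());
    }
}
```

   With this shape each event owns exactly one future, and no caller can accidentally hand in a shared or already-completed one.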
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -792,7 +796,11 @@ private void transitionToStale() {
      * - There are topics that haven't been added to the current assignment yet, but all their topic IDs
      * are missing from the target assignment.
      */
-    void maybeReconcile() {
+    public void maybeReconcile(boolean canCommit) {

Review Comment:
   Could we add a javadoc for `canCommit`?
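
   To make the auto-commit concern from the `maybeReconcile()` and `updateTimerAndMaybeCommit()` comments concrete, here is a deterministic, single-threaded sketch of the interleaving being described. `SketchPartitionState` is a hypothetical stand-in, not Kafka's `SubscriptionState`; the only behavior it assumes is that the fetch position advances as soon as records are handed to the application, and that `allConsumed()` reports that position.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for per-partition consumer state (not Kafka's SubscriptionState).
class SketchPartitionState {
    private long position = 0;   // next offset to fetch; advances when records are handed out
    private long processed = 0;  // next offset the application still has to process (sketch only)

    // Application thread: take n records out of the fetch buffer.
    List<Long> fetch(int n) {
        List<Long> records = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            records.add(position++);
        }
        return records;
    }

    // Application thread: finish processing the records returned earlier.
    void process(List<Long> records) {
        if (!records.isEmpty()) {
            processed = records.get(records.size() - 1) + 1;
        }
    }

    // Offset an auto-commit would use in this sketch: the fetch position.
    long allConsumed() { return position; }

    long processedUpTo() { return processed; }
}

class AutoCommitRaceSketch {
    public static void main(String[] args) {
        SketchPartitionState state = new SketchPartitionState();

        // 1. The application thread has enqueued its poll event and fetches records
        //    without waiting for the background thread.
        List<Long> records = state.fetch(3);

        // 2. The background thread reconciles and commits before the records are processed.
        long committed = state.allConsumed();
        System.out.println("committed=" + committed + " processed=" + state.processedUpTo());

        // 3. Processing finishes only after the commit has already gone out.
        state.process(records);
    }
}
```

   In step 2 the committed offset is ahead of what the application has processed, which is the window the comments are worried about: if the partition is revoked at that point, the unprocessed records would not be redelivered.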