lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1937527654

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +418,16 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        final CompletableFuture<Boolean> b = event.future();
+        future.whenComplete((BiConsumer<? super Boolean, ? super Throwable>) (value, exception) -> {

Review Comment:
   do we really need to cast the `(value, exception)` here?

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -402,22 +399,20 @@ public void testCommitAsyncWithEmptyOffsets() {
                 (short) 1,
                 Errors.NONE)));
-        verify(subscriptionState).allConsumed();
         verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
         assertTrue(future.isDone());
         Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
         assertEquals(offsets, commitOffsets);
     }
     @Test
-    public void testCommitAsyncWithEmptyAllConsumedOffsets() {
+    public void testCommitAsyncWithEmptyLatestPartitionOffsetsOffsets() {

Review Comment:
   I would say the test name still applies as it was (it's just that allConsumed is taken when we know it has been returned). The new one seems a bit confusing.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -389,7 +388,7 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
      * future will be completed with a {@link RetriableCommitFailedException}.
      */
     public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(this::latestPartitionOffsets);

Review Comment:
   uhm I don't think we should change this here, and I believe it's actually dangerous. This is my reasoning (please correct me at any point):

   We have 2 kinds of commit operations in this manager:
   1. commits triggered automatically in the background (commit before rebalance and auto-commit on the interval)
   2. commits triggered by API calls (`commitSync` and `commitAsync`, which are only triggered by a consumer.commitSync/Async call or consumer.close; note that these could be for specific offsets, or for allConsumed)

   My take is that with this PR we need to change only kind 1, which are the ones affected by the race condition with the fetch happening within a consumer poll iteration. Those commits happen automatically, so they cannot take the `allConsumed` from the subscription state: we could be in the middle of a consumer poll iteration in the app thread (with positions advanced but the records not returned yet). So I agree with the changes to `maybeAutoCommitSyncBeforeRevocation` and `maybeAutoCommitAsync` to not use subscriptionState.allConsumed.

   But the commits in group 2 (triggered by consumer API calls) can and should use the `allConsumed` from the `subscriptionState`, I expect, as they happen outside of the poll loop. First, they don't land in the race we're targeting, and most importantly, we cannot even ensure that the commitMgr `latestPartitionOffsets` has the returned positions when they are called (this is the dangerous part).

   Ex. a single call to poll that returns 5 records, followed by commitSync()/commitAsync(). If that commit takes the `latestPartitionOffsets` from the commitReqMgr, wouldn't that be 0?

   The `latestPartitionOffsets` is only updated on the next call to poll (if any), which makes sense, because that's the only time, when running a continuous poll, that we can safely assume that the records have been returned (on the previous iteration). Makes sense?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +636,14 @@ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAnd
         });
     }
+    public Map<TopicPartition, OffsetAndMetadata> latestPartitionOffsets() {
+        return latestPartitionOffsets;
+    }
+
+    public void setLatestPartitionOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        this.latestPartitionOffsets = Collections.unmodifiableMap(offsets);

Review Comment:
   should we add a debug log here to know that we're updating the all-consumed positions to be committed? (I expect it will be helpful to track the flow if needed.)

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +418,16 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        final CompletableFuture<Boolean> b = event.future();

Review Comment:
   is there a reason why we need this var?
   (vs. using `event.future()` directly to complete below)

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -250,7 +251,10 @@ private void process(final SyncCommitEvent event) {
         try {
             CommitRequestManager manager = requestManagers.commitRequestManager.get();
-            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitSync(event.offsets(), event.deadlineMs());
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitSync(
+                event.offsets().orElseGet(subscriptions::allConsumed),
+                event.deadlineMs()
+            );

Review Comment:
   uhm not sure, relates to the comment above
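
A toy, single-partition model of the reasoning in the `commitAsync` comment, assuming (as that comment describes) that the subscription position moves as soon as records are fetched within a poll, while `latestPartitionOffsets` is only refreshed at the start of the next poll. `CommitSourceSketch`, `ConsumerModel`, `startPoll` and `fetchRecords` are hypothetical names; this is not Kafka's actual `SubscriptionState` or `CommitRequestManager` logic.

```java
public class CommitSourceSketch {

    /**
     * Hypothetical single-partition model of the two offset sources discussed in the review.
     * Not Kafka code; names are illustrative only.
     */
    static final class ConsumerModel {
        // Stands in for subscriptions.allConsumed(): advances as soon as records are fetched
        // for the current poll, even before poll() has returned them to the application.
        private long position = 0;
        // Stands in for latestPartitionOffsets: snapshotted only at the start of the next poll,
        // when the records from the previous poll are known to have been returned.
        private long latestPartitionOffset = 0;

        void startPoll() {
            latestPartitionOffset = position;
        }

        void fetchRecords(int count) {
            position += count;
        }

        long allConsumed() {
            return position;
        }

        long latestPartitionOffset() {
            return latestPartitionOffset;
        }
    }

    public static void main(String[] args) {
        ConsumerModel consumer = new ConsumerModel();

        // A single poll() that returns 5 records, then an API commit without explicit offsets.
        consumer.startPoll();
        consumer.fetchRecords(5);
        System.out.println("API commit via allConsumed: " + consumer.allConsumed());                      // prints 5
        System.out.println("API commit via latestPartitionOffsets: " + consumer.latestPartitionOffset()); // prints 0

        // Next poll: positions advance for 3 more records and a background auto-commit fires
        // before poll() has returned those records to the application.
        consumer.startPoll();
        consumer.fetchRecords(3);
        System.out.println("Background commit via allConsumed: " + consumer.allConsumed());                      // prints 8
        System.out.println("Background commit via latestPartitionOffsets: " + consumer.latestPartitionOffset()); // prints 5
    }
}
```

Under these assumptions, the API-triggered commit only gets the right offset (5) from `allConsumed`, while the background commit in the middle of a poll iteration is only safe with the snapshot (5 rather than 8), which is the split between commit kinds 1 and 2 that the review argues for.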
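
A minimal sketch of the two points raised on `process(CheckAndUpdatePositionsEvent)`, assuming the new lambda only needs to forward the result or the failure (the rest of its body is not shown in the review). `CompletableFuture.whenComplete` takes a `BiConsumer<? super T, ? super Throwable>`, so javac infers `value` and `exception` without any cast; and because the method parameter `event` is effectively final, the lambda can call `event.future()` directly instead of copying it into a local. `WhenCompleteSketch` and its nested `Event` class are hypothetical stand-ins, not the Kafka classes.

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteSketch {

    // Hypothetical stand-in for an application event that carries its own result future.
    static final class Event {
        private final CompletableFuture<Boolean> future = new CompletableFuture<>();

        CompletableFuture<Boolean> future() {
            return future;
        }
    }

    // Forwards the outcome of `upstream` into event.future().
    // No cast: (value, exception) are inferred from whenComplete's
    // BiConsumer<? super Boolean, ? super Throwable> parameter.
    // No local copy: `event` is effectively final, so the lambda can capture it directly.
    static void process(Event event, CompletableFuture<Boolean> upstream) {
        upstream.whenComplete((value, exception) -> {
            if (exception != null) {
                event.future().completeExceptionally(exception);
            } else {
                event.future().complete(value);
            }
        });
    }

    public static void main(String[] args) {
        Event ok = new Event();
        CompletableFuture<Boolean> okUpstream = new CompletableFuture<>();
        process(ok, okUpstream);
        okUpstream.complete(true);
        System.out.println(ok.future().join()); // prints "true"

        Event failed = new Event();
        CompletableFuture<Boolean> failedUpstream = new CompletableFuture<>();
        process(failed, failedUpstream);
        failedUpstream.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(failed.future().isCompletedExceptionally()); // prints "true"
    }
}
```

An explicit cast on a lambda is only needed when the target type cannot be inferred (e.g. an overloaded method with ambiguous functional-interface parameters); `whenComplete` has a single signature, so plain `(value, exception) -> { ... }` compiles as is.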