lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1951212890
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -475,21 +479,25 @@ public void testSyncCommitEventWithException() {
         doReturn(future).when(commitRequestManager).commitSync(any(), anyLong());
         processor.process(event);
-        verify(commitRequestManager).commitSync(Optional.empty(), 12345);
+        verify(commitRequestManager).commitSync(Collections.emptyMap(), 12345);
+        assertTrue(event.offsetsReady.isDone());
         assertFutureThrows(event.future(), IllegalStateException.class);
     }
     @ParameterizedTest
     @MethodSource("offsetsGenerator")
     public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
         AsyncCommitEvent event = new AsyncCommitEvent(offsets);
+        Map<TopicPartition, OffsetAndMetadata> actualOffsets = offsets.orElse(Collections.emptyMap());
         setupProcessor(true);
-        doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitAsync(offsets);
+        doReturn(CompletableFuture.completedFuture(actualOffsets)).when(commitRequestManager).commitAsync(actualOffsets);
+        doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
         processor.process(event);
-        verify(commitRequestManager).commitAsync(offsets);
+        verify(commitRequestManager).commitAsync(actualOffsets);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertTrue(event.offsetsReady.isDone());

Review Comment:
Should we move this assertion up right after the `commitAsync`?
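The following is a minimal, self-contained sketch of why the `offsetsReady` assertion can go right after the commit call. In this model the offsets are resolved first, falling back to the consumed positions when none are given. `offsetsReady` then completes before any commit response exists. `CommitEvent`, `process` and `check` are hypothetical stand-ins, not the real `AsyncCommitEvent` or `ApplicationEventProcessor` APIs. The broker response is simulated with a plain `CompletableFuture`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class OffsetsReadySketch {

    // Hypothetical stand-in for a commit event: offsetsReady completes as soon as the
    // offsets to commit are resolved; future completes only when the commit response arrives.
    static final class CommitEvent {
        final Optional<Map<String, Long>> offsets;
        final CompletableFuture<Map<String, Long>> offsetsReady = new CompletableFuture<>();
        final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();

        CommitEvent(Optional<Map<String, Long>> offsets) {
            this.offsets = offsets;
        }
    }

    // Hypothetical background-thread processing: resolve the offsets (falling back to
    // allConsumed when none were given), signal offsetsReady, then wire the pending
    // commit response to the event's future. The pending response is returned so the
    // caller can simulate the broker answering later.
    static CompletableFuture<Map<String, Long>> process(CommitEvent event,
                                                        Map<String, Long> allConsumed) {
        Map<String, Long> toCommit = event.offsets.orElse(allConsumed);
        event.offsetsReady.complete(toCommit);

        CompletableFuture<Map<String, Long>> response = new CompletableFuture<>();
        response.whenComplete((committed, error) -> {
            if (error != null) {
                event.future.completeExceptionally(error);
            } else {
                event.future.complete(committed);
            }
        });
        return response;
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    public static void main(String[] args) {
        CommitEvent event = new CommitEvent(Optional.empty());
        CompletableFuture<Map<String, Long>> response = process(event, Map.of("topic-0", 42L));

        // Assert right after processing: offsetsReady must not depend on the commit response.
        check(event.offsetsReady.isDone(), "offsetsReady should already be done");
        check(!event.future.isDone(), "commit future should still be pending");

        // Simulate the broker acknowledging the commit of the resolved offsets.
        response.complete(event.offsetsReady.join());
        check(event.future.join().equals(Map.of("topic-0", 42L)), "unexpected committed offsets");
        System.out.println("ok");
    }
}
```

This ordering is what the suggestion relies on. The "offsets are ready" check does not need the commit future, so asserting it before `event.future().get()` makes the test state that intent explicitly.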
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -206,8 +206,13 @@ public void process(ApplicationEvent event) {
     }
     private void process(final PollEvent event) {
+        // To ensure certain positions before reconciliation, we only trigger a full process of reconciling by PollEvent

Review Comment:
The "ensure positions" part doesn't read very well, I'd say. Maybe we can simply say that we trigger a reconciliation that can safely commit offsets if needed to revoke partitions, since we're processing the PollEvent before any new fetching starts in the app thread.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -846,6 +845,11 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(final C
         }
         applicationEventHandler.add(commitEvent);
+
+        // Wait for offsets to be ready if none were explicitly specified
+        // This blocks until the background thread retrieves allConsumed positions to commit

Review Comment:
```suggestion
        // This blocks until the background thread retrieves allConsumed positions to commit if none were explicitly specified.
```
(only because both statements say almost the same thing)

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java:
##########
@@ -117,6 +121,12 @@ public String rackId() {
         return rackId;
     }
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        maybeReconcile(true);
+        return NetworkClientDelegate.PollResult.EMPTY;
+    }
+

Review Comment:
Do we still need to override this? Now maybeReconcile only short-circuits on canCommit when auto-commit is enabled, so shouldn't the share consumer reconciliation work as it used to with the default poll implementation?
(It will have canCommit=false but with auto-commit disabled, so it should continue with the reconciliation.)

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java:
##########
@@ -83,14 +83,16 @@ public ShareMembershipManager(LogContext logContext,
                                   SubscriptionState subscriptions,
                                   ConsumerMetadata metadata,
                                   Time time,
-                                  Metrics metrics) {
+                                  Metrics metrics,
+                                  boolean autoCommitEnabled) {

Review Comment:
Uhm, I'd say we shouldn't expose this in the ShareConsumer (it doesn't do auto-commits) and should simply assume it's false on ln 113 below. Makes sense?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java:
##########
@@ -100,13 +102,15 @@ public ShareMembershipManager(LogContext logContext,
                                   SubscriptionState subscriptions,
                                   ConsumerMetadata metadata,
                                   Time time,
-                                  ShareRebalanceMetricsManager metricsManager) {
+                                  ShareRebalanceMetricsManager metricsManager,
+                                  boolean autoCommitEnabled) {

Review Comment:
Ditto, remove?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -388,22 +387,21 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
      * exceptionally depending on the response. If the request fails with a retriable error, the
      * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
-        if (commitOffsets.isEmpty()) {
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(Map.of());
         }

Review Comment:
We had Optional before because we had two "empty" cases to differentiate:
1. An empty Optional passed from the app, meaning we should use all consumed offsets.
2. An empty offsets map after resolving case 1, meaning there is nothing to commit in the end, so we can return early.

Now the allConsumed logic (case 1) lives higher up in the call stack, so the map seems enough here.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -220,8 +221,10 @@ public void testPollEnsureAutocommitSent() {
         assertPoll(0, commitRequestManager);
         commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.maybeAutoCommitAsync();

Review Comment:
Yes please, good idea.
I expect we should reuse that when auto-committing from the PollEvent and when processing the assign partitions call (AssignmentChangeEvent).

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -446,14 +447,17 @@ public void testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() {
     @MethodSource("offsetsGenerator")
     public void testSyncCommitEvent(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
         SyncCommitEvent event = new SyncCommitEvent(offsets, 12345);
+        Map<TopicPartition, OffsetAndMetadata> actualOffsets = offsets.orElse(Collections.emptyMap());
         setupProcessor(true);
-        doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets, 12345);
+        doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(actualOffsets, 12345);
+        doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
         processor.process(event);
-        verify(commitRequestManager).commitSync(offsets, 12345);
+        verify(commitRequestManager).commitSync(actualOffsets, 12345);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
-        assertEquals(offsets.orElse(Map.of()), committedOffsets);
+        assertTrue(event.offsetsReady.isDone());

Review Comment:
Looks sufficient to me, since I see you included it for all the combinations (commit sync/async, with/without offsets).
I would only suggest moving the assertion up, right after the calls to `commitSync` (it should complete without waiting for the commit response).

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -267,7 +268,7 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCom
     public void maybeAutoCommitAsync() {
         if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
             OffsetCommitRequestState requestState = createOffsetCommitRequest(
-                subscriptions.allConsumed(),
+                latestPartitionOffsets,

Review Comment:
Exactly. We're saying that the regular auto-commit and the auto-commit before revocation can only be triggered from the PollEvent. I honestly don't see any other way to ensure the correctness of those commit operations: the only way to be sure that the records have been returned is to wait for the beginning of the next poll, which is what the classic consumer does.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
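The last review comment above concerns auto-commit, which is triggered only from the PollEvent and commits `latestPartitionOffsets` rather than the live `subscriptions.allConsumed()`. The sketch below is a minimal, self-contained illustration of that timing. `Positions`, `AutoCommitter`, `onPollEvent` and `sentCommits` are hypothetical names, not Kafka APIs. The model only shows one thing: positions advanced while a poll is still collecting records are not committed until the next PollEvent takes a fresh snapshot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PollEventAutoCommitSketch {

    // Hypothetical stand-in for SubscriptionState: the app thread advances positions
    // while it collects fetched records for the current poll.
    static final class Positions {
        private final Map<String, Long> consumed = new HashMap<>();

        void advance(String partition, long offset) {
            consumed.put(partition, offset);
        }

        Map<String, Long> allConsumed() {
            return Map.copyOf(consumed);
        }
    }

    // Hypothetical stand-in for the auto-commit path: it only ever commits the snapshot
    // taken at the start of the latest PollEvent, never the live positions.
    static final class AutoCommitter {
        private final Positions positions;
        private Map<String, Long> latestPartitionOffsets = Map.of();
        final List<Map<String, Long>> sentCommits = new ArrayList<>();

        AutoCommitter(Positions positions) {
            this.positions = positions;
        }

        // At the start of a poll, everything consumed so far was returned by the previous
        // poll, so it is safe to snapshot it and, if the interval elapsed, commit it.
        void onPollEvent(boolean autoCommitDue) {
            latestPartitionOffsets = positions.allConsumed();
            if (autoCommitDue) {
                maybeAutoCommitAsync();
            }
        }

        // Commits the last snapshot; an empty snapshot is skipped.
        void maybeAutoCommitAsync() {
            if (!latestPartitionOffsets.isEmpty()) {
                sentCommits.add(latestPartitionOffsets);
            }
        }
    }

    public static void main(String[] args) {
        Positions positions = new Positions();
        AutoCommitter committer = new AutoCommitter(positions);

        positions.advance("topic-0", 10);   // poll #1 collects and returns records up to 10
        committer.onPollEvent(true);        // poll #2 starts: snapshot {topic-0=10} and commit it
        positions.advance("topic-0", 25);   // poll #2 collects records not yet returned to the app
        committer.maybeAutoCommitAsync();   // a commit before poll #3 still uses the old snapshot
        committer.onPollEvent(true);        // poll #3 starts: offset 25 is now safe to commit

        System.out.println(committer.sentCommits);
    }
}
```

Taking the snapshot when the PollEvent is processed mirrors the classic consumer's behaviour described in the comment. The start of the next poll is the first point at which the previously collected records are known to have been handed to the application.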