lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1951212890


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -475,21 +479,25 @@ public void testSyncCommitEventWithException() {
         doReturn(future).when(commitRequestManager).commitSync(any(), anyLong());
         processor.process(event);
 
-        verify(commitRequestManager).commitSync(Optional.empty(), 12345);
+        verify(commitRequestManager).commitSync(Collections.emptyMap(), 12345);
+        assertTrue(event.offsetsReady.isDone());
         assertFutureThrows(event.future(), IllegalStateException.class);
     }
 
     @ParameterizedTest
     @MethodSource("offsetsGenerator")
     public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
         AsyncCommitEvent event = new AsyncCommitEvent(offsets);
+        Map<TopicPartition, OffsetAndMetadata> actualOffsets = offsets.orElse(Collections.emptyMap());
 
         setupProcessor(true);
-        doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitAsync(offsets);
+        doReturn(CompletableFuture.completedFuture(actualOffsets)).when(commitRequestManager).commitAsync(actualOffsets);
+        doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
 
         processor.process(event);
-        verify(commitRequestManager).commitAsync(offsets);
+        verify(commitRequestManager).commitAsync(actualOffsets);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertTrue(event.offsetsReady.isDone());

Review Comment:
   Should we move this assertion up, right after the `commitAsync` call?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -206,8 +206,13 @@ public void process(ApplicationEvent event) {
     }
 
     private void process(final PollEvent event) {
+        // To ensure certain positions before reconciliation, we only trigger a full process of reconciling by PollEvent

Review Comment:
   The "ensure certain positions" wording doesn't read very well, I would say.
   Maybe we can simply say that we trigger a reconciliation that can safely
   commit offsets, if needed to revoke partitions, because we're processing the
   PollEvent before any new fetching starts in the app thread.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -846,6 +845,11 @@ private CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> commit(final C
         }
 
         applicationEventHandler.add(commitEvent);
+
+        // Wait for offsets to be ready if none were explicitly specified
+        // This blocks until the background thread retrieves allConsumed positions to commit

Review Comment:
   ```suggestion
           // This blocks until the background thread retrieves allConsumed positions to commit if none were explicitly specified.
   ```
   (only because both statements seem to say almost the same thing)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java:
##########
@@ -117,6 +121,12 @@ public String rackId() {
         return rackId;
     }
 
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        maybeReconcile(true);
+        return NetworkClientDelegate.PollResult.EMPTY;
+    }
+

Review Comment:
   Do we still need to override this? Now `maybeReconcile` only short-circuits
   on `canCommit` if auto-commit is enabled, so I expect the share consumer
   reconciliation to work as it used to with the default poll implementation
   (it will have canCommit=false but auto-commit disabled, so it should
   continue with the reconciliation).
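
A minimal sketch of the guard as described in this comment, not the PR's actual code. The class and method names are hypothetical; only `canCommit` and the auto-commit flag come from the discussion. It assumes reconciliation is deferred only when auto-commit is enabled and committing is not yet safe.

```java
// Hypothetical illustration of the short-circuit described above.
final class ReconcileGuardSketch {

    /**
     * Returns true when reconciliation may proceed.
     * Assumption: it is deferred only if a commit would be needed
     * (auto-commit enabled) but is not yet safe (canCommit == false).
     */
    static boolean shouldReconcile(boolean canCommit, boolean autoCommitEnabled) {
        boolean mustDefer = autoCommitEnabled && !canCommit;
        return !mustDefer;
    }
}
```

Under that reading, a share consumer (auto-commit disabled) proceeds even with `canCommit=false`, which is why the default poll implementation would be enough.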



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java:
##########
@@ -83,14 +83,16 @@ public ShareMembershipManager(LogContext logContext,
                                   SubscriptionState subscriptions,
                                   ConsumerMetadata metadata,
                                   Time time,
-                                  Metrics metrics) {
+                                  Metrics metrics,
+                                  boolean autoCommitEnabled) {

Review Comment:
   Hmm, I would say we shouldn't expose this in the ShareConsumer (it doesn't do
   auto-commits) and should simply assume it is false on ln 113 below. Makes sense?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java:
##########
@@ -100,13 +102,15 @@ public ShareMembershipManager(LogContext logContext,
                            SubscriptionState subscriptions,
                            ConsumerMetadata metadata,
                            Time time,
-                           ShareRebalanceMetricsManager metricsManager) {
+                           ShareRebalanceMetricsManager metricsManager,
+                           boolean autoCommitEnabled) {

Review Comment:
   ditto, remove?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -388,22 +387,21 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
      * exceptionally depending on the response. If the request fails with a retriable error, the
      * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
-        if (commitOffsets.isEmpty()) {
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(Map.of());
         }

Review Comment:
   We had an Optional before because we had two "empty" cases to differentiate:
   1. An empty Optional passed from the app, meaning we should use all
      consumed positions.
   2. An empty offsets map after 1, meaning we don't have offsets to commit in
      the end, so we can return early.
   
   Now the allConsumed logic (1) is higher up in the call stack, so the map
   seems enough here.
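
To illustrate the two "empty" cases, here is a rough sketch with hypothetical stand-in types (`String` keys and `Long` offsets instead of `TopicPartition` and `OffsetAndMetadata`). It is not the Kafka implementation; it only shows the Optional being resolved by the caller so that the commit method needs just a map.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-in, not the Kafka classes.
final class OffsetResolutionSketch {

    /** Case 1: an empty Optional means "use all consumed positions"; resolved by the caller. */
    static Map<String, Long> resolve(Optional<Map<String, Long>> requested,
                                     Supplier<Map<String, Long>> allConsumed) {
        return requested.orElseGet(allConsumed);
    }

    /** Case 2: an empty map means there is nothing to commit, so return early. */
    static String commit(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            return "skipped";
        }
        return "committed " + offsets.size();
    }
}
```

Note that an explicitly passed empty map is not replaced by the consumed positions; only an absent Optional is.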



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -220,8 +221,10 @@ public void testPollEnsureAutocommitSent() {
         assertPoll(0, commitRequestManager);
 
         commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.maybeAutoCommitAsync();

Review Comment:
   Yes please, good idea. I expect we should reuse that when auto-committing
   from the PollEvent and when processing the assign partitions call
   (AssignmentChangeEvent).



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -446,14 +447,17 @@ public void testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() {
     @MethodSource("offsetsGenerator")
     public void testSyncCommitEvent(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
         SyncCommitEvent event = new SyncCommitEvent(offsets, 12345);
+        Map<TopicPartition, OffsetAndMetadata> actualOffsets = offsets.orElse(Collections.emptyMap());
 
         setupProcessor(true);
-        doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets, 12345);
+        doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(actualOffsets, 12345);
+        doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
 
         processor.process(event);
-        verify(commitRequestManager).commitSync(offsets, 12345);
+        verify(commitRequestManager).commitSync(actualOffsets, 12345);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
-        assertEquals(offsets.orElse(Map.of()), committedOffsets);
+        assertTrue(event.offsetsReady.isDone());

Review Comment:
   This looks like enough to me, since I see you included it for all the
   combinations (commit sync/async, with/without offsets).
   
   I would only suggest we move the assertion up, right after the call to
   `commitSync` (it should complete without waiting for the commit response).
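
The ordering this comment relies on can be sketched as follows. This is a simplified, hypothetical event class: the `offsetsReady` name comes from the diff, while its type and the `process`/`result` members are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a commit event; not the Kafka class.
final class CommitEventSketch {
    // Completed as soon as the offsets to commit are known.
    final CompletableFuture<Void> offsetsReady = new CompletableFuture<>();
    // Completed only when the commit response arrives.
    final CompletableFuture<Map<String, Long>> result = new CompletableFuture<>();

    /** Background-thread side: the offsets are resolved, so signal readiness right away. */
    void process(Map<String, Long> offsets) {
        offsetsReady.complete(null);
        // The commit request is still in flight here; `result` stays pending.
    }
}
```

In this sketch an `offsetsReady.isDone()` assertion would already hold right after processing, before the commit future completes, which is the reason to place it directly after the commit call.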



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -267,7 +268,7 @@ private CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> requestAutoCom
     public void maybeAutoCommitAsync() {
         if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
             OffsetCommitRequestState requestState = createOffsetCommitRequest(
-                subscriptions.allConsumed(),
+                latestPartitionOffsets,

Review Comment:
   Exactly. We're saying that the regular auto-commit and the auto-commit to
   revoke can only be triggered from the PollEvent. I honestly don't see any
   other way to ensure the correctness of those commit operations (the only way
   to be sure that the records have been returned is to wait for the beginning
   of the next poll, which is what the classic consumer does).
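
A rough sketch of the idea, assuming positions are snapshotted when the PollEvent is processed and auto-commit uses only that snapshot. The `latestPartitionOffsets` name is taken from the diff; the class, the other members, and the `String`/`Long` types are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the Kafka implementation.
final class PollSnapshotSketch {
    private final Map<String, Long> livePositions = new HashMap<>();
    private Map<String, Long> latestPartitionOffsets = Map.of();

    /** Positions move forward as records are handed to the application. */
    void advancePosition(String partition, long nextOffset) {
        livePositions.put(partition, nextOffset);
    }

    /** At the start of a poll, everything from the previous poll has been returned. */
    void onPollEvent() {
        latestPartitionOffsets = Map.copyOf(livePositions);
    }

    /** Auto-commit uses the snapshot, never the live positions. */
    Map<String, Long> offsetsToAutoCommit() {
        return latestPartitionOffsets;
    }
}
```

With this shape, positions that advance after the snapshot are not committed until the next poll begins.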



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

