lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1937527654


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +418,16 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        final CompletableFuture<Boolean> b = event.future();
+        future.whenComplete((BiConsumer<? super Boolean, ? super Throwable>) (value, exception) -> {

Review Comment:
   do we really need to cast the `(value, exception)` here?
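   As far as I can tell the cast isn't needed: `whenComplete` takes a `BiConsumer<? super T, ? super Throwable>`, so the compiler infers the lambda's parameter types on its own. A minimal, self-contained sketch (the class and the `propagate` method are hypothetical stand-ins, not the actual processor code):

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteNoCast {

    // Hypothetical stand-in for the processor logic: mirror the outcome of
    // `source` into `target` without any explicit BiConsumer cast.
    public static void propagate(CompletableFuture<Boolean> source, CompletableFuture<Boolean> target) {
        // The lambda's target type comes from the whenComplete signature,
        // so `value` is a Boolean and `exception` is a Throwable.
        source.whenComplete((value, exception) -> {
            if (exception != null) {
                target.completeExceptionally(exception);
            } else {
                target.complete(value);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> source = new CompletableFuture<>();
        CompletableFuture<Boolean> target = new CompletableFuture<>();
        propagate(source, target);
        source.complete(true);
        System.out.println(target.join()); // prints "true"
    }
}
```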



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -402,22 +399,20 @@ public void testCommitAsyncWithEmptyOffsets() {
             (short) 1,
             Errors.NONE)));
 
-        verify(subscriptionState).allConsumed();
         verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
         assertTrue(future.isDone());
         Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
         assertEquals(offsets, commitOffsets);
     }
 
     @Test
-    public void testCommitAsyncWithEmptyAllConsumedOffsets() {
+    public void testCommitAsyncWithEmptyLatestPartitionOffsetsOffsets() {

Review Comment:
   I would say the test name still applies as it was (it's just that `allConsumed` is now taken once we know the records have been returned). The new one seems a bit confusing.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -389,7 +388,7 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
      * future will be completed with a {@link RetriableCommitFailedException}.
      */
     public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(this::latestPartitionOffsets);

Review Comment:
   uhm, I don't think we should change it here, and I believe it's actually dangerous. This is my reasoning (please correct me at any point): we have 2 kinds of commit operations in this manager:
   
   1. commits triggered automatically in the background (commit before rebalance and auto-commit on the interval)
   2. commits triggered by API calls (`commitSync` and `commitAsync`, which are only triggered by a `consumer.commitSync`/`commitAsync` call or `consumer.close`. Note that these could be for specific offsets or for `allConsumed`)
   My take is that with this PR we only need to change group 1, since those are the commits affected by the race condition with the fetch happening within a consumer poll iteration. Commits that happen automatically cannot take `allConsumed` from the subscription state, because we could be in the middle of a consumer poll iteration in the app thread (with positions advanced but the records not yet returned). So I agree with the changes to `maybeAutoCommitSyncBeforeRevocation` and `maybeAutoCommitAsync` to not use `subscriptionState.allConsumed`.
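   To make the race concrete, here is a toy model (hypothetical names throughout; plain `String`/`Long` maps stand in for `TopicPartition`/`OffsetAndMetadata`, and this is not how `SubscriptionState` is actually structured). Positions advance as soon as a fetch is collected, while the published offsets only catch up at the start of the next poll iteration, once the previous batch is known to have been returned:

```java
import java.util.HashMap;
import java.util.Map;

public class AutoCommitRaceSketch {

    // Simplified stand-in for the subscription state's positions.
    private final Map<String, Long> positions = new HashMap<>();
    // Simplified stand-in for the commit manager's latestPartitionOffsets.
    private Map<String, Long> latestPartitionOffsets = Map.of();

    // App thread, mid-iteration: records are collected and the position is
    // advanced, but the records have not been handed to the application yet.
    public void collectFetch(String partition, int recordCount) {
        positions.merge(partition, (long) recordCount, Long::sum);
    }

    // Start of the next poll iteration: the previous batch was returned,
    // so its positions are now safe to publish for background commits.
    public void startNextPoll() {
        latestPartitionOffsets = Map.copyOf(positions);
    }

    // What a background commit would see if it read allConsumed directly.
    public Map<String, Long> allConsumed() {
        return Map.copyOf(positions);
    }

    // What a background commit sees when it reads the published offsets.
    public Map<String, Long> autoCommitOffsets() {
        return latestPartitionOffsets;
    }

    public static void main(String[] args) {
        AutoCommitRaceSketch consumer = new AutoCommitRaceSketch();
        consumer.collectFetch("topic-0", 5);
        // An auto-commit firing now would commit records the app has not seen:
        System.out.println(consumer.allConsumed());       // {topic-0=5}
        System.out.println(consumer.autoCommitOffsets()); // {}
        consumer.startNextPoll();
        System.out.println(consumer.autoCommitOffsets()); // {topic-0=5}
    }
}
```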
   
   But I expect the commits in group 2 (triggered by consumer API calls) can and should use `allConsumed` from the `subscriptionState`, as they happen outside of the poll loop. First, they don't land in the race we're targeting; and most importantly, we cannot even ensure that the commit manager's `latestPartitionOffsets` holds the returned positions when they are called (this is the dangerous part).
   
   Example: a single call to poll that returns 5 records, followed by `commitSync()`/`commitAsync()`. If that commit takes `latestPartitionOffsets` from the commit request manager, wouldn't that be 0? `latestPartitionOffsets` is only updated on the next call to poll (if any), which makes sense, because when running a continuous poll loop that's the only point at which we can safely assume the records have been returned (on the previous iteration). Makes sense?
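   The split proposed above can be sketched as a selection rule keyed on what triggered the commit. This is only an illustration under assumptions: `CommitTrigger` and `offsetsToCommit` are hypothetical names, and plain maps stand in for the Kafka types. The `main` method replays the example of one poll returning 5 records followed by an API commit:

```java
import java.util.Map;
import java.util.Optional;

public class CommitOffsetSelectionSketch {

    // Hypothetical classification of the two kinds of commits.
    public enum CommitTrigger { BACKGROUND, API_CALL }

    // Background commits (auto-commit on interval, commit before revocation)
    // use the offsets published at the start of a poll iteration; commits from
    // commitSync/commitAsync/close use explicit offsets if given, else allConsumed.
    public static Map<String, Long> offsetsToCommit(
            CommitTrigger trigger,
            Optional<Map<String, Long>> explicitOffsets,
            Map<String, Long> allConsumed,
            Map<String, Long> latestPartitionOffsets) {
        if (trigger == CommitTrigger.BACKGROUND) {
            return latestPartitionOffsets;
        }
        return explicitOffsets.orElse(allConsumed);
    }

    public static void main(String[] args) {
        // One poll() returned 5 records from "topic-0"; no further poll() yet.
        Map<String, Long> allConsumed = Map.of("topic-0", 5L);
        Map<String, Long> latestPartitionOffsets = Map.of(); // only updated on the next poll()

        // commitSync()/commitAsync() without explicit offsets commits position 5:
        System.out.println(offsetsToCommit(
            CommitTrigger.API_CALL, Optional.empty(), allConsumed, latestPartitionOffsets)); // {topic-0=5}
        // Reading latestPartitionOffsets here instead would commit nothing:
        System.out.println(latestPartitionOffsets); // {}
    }
}
```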



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +636,14 @@ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAnd
         });
     }
 
+    public Map<TopicPartition, OffsetAndMetadata> latestPartitionOffsets() {
+        return latestPartitionOffsets;
+    }
+
+    public void setLatestPartitionOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        this.latestPartitionOffsets = Collections.unmodifiableMap(offsets);

Review Comment:
   should we add a debug log here so we know when we're updating the all-consumed positions to be committed? (I expect it will be helpful for tracking the flow if needed.)
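   Something along these lines, as a rough sketch only: the JDK's `System.Logger` is a stand-in here so the example is self-contained, whereas the client internals normally log through an SLF4J `Logger` (usually obtained from a `LogContext`), so the real change would presumably reuse the manager's existing `log` field. The class name, the `String`/`Long` map types, and the message text are all hypothetical:

```java
import java.util.Collections;
import java.util.Map;

public class LatestOffsetsHolder {

    // Stand-in logger; the real class would use its existing SLF4J logger.
    private static final System.Logger LOG =
        System.getLogger(LatestOffsetsHolder.class.getName());

    private Map<String, Long> latestPartitionOffsets = Collections.emptyMap();

    public Map<String, Long> latestPartitionOffsets() {
        return latestPartitionOffsets;
    }

    public void setLatestPartitionOffsets(Map<String, Long> offsets) {
        // Same wrapping as in the diff, plus a debug line to trace updates.
        this.latestPartitionOffsets = Collections.unmodifiableMap(offsets);
        LOG.log(System.Logger.Level.DEBUG,
            "Updating latest consumed positions to be committed: {0}", this.latestPartitionOffsets);
    }

    public static void main(String[] args) {
        LatestOffsetsHolder holder = new LatestOffsetsHolder();
        holder.setLatestPartitionOffsets(Map.of("topic-0", 5L));
        System.out.println(holder.latestPartitionOffsets()); // {topic-0=5}
    }
}
```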



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +418,16 @@ private void process(final ResetOffsetEvent event) {
      */
     private void process(final CheckAndUpdatePositionsEvent event) {
         CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        final CompletableFuture<Boolean> b = event.future();

Review Comment:
   is there a reason why we need this var (vs. using `event.future()` directly to complete below)?
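   For comparison, the removed line passed `event.future()` straight into a completion helper. A sketch of that shape, assuming a generic helper like the one below (its name mirrors the `complete(...)` call in the diff, but this signature and body are my assumption, not the actual Kafka helper):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class CompleteHelperSketch {

    // Hypothetical helper: returns a callback that mirrors an outcome into `target`.
    public static <T> BiConsumer<T, Throwable> complete(CompletableFuture<T> target) {
        return (value, exception) -> {
            if (exception != null) {
                target.completeExceptionally(exception);
            } else {
                target.complete(value);
            }
        };
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> eventFuture = new CompletableFuture<>(); // stands in for event.future()
        CompletableFuture<Boolean> updatePositions = CompletableFuture.completedFuture(true);
        // The event's future is used directly; no intermediate local variable.
        updatePositions.whenComplete(complete(eventFuture));
        System.out.println(eventFuture.join()); // prints "true"
    }
}
```

   Since `BiConsumer<Boolean, Throwable>` already satisfies `BiConsumer<? super Boolean, ? super Throwable>`, no cast is needed with this shape either.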



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -250,7 +251,10 @@ private void process(final SyncCommitEvent event) {
 
         try {
             CommitRequestManager manager = requestManagers.commitRequestManager.get();
-            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitSync(event.offsets(), event.deadlineMs());
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitSync(
+                event.offsets().orElseGet(subscriptions::allConsumed),
+                event.deadlineMs()
+            );

Review Comment:
   uhm, not sure about this one; it relates to the comment above.



-- 
This is an automated message from the Apache Git Service.