lianetm commented on code in PR #20324:
URL: https://github.com/apache/kafka/pull/20324#discussion_r2288400825


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitOffsetsSharedState.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class stores shared state needed by both the application thread ({@link AsyncKafkaConsumer}) and the
+ * background thread ({@link OffsetsRequestManager}) to determine if a costly call to check offsets can be skipped
+ * inside {@link Consumer#poll(Duration)}.
+ *
+ * <p/>
+ *
+ * This class compromises on the ideal of keeping the state only in the background thread. However, this class only
+ * relies on the {@link SubscriptionState} and {@link ConsumerMetadata} which are, unfortunately, already used
+ * sparingly in both the application and background threads. Both of those classes are heavily synchronized given
+ * their use by the {@link ClassicKafkaConsumer}, so their use in a multithreaded fashion is already established.
+ */
+public class CommitOffsetsSharedState implements MemberStateListener {

Review Comment:
   This component seems to be for `OffsetsState` (could be committed offsets or partition offsets), correct? If so, should we revise the name?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitOffsetsSharedState.java:
##########
+public class CommitOffsetsSharedState implements MemberStateListener {
+
+    /**
+     * To keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates.
+     */
+    private final AtomicBoolean cachedSubscriptionHasAllFetchPositions = new AtomicBoolean();
+
+    /**
+     * Exception that occurred while updating positions after the triggering event had already
+     * expired. It will be propagated and cleared on the next call to update fetch positions.
+     */
+    private final AtomicReference<Throwable> cachedUpdatePositionsException = new AtomicReference<>();
+    private final OffsetFetcherUtils offsetFetcherUtils;
+    private final SubscriptionState subscriptions;
+
+    CommitOffsetsSharedState(LogContext logContext,
+                             ConsumerMetadata metadata,
+                             SubscriptionState subscriptions,
+                             Time time,
+                             long retryBackoffMs,
+                             ApiVersions apiVersions) {
+        this.offsetFetcherUtils = new OffsetFetcherUtils(
+            logContext,
+            metadata,
+            subscriptions,
+            time,
+            retryBackoffMs,
+            apiVersions
+        );
+        this.subscriptions = subscriptions;
+    }
+
+    Throwable getAndClearCachedUpdatePositionsException() {
+        return cachedUpdatePositionsException.getAndSet(null);
+    }
+
+    void setCachedUpdatePositionsException(Throwable exception) {
+        cachedUpdatePositionsException.set(exception);
+    }
+
+    boolean subscriptionHasAllFetchPositions() {
+        return cachedSubscriptionHasAllFetchPositions.get();
+    }
+
+    void setSubscriptionHasAllFetchPositions(boolean value) {
+        cachedSubscriptionHasAllFetchPositions.set(value);
+    }
+
+    /**
+     * This method is used by {@code AsyncKafkaConsumer#updateFetchPositions()} to determine if it can skip
+     * the step of sending (and waiting for) a {@link CheckAndUpdatePositionsEvent}. {@code updateFetchPositions()}
+     * is in the critical path for the {@link Consumer#poll(Duration)}, and if the application thread can determine
+     * that it doesn't need to perform the {@link OffsetsRequestManager#updateFetchPositions(long)} call (via the
+     * {@link CheckAndUpdatePositionsEvent}), that is a big performance savings.
+     *
+     * <p/>
+     *
+     * This method performs similar checks to the start of {@link OffsetsRequestManager#updateFetchPositions(long)}:
+     *
+     * <ol>
+     *     <li>
+     *         Checks for previous exceptions during update positions
+     *         ({@code OffsetsRequestManager#cacheExceptionIfEventExpired()})
+     *     </li>
+     *     <li>
+     *         Checks that the previously cached version of {@link #cachedSubscriptionHasAllFetchPositions} is still
+     *         {@code true}. This covers a couple additional cases (like before first assignment), so although it's
+     *         not completely optimal, the fallback is we force the check which will block the application thread
+     *         while the background thread performs its checks which may result in the value being set back to
+     *         {@code true} before the application thread resumes.
+     *     </li>
+     *     <li>
+     *         Checks that there are no positions in the {@link SubscriptionState.FetchStates#AWAIT_VALIDATION}
+     *         state ({@link OffsetFetcherUtils#getPartitionsToValidate()})
+     *     </li>
+     *     <li>
+     *         Checks that all positions are in the {@link SubscriptionState.FetchStates#FETCHING} state
+     *         ({@link SubscriptionState#hasAllFetchPositions()})
+     *     </li>
+     * </ol>
+     *
+     * If the first check fails, an exception will be thrown. If any of the second, third, or fourth checks fail, this
+     * method will return {@code false}. Otherwise, this method will return {@code true}, which signals to the
+     * application thread that the {@link CheckAndUpdatePositionsEvent} can be skipped.
+     *
+     * @return true if all checks pass, false if either of the latter two checks fail
+     */
+    boolean canSkipUpdateFetchPositions() {
+        Throwable exception = getAndClearCachedUpdatePositionsException();
+
+        if (exception != null)
+            throw ConsumerUtils.maybeWrapAsKafkaException(exception);
+
+        // If the cached value is set and there are no partitions in the AWAIT_RESET, AWAIT_VALIDATION, or
+        // INITIALIZING states, it's ok to skip.
+        if (cachedSubscriptionHasAllFetchPositions.get() && offsetFetcherUtils.getPartitionsToValidate().isEmpty() && subscriptions.hasAllFetchPositions())

Review Comment:
   Why rely on the extra cache here (`cachedSubscriptionHasAllFetchPositions.get()`)? Wouldn't this mean that we could potentially reuse the cached value for `hasAllFetchPositions` across multiple poll iterations?
   
   I expect we only want to reuse it within the same poll iteration. That is what both consumers do before this PR: within a single poll iteration, they always check `subscriptions.hasAllFetchPositions` the first time it's needed, and then reuse the value to avoid checking it more than once in that same poll.
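   A minimal sketch of the per-iteration reuse described above, for illustration only. The class name `PollScopedFetchPositionsCheck`, the `startPollIteration()` hook, and the `BooleanSupplier` standing in for `subscriptions.hasAllFetchPositions()` are hypothetical; this is not code from either consumer.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: memoizes a hasAllFetchPositions() check for one poll iteration only.
class PollScopedFetchPositionsCheck {

    // Stands in for subscriptions::hasAllFetchPositions (an assumption for this sketch).
    private final BooleanSupplier hasAllFetchPositionsSource;
    // null means "not yet computed in the current poll iteration".
    private Boolean cachedForThisPoll = null;
    // Counts how many times the underlying check actually ran (for illustration only).
    private int evaluations = 0;

    PollScopedFetchPositionsCheck(BooleanSupplier hasAllFetchPositionsSource) {
        this.hasAllFetchPositionsSource = hasAllFetchPositionsSource;
    }

    // Called at the start of every poll iteration so no result leaks into the next one.
    void startPollIteration() {
        cachedForThisPoll = null;
    }

    // Runs the real check the first time it is needed in an iteration, then reuses the result.
    boolean hasAllFetchPositions() {
        if (cachedForThisPoll == null) {
            cachedForThisPoll = hasAllFetchPositionsSource.getAsBoolean();
            evaluations++;
        }
        return cachedForThisPoll;
    }

    int evaluations() {
        return evaluations;
    }

    public static void main(String[] args) {
        boolean[] positionsValid = {true};
        PollScopedFetchPositionsCheck check = new PollScopedFetchPositionsCheck(() -> positionsValid[0]);

        check.startPollIteration();
        check.hasAllFetchPositions();
        check.hasAllFetchPositions();                     // reused within the same iteration
        System.out.println(check.evaluations());          // prints 1

        positionsValid[0] = false;                        // e.g. a partition now awaits validation
        check.startPollIteration();                       // next poll: the cached value is dropped
        System.out.println(check.hasAllFetchPositions()); // prints false
        System.out.println(check.evaluations());          // prints 2
    }
}
```

   Because the cached value is cleared at the start of every iteration, it cannot carry over into a later `poll()`, which is the property the question above raises about `cachedSubscriptionHasAllFetchPositions`.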
   
   Also, I would expect part of this check to be redundant: in `offsetFetcherUtils.getPartitionsToValidate().isEmpty() && subscriptions.hasAllFetchPositions()`, the first condition is implied by the second (if there are partitions to validate, `hasAllFetchPositions` will be false, because `hasValidPosition` is false for the `AWAIT_VALIDATION` state).
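   The redundancy argument can be checked against a simplified, hypothetical model of the fetch states. `RedundantCheckDemo` and its helpers are illustrative names, not the real `SubscriptionState` or `OffsetFetcherUtils` code; only the `hasValidPosition` flag per state is modeled, following the reasoning above.

```java
import java.util.List;

// Simplified, hypothetical model of the reasoning above; not the real
// SubscriptionState or OffsetFetcherUtils code.
class RedundantCheckDemo {

    // Models the four fetch states by their hasValidPosition flag only.
    enum FetchState {
        INITIALIZING(false),
        FETCHING(true),
        AWAIT_RESET(false),
        AWAIT_VALIDATION(false); // has a position, but it is not valid yet

        final boolean hasValidPosition;

        FetchState(boolean hasValidPosition) {
            this.hasValidPosition = hasValidPosition;
        }
    }

    // Models hasAllFetchPositions(): every assigned partition has a valid position.
    static boolean hasAllFetchPositions(List<FetchState> assignment) {
        return assignment.stream().allMatch(s -> s.hasValidPosition);
    }

    // Models getPartitionsToValidate().isEmpty(): no partition is awaiting validation.
    static boolean noPartitionsToValidate(List<FetchState> assignment) {
        return assignment.stream().noneMatch(s -> s == FetchState.AWAIT_VALIDATION);
    }

    public static void main(String[] args) {
        boolean alwaysEqual = true;
        // Exhaustively compare the combined condition with hasAllFetchPositions() alone
        // for every two-partition assignment.
        for (FetchState a : FetchState.values()) {
            for (FetchState b : FetchState.values()) {
                List<FetchState> assignment = List.of(a, b);
                boolean combined = noPartitionsToValidate(assignment) && hasAllFetchPositions(assignment);
                if (combined != hasAllFetchPositions(assignment)) {
                    alwaysEqual = false;
                }
            }
        }
        System.out.println(alwaysEqual); // prints true
    }
}
```

   In this model the first condition never changes the result, because every assignment it rejects is also rejected by `hasAllFetchPositions`. Whether the real `getPartitionsToValidate()` applies additional filtering or has side effects is not covered by this sketch.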


