junrao commented on code in PR #17709:
URL: https://github.com/apache/kafka/pull/17709#discussion_r1833506455


##########
share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.storage.log.FetchParams;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * The ShareFetch class is used to store the fetch parameters for a share 
fetch request.
+ */
+public class ShareFetch {
+
+    /**
+     * The future that will be completed when the fetch is done.
+     */
+    private final CompletableFuture<Map<TopicIdPartition, PartitionData>> 
future;
+
+    /**
+     * The fetch parameters for the fetch request.
+     */
+    private final FetchParams fetchParams;
+    /**
+     * The group id of the share group that is fetching the records.
+     */
+    private final String groupId;
+    /**
+     * The member id of the share group that is fetching the records.
+     */
+    private final String memberId;
+    /**
+     * The maximum number of bytes that can be fetched for each partition.
+     */
+    private final Map<TopicIdPartition, Integer> partitionMaxBytes;
+    /**
+     * The maximum number of records that can be fetched for the request.
+     */
+    private final int maxFetchRecords;
+    /**
+     * The partitions that had an error during the fetch.
+     */
+    private volatile Map<TopicIdPartition, Throwable> erroneous;
+
+    public ShareFetch(
+        FetchParams fetchParams,
+        String groupId,
+        String memberId,
+        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Map<TopicIdPartition, Integer> partitionMaxBytes,
+        int maxFetchRecords
+    ) {
+        this.fetchParams = fetchParams;
+        this.groupId = groupId;
+        this.memberId = memberId;
+        this.future = future;
+        this.partitionMaxBytes = partitionMaxBytes;
+        this.maxFetchRecords = maxFetchRecords;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public String memberId() {
+        return memberId;
+    }
+
+    public Map<TopicIdPartition, Integer> partitionMaxBytes() {
+        return partitionMaxBytes;
+    }
+
+    public FetchParams fetchParams() {
+        return fetchParams;
+    }
+
+    public int maxFetchRecords() {
+        return maxFetchRecords;
+    }
+
+    /**
+     * Add an erroneous partition to the share fetch request. If the erroneous 
map is null, it will
+     * be created.
+     * <p>
+     * The method is synchronized to avoid concurrent modification of the 
erroneous map, as for
+     * some partitions the pending initialization can be on some threads and 
for other partitions
+     * share fetch request can be processed in purgatory.
+     *
+     * @param topicIdPartition The partition that had an error.
+     * @param throwable The error that occurred.
+     */
+    public synchronized void addErroneous(TopicIdPartition topicIdPartition, 
Throwable throwable) {
+        if (erroneous == null) {
+            erroneous = new HashMap<>();
+        }
+        erroneous.put(topicIdPartition, throwable);
+    }
+
+    /**
+     * Check if the share fetch request is completed.
+     * @return true if the request is completed, false otherwise.
+     */
+    public boolean isCompleted() {
+        return future.isDone();
+    }
+
+    /**
+     * Check if all the partitions in the request have errored.
+     * @return true if all the partitions in the request have errored, false 
otherwise.
+     */
+    public boolean isErrored() {
+        return erroneous != null && erroneous.size() == 
partitionMaxBytes().size();
+    }
+
+    /**
+     * May be complete the share fetch request with the given partition data. 
If the request is already completed,
+     * this method does nothing. If there are any erroneous partitions, they 
will be added to the response.
+     *
+     * @param partitionData The partition data to complete the fetch with.
+     */
+    public void maybeComplete(Map<TopicIdPartition, PartitionData> 
partitionData) {
+        if (isCompleted()) {
+            return;
+        }
+
+        Map<TopicIdPartition, PartitionData> response = new 
HashMap<>(partitionData);
+        // Add any erroneous partitions to the response.
+        addErroneousToResponse(response);
+        future.complete(response);
+    }
+
+    /**
+     * May be complete the share fetch request with the given exception for 
the topicIdPartitions.

Review Comment:
   May be => Maybe



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -247,13 +248,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
-    val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
-    val shareFetchResponseData = shareFetchResponse.data()
-    assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
-    assertEquals(1, shareFetchResponseData.responses().size())
-    assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
-    assertEquals(3, 
shareFetchResponseData.responses().get(0).partitions().size())
+    // For the multi partition fetch request, the response may not be 
available in the first attempt
+    // as the share partitions might not be initialized yet. So, we retry 
until we get the response.
+    var responses = Seq[ShareFetchResponseData.PartitionData]()
+    TestUtils.waitUntilTrue(() => {
+      val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+      assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      val partitionsCount = 
shareFetchResponseData.responses().get(0).partitions().size()
+      if (partitionsCount > 0) {
+        assertEquals(1, shareFetchResponseData.responses().size())
+        assertEquals(topicId, 
shareFetchResponseData.responses().get(0).topicId())
+        
shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => 
{
+          if (!partitionData.acquiredRecords().isEmpty) {
+            responses = responses :+ partitionData
+          }
+        })
+      }
+      responses.size == 3

Review Comment:
   Should we reset responses during retry? Ditto below.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -296,15 +296,15 @@ private boolean isMinBytesSatisfied(Map<TopicIdPartition, 
FetchRequest.Partition
                 }
             }
         }
-        return accumulatedSize >= shareFetchData.fetchParams().minBytes;
+        return accumulatedSize >= shareFetch.fetchParams().minBytes;
     }
 
     private LogOffsetMetadata 
endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
         Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
         LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
         // The FetchIsolation type that we use for share fetch is 
FetchIsolation.HIGH_WATERMARK. In the future, we can
         // extend it to support other FetchIsolation types.
-        FetchIsolation isolationType = shareFetchData.fetchParams().isolation;
+        FetchIsolation isolationType = shareFetch.fetchParams().isolation;

Review Comment:
   `replicaManager.getPartitionOrException` above throws an exception. Should 
we handle that and add it to shareFetch.erroneous?



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -658,25 +651,26 @@ private SharePartition 
getOrCreateSharePartition(SharePartitionKey sharePartitio
                 });
     }
 
-    private void maybeCompleteInitializationWithException(
+    private void handleInitializationException(
             SharePartitionKey sharePartitionKey,
-            CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+            ShareFetch shareFetch,
             Throwable throwable) {
         if (throwable instanceof LeaderNotAvailableException) {
             log.debug("The share partition with key {} is not initialized 
yet", sharePartitionKey);
-            // Do not process the fetch request for this partition as the 
leader is not initialized yet.
-            // The fetch request will be retried in the next poll.
-            // TODO: Add the request to delayed fetch purgatory.
+            // Skip any handling for this error as the share partition is 
still loading. The request

Review Comment:
   When do we get a LeaderNotAvailableException? My understanding is that the 
throwable is based on the error code from ReadShareGroupStateResponse and it 
doesn't seem to return LeaderNotAvailableException.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -565,74 +562,70 @@ private static String 
partitionsToLogString(Collection<TopicIdPartition> partiti
     }
 
     // Visible for testing.
-    void processShareFetch(ShareFetchData shareFetchData) {
-        if (shareFetchData.partitionMaxBytes().isEmpty()) {
+    void processShareFetch(ShareFetch shareFetch) {
+        if (shareFetch.partitionMaxBytes().isEmpty()) {
             // If there are no partitions to fetch then complete the future 
with an empty map.
-            shareFetchData.future().complete(Collections.emptyMap());
+            shareFetch.maybeComplete(Collections.emptyMap());
             return;
         }
 
-        // Initialize lazily, if required.
-        Map<TopicIdPartition, Throwable> erroneous = null;
         Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
-        for (TopicIdPartition topicIdPartition : 
shareFetchData.partitionMaxBytes().keySet()) {
+        for (TopicIdPartition topicIdPartition : 
shareFetch.partitionMaxBytes().keySet()) {
             SharePartitionKey sharePartitionKey = sharePartitionKey(
-                shareFetchData.groupId(),
+                shareFetch.groupId(),
                 topicIdPartition
             );
 
             SharePartition sharePartition;
             try {
                 sharePartition = getOrCreateSharePartition(sharePartitionKey);
             } catch (Exception e) {
-                // Complete the whole fetch request with an exception if there 
is an error processing.
-                // The exception currently can be thrown only if there is an 
error while initializing
-                // the share partition. But skip the processing for other 
share partitions in the request
-                // as this situation is not expected.
-                log.error("Error processing share fetch request", e);
-                if (erroneous == null) {
-                    erroneous = new HashMap<>();
-                }
-                erroneous.put(topicIdPartition, e);
+                log.debug("Error processing share fetch request", e);
+                shareFetch.addErroneous(topicIdPartition, e);
                 // Continue iteration for other partitions in the request.
                 continue;
             }
 
             // We add a key corresponding to each share partition in the 
request in the group so that when there are
             // acknowledgements/acquisition lock timeout etc., we have a way 
to perform checkAndComplete for all
             // such requests which are delayed because of lack of data to 
acquire for the share partition.
-            delayedShareFetchWatchKeys.add(new 
DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), 
topicIdPartition.partition()));
+            DelayedShareFetchKey delayedShareFetchKey = new 
DelayedShareFetchGroupKey(shareFetch.groupId(),
+                topicIdPartition.topicId(), topicIdPartition.partition());
+            delayedShareFetchWatchKeys.add(delayedShareFetchKey);
             // We add a key corresponding to each topic partition in the 
request so that when the HWM is updated
             // for any topic partition, we have a way to perform 
checkAndComplete for all such requests which are
             // delayed because of lack of data to acquire for the topic 
partition.
             delayedShareFetchWatchKeys.add(new 
DelayedShareFetchPartitionKey(topicIdPartition.topicId(), 
topicIdPartition.partition()));
-            // The share partition is initialized asynchronously, so we need 
to wait for it to be initialized.
-            // But if the share partition is already initialized, then the 
future will be completed immediately.
-            // Hence, it's safe to call the maybeInitialize method and then 
wait for the future to be completed.
-            // TopicPartitionData list will be populated only if the share 
partition is already initialized.
-            sharePartition.maybeInitialize().whenComplete((result, throwable) 
-> {
+
+            CompletableFuture<Void> initializationFuture = 
sharePartition.maybeInitialize();
+            final boolean initialized = initializationFuture.isDone();
+            initializationFuture.whenComplete((result, throwable) -> {
                 if (throwable != null) {
-                    // TODO: Complete error handling for initialization. We 
have to record the error
-                    //  for respective share partition as completing the full 
request might result in
-                    //  some acquired records to not being sent: 
https://issues.apache.org/jira/browse/KAFKA-17510
-                    
maybeCompleteInitializationWithException(sharePartitionKey, 
shareFetchData.future(), throwable);
+                    handleInitializationException(sharePartitionKey, 
shareFetch, throwable);

Review Comment:
   Since we are triggering delayedShareFetch below, do we need to handle the 
error for shareFetch here?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -198,7 +198,7 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
new LinkedHashMap<>();
 
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
-            int partitionMaxBytes = 
shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
+            int partitionMaxBytes = 
shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);

Review Comment:
   Should we skip erroneous partitions in `shareFetch`? Also, when calling 
`sharePartition.maybeAcquireFetchLock()`, if the partition is in ERROR or 
FENCED state, should we add the partition to erroneous partitions in 
`shareFetch` too?



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -507,19 +506,18 @@ public void acknowledgeSessionUpdate(String groupId, 
ShareRequestMetadata reqMet
      * but as we cannot determine which share partition errored out, we might 
remove all the share partitions
      * in the request.
      *
-     * @param groupId The group id in the share fetch request.
+     * @param shareFetch The share fetch request.
      * @param topicIdPartitions The topic-partitions in the replica read 
request.
-     * @param future The future to complete with the exception.
      * @param throwable The exception that occurred while fetching messages.
      */
     public void handleFetchException(
-        String groupId,
+        ShareFetch shareFetch,

Review Comment:
   Could we move this method to `DelayedShareFetch` and make it private since 
it's only called there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to