apoorvmittal10 commented on code in PR #17709:
URL: https://github.com/apache/kafka/pull/17709#discussion_r1834135963


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -565,74 +562,70 @@ private static String 
partitionsToLogString(Collection<TopicIdPartition> partiti
     }
 
     // Visible for testing.
-    void processShareFetch(ShareFetchData shareFetchData) {
-        if (shareFetchData.partitionMaxBytes().isEmpty()) {
+    void processShareFetch(ShareFetch shareFetch) {
+        if (shareFetch.partitionMaxBytes().isEmpty()) {
             // If there are no partitions to fetch then complete the future 
with an empty map.
-            shareFetchData.future().complete(Collections.emptyMap());
+            shareFetch.maybeComplete(Collections.emptyMap());
             return;
         }
 
-        // Initialize lazily, if required.
-        Map<TopicIdPartition, Throwable> erroneous = null;
         Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
-        for (TopicIdPartition topicIdPartition : 
shareFetchData.partitionMaxBytes().keySet()) {
+        for (TopicIdPartition topicIdPartition : 
shareFetch.partitionMaxBytes().keySet()) {
             SharePartitionKey sharePartitionKey = sharePartitionKey(
-                shareFetchData.groupId(),
+                shareFetch.groupId(),
                 topicIdPartition
             );
 
             SharePartition sharePartition;
             try {
                 sharePartition = getOrCreateSharePartition(sharePartitionKey);
             } catch (Exception e) {
-                // Complete the whole fetch request with an exception if there 
is an error processing.
-                // The exception currently can be thrown only if there is an 
error while initializing
-                // the share partition. But skip the processing for other 
share partitions in the request
-                // as this situation is not expected.
-                log.error("Error processing share fetch request", e);
-                if (erroneous == null) {
-                    erroneous = new HashMap<>();
-                }
-                erroneous.put(topicIdPartition, e);
+                log.debug("Error processing share fetch request", e);
+                shareFetch.addErroneous(topicIdPartition, e);
                 // Continue iteration for other partitions in the request.
                 continue;
             }
 
             // We add a key corresponding to each share partition in the 
request in the group so that when there are
             // acknowledgements/acquisition lock timeout etc., we have a way 
to perform checkAndComplete for all
             // such requests which are delayed because of lack of data to 
acquire for the share partition.
-            delayedShareFetchWatchKeys.add(new 
DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), 
topicIdPartition.partition()));
+            DelayedShareFetchKey delayedShareFetchKey = new 
DelayedShareFetchGroupKey(shareFetch.groupId(),
+                topicIdPartition.topicId(), topicIdPartition.partition());
+            delayedShareFetchWatchKeys.add(delayedShareFetchKey);
             // We add a key corresponding to each topic partition in the 
request so that when the HWM is updated
             // for any topic partition, we have a way to perform 
checkAndComplete for all such requests which are
             // delayed because of lack of data to acquire for the topic 
partition.
             delayedShareFetchWatchKeys.add(new 
DelayedShareFetchPartitionKey(topicIdPartition.topicId(), 
topicIdPartition.partition()));
-            // The share partition is initialized asynchronously, so we need 
to wait for it to be initialized.
-            // But if the share partition is already initialized, then the 
future will be completed immediately.
-            // Hence, it's safe to call the maybeInitialize method and then 
wait for the future to be completed.
-            // TopicPartitionData list will be populated only if the share 
partition is already initialized.
-            sharePartition.maybeInitialize().whenComplete((result, throwable) 
-> {
+
+            CompletableFuture<Void> initializationFuture = 
sharePartition.maybeInitialize();
+            final boolean initialized = initializationFuture.isDone();
+            initializationFuture.whenComplete((result, throwable) -> {
                 if (throwable != null) {
-                    // TODO: Complete error handling for initialization. We 
have to record the error
-                    //  for respective share partition as completing the full 
request might result in
-                    //  some acquired records to not being sent: 
https://issues.apache.org/jira/browse/KAFKA-17510
-                    
maybeCompleteInitializationWithException(sharePartitionKey, 
shareFetchData.future(), throwable);
+                    handleInitializationException(sharePartitionKey, 
shareFetch, throwable);

Review Comment:
   I think we should do that. If the code reaches here that means 
SharePartition is not yet initialized or in some error state, which means no 
fetch lock will be acquired in delay share fetch on respective SharePartition 
hence no further handling in DelayedShareFetch. However this code will handle 
that error appropriately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to