apoorvmittal10 commented on code in PR #17709: URL: https://github.com/apache/kafka/pull/17709#discussion_r1834135963
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -565,74 +562,70 @@ private static String partitionsToLogString(Collection<TopicIdPartition> partiti } // Visible for testing. - void processShareFetch(ShareFetchData shareFetchData) { - if (shareFetchData.partitionMaxBytes().isEmpty()) { + void processShareFetch(ShareFetch shareFetch) { + if (shareFetch.partitionMaxBytes().isEmpty()) { // If there are no partitions to fetch then complete the future with an empty map. - shareFetchData.future().complete(Collections.emptyMap()); + shareFetch.maybeComplete(Collections.emptyMap()); return; } - // Initialize lazily, if required. - Map<TopicIdPartition, Throwable> erroneous = null; Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>(); LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); - for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) { + for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) { SharePartitionKey sharePartitionKey = sharePartitionKey( - shareFetchData.groupId(), + shareFetch.groupId(), topicIdPartition ); SharePartition sharePartition; try { sharePartition = getOrCreateSharePartition(sharePartitionKey); } catch (Exception e) { - // Complete the whole fetch request with an exception if there is an error processing. - // The exception currently can be thrown only if there is an error while initializing - // the share partition. But skip the processing for other share partitions in the request - // as this situation is not expected. - log.error("Error processing share fetch request", e); - if (erroneous == null) { - erroneous = new HashMap<>(); - } - erroneous.put(topicIdPartition, e); + log.debug("Error processing share fetch request", e); + shareFetch.addErroneous(topicIdPartition, e); // Continue iteration for other partitions in the request. 
continue; } // We add a key corresponding to each share partition in the request in the group so that when there are // acknowledgements/acquisition lock timeout etc., we have a way to perform checkAndComplete for all // such requests which are delayed because of lack of data to acquire for the share partition. - delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())); + DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(shareFetch.groupId(), + topicIdPartition.topicId(), topicIdPartition.partition()); + delayedShareFetchWatchKeys.add(delayedShareFetchKey); // We add a key corresponding to each topic partition in the request so that when the HWM is updated // for any topic partition, we have a way to perform checkAndComplete for all such requests which are // delayed because of lack of data to acquire for the topic partition. delayedShareFetchWatchKeys.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition())); - // The share partition is initialized asynchronously, so we need to wait for it to be initialized. - // But if the share partition is already initialized, then the future will be completed immediately. - // Hence, it's safe to call the maybeInitialize method and then wait for the future to be completed. - // TopicPartitionData list will be populated only if the share partition is already initialized. - sharePartition.maybeInitialize().whenComplete((result, throwable) -> { + + CompletableFuture<Void> initializationFuture = sharePartition.maybeInitialize(); + final boolean initialized = initializationFuture.isDone(); + initializationFuture.whenComplete((result, throwable) -> { if (throwable != null) { - // TODO: Complete error handling for initialization. 
We have to record the error - // for respective share partition as completing the full request might result in - // some acquired records to not being sent: https://issues.apache.org/jira/browse/KAFKA-17510 - maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable); + handleInitializationException(sharePartitionKey, shareFetch, throwable); Review Comment: I think we should do that. If the code reaches here, it means the SharePartition is either not yet initialized or in some error state, so no fetch lock will be acquired on the respective SharePartition in the delayed share fetch, and hence there is no further handling in DelayedShareFetch. However, this code will handle that error appropriately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org