adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2043830522


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, 
logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local 
log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new 
LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = 
processRemoteFetchOrException(remoteStorageFetchInfoMap, 
replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for 
RemoteStorageFetchInfo could not be scheduled successfully else returns empty 
optional.
+     * @param remoteStorageFetchInfoMap - The topic partition to remote 
storage fetch info map
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse

Review Comment:
   sorry, I missed the parameter `replicaManagerReadResponse`. I've added it 
now. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to