adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2043841030


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        // Topic partitions that will be fetched from the local log rather than remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release the acquisition lock for topic partitions that were acquired but are not part of the remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if the remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();

Review Comment:
   hmm, `tryComplete()` should return true or false depending on whether we can move on to `forceComplete()`. If we decide not to call `maybeCompletePendingRemoteFetch()` here, we would have to change this line to `return false`, which I don't think is ideal behaviour. Your thoughts @junrao ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
