adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2044022042


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        // Topic partitions for which the fetch will be served from the local log rather than remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release the acquisition locks for the topic partitions that were acquired but are not part of the remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if the remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an optional containing an exception if the task for the RemoteStorageFetchInfo could not be scheduled
+     * successfully, else returns an empty optional.
+     * @param remoteStorageFetchInfoMap the topic partition to remote storage fetch info map
+     * @param replicaManagerReadResponse the topic partition to replica manager log read result map
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        // TODO: There is a limitation in remote storage fetch for consumer groups in that we can only perform a remote
+        //  fetch for a single topic partition in a fetch request. Since the share fetch logic is largely based on how
+        //  consumer groups work, we follow the same approach here. This limitation should be addressed as part of
+        //  KAFKA-19133, which should allow us to perform remote fetches for multiple topic partitions in a single
+        //  share fetch request.
+        TopicIdPartition remoteFetchTopicIdPartition = getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
+
+        LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<>();
+        remoteStorageFetchInfoMap.forEach((topicIdPartition, fetchInfo) -> fetchOffsetMetadataMap.put(
+            topicIdPartition,
+            replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
+        ));
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error, if any, encountered while scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
+        return Optional.empty();
+    }
+
+    /**
+     * This function returns the first topic partition for which we need to perform a remote storage fetch. All other
+     * partitions eligible for a remote storage fetch are removed from further processing and their fetch locks are released.
+     * @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
+     * @return the first topic partition for which we need to perform remote storage fetch
+     */
+    private TopicIdPartition getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
+        Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
+        TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey();
+        remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
+            if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
+                partitionsAcquired.remove(topicIdPartition);
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        return remoteFetchTopicIdPartition;
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
+            try {
+                if (fetchOffsetMetadata != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {

Review Comment:
   I based this check on this line in [DelayedRemoteFetch.scala](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L68). I am not sure why the check exists there (perhaps it was overly cautious given the surrounding exception handling), but I don't think we need it for our use case, so I have removed it.
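   For reference, a minimal sketch of the guard pattern that line refers to, rewritten as a hypothetical Java helper using this PR's field names (the upstream code is Scala and lives at the linked line, so this is an illustration, not the exact logic):

   ```java
   // Hypothetical helper, for illustration only: probe each partition whose
   // fetch offset metadata is known, mirroring the guard under discussion.
   private boolean anyPartitionInErrorState() {
       for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry :
               remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
           // The guard: partitions whose offset metadata is the UNKNOWN
           // sentinel are skipped rather than probed.
           if (entry.getValue() != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
               try {
                   // Throws for offline (case a), unknown (case b), or
                   // non-leader (case c) partitions, which lets the delayed
                   // operation complete early with an error.
                   replicaManager.getPartitionOrException(entry.getKey().topicPartition());
               } catch (Exception e) {
                   return true;
               }
           }
       }
       return false;
   }
   ```

   The sketch only shows where the UNKNOWN_OFFSET_METADATA sentinel check sits relative to the per-partition probes; whether that guard is needed here is exactly the question raised above.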


