adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2046239261


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133, which should help us perform
+                //  fetch for multiple remote fetch topic partitions in a single share fetch request.
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // Topic partitions for which fetch would not happen in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release the acquisition lock for the topic partitions that were acquired but were not part of the remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if the remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an optional containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully, else returns an empty optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions.
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
+        LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error, if any, in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));

Review Comment:
   We are handling errors in the same way as non-errors, i.e. via the instance variables `remoteStorageFetchException` and `remoteFetchOpt`. The flow has been explained in the above [comment](https://github.com/apache/kafka/pull/19437#discussion_r2046211183) for the error scenario. For the non-error scenario, we populate `remoteFetchOpt`, and once the request can be completed, we go to `onComplete`, check the value of `remoteFetchOpt`, and complete the remote share fetch request.
   In both cases the share fetch request completes from `onComplete` using the instance variable values.
   ```
   if (remoteStorageFetchException.isPresent()) {
       completeErroneousRemoteShareFetchRequest();
   } else if (remoteFetchOpt.isPresent()) {
       completeRemoteStorageShareFetchRequest();
   } else {
       completeLocalLogShareFetchRequest();
   }
   ```
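   The "record the exception in instance state, then rethrow, and let `onComplete` branch on what was recorded" pattern described here can be sketched in isolation. The sketch below is hypothetical: `OptionalErrorFlow`, `schedule`, `process`, and `recordedException` are stand-ins invented for illustration, not names from `DelayedShareFetch`; the real code schedules an async remote read rather than taking a boolean flag.
   ```java
   import java.util.Optional;
   import java.util.concurrent.RejectedExecutionException;

   // Hypothetical stand-alone sketch of returning Optional<Exception> from a
   // scheduling step so the caller can record the failure before rethrowing,
   // letting later completion logic branch on the recorded state.
   class OptionalErrorFlow {

       // Stand-in for the remoteStorageFetchException instance variable.
       static Optional<Exception> recordedException = Optional.empty();

       // Simulates scheduling the remote fetch task; `fail` stands in for the
       // executor rejecting the task with RejectedExecutionException.
       static Optional<Exception> schedule(boolean fail) {
           try {
               if (fail) {
                   throw new RejectedExecutionException("executor shutting down");
               }
               return Optional.empty(); // task scheduled, nothing to report
           } catch (RejectedExecutionException e) {
               return Optional.of(e);   // hand the error back to the caller
           }
       }

       // Mirrors the caller's shape: record the error in instance state, so
       // the completion path can pick the erroneous branch later.
       static String process(boolean fail) {
           Optional<Exception> exceptionOpt = schedule(fail);
           if (exceptionOpt.isPresent()) {
               recordedException = exceptionOpt;
               return "erroneous"; // onComplete would take the error branch
           }
           return "remote";        // onComplete would complete the remote fetch
       }

       public static void main(String[] args) {
           System.out.println(process(false)); // remote
           System.out.println(process(true));  // erroneous
       }
   }
   ```
   The point of the pattern is that the asynchronous completion path never needs the exception passed to it: it only inspects the instance state, which is why both the error and non-error scenarios converge on `onComplete`.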




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
