junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2049738720


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +592,308 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                topicPartitionRemoteFetchInfoOpt = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return topicPartitionRemoteFetchInfoOpt;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) throws Exception {
+        Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                nonRemoteFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
+        // them to the delayed actions queue.
+        releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
+        if (exceptionOpt.isPresent()) {
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            remoteStorageFetchException = Optional.of(e);
+            return Optional.of(e);

Review Comment:
   Could we just throw the exception and remove the return value?
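
   For illustration, the refactor might look like this sketch (identifiers are taken from the hunk above; the rename to `processRemoteFetch` and the body beyond what the hunk shows are assumptions, not the final implementation):
   
   ```java
   private void processRemoteFetch(TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo) throws Exception {
       TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
       RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
       CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
       try {
           Future<Void> remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
               remoteStorageFetchInfo,
               result -> {
                   remoteFetchResult.complete(result);
                   replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
               }
           );
       } catch (Exception e) {
           // Record the failure for onComplete, then propagate it instead of returning it.
           remoteStorageFetchException = Optional.of(e);
           throw e;
       }
       // remainder of the method as in the PR
   }
   ```
   
   The caller in `maybeProcessRemoteFetch` then drops the `Optional` round-trip:
   
   ```java
   processRemoteFetch(topicPartitionRemoteFetchInfo);
   // Check if remote fetch can be completed.
   return maybeCompletePendingRemoteFetch();
   ```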



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +592,308 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                topicPartitionRemoteFetchInfoOpt = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return topicPartitionRemoteFetchInfoOpt;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) throws Exception {
+        Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                nonRemoteFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
+        // them to the delayed actions queue.
+        releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
+        if (exceptionOpt.isPresent()) {
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            remoteStorageFetchException = Optional.of(e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            remoteStorageFetchException = Optional.of(e);

Review Comment:
   Should we just keep this and remove `catch (RejectedExecutionException e)`?
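
   If so, the consolidated handler might look like this sketch (assuming the warn log should apply to any scheduling failure, with `RejectedExecutionException` simply falling into the generic catch):
   
   ```java
   } catch (Exception e) {
       // Covers RejectedExecutionException from the remote storage reader thread pool
       // as well as any other failure while scheduling the remote fetch task.
       log.warn("Unable to fetch data from remote storage", e);
       remoteStorageFetchException = Optional.of(e);
       return Optional.of(e);
   }
   ```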



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,22 +323,39 @@ public boolean tryComplete() {
             return false;
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
-            releasePartitionLocks(topicPartitionData.keySet());
-            partitionsAcquired.clear();
-            partitionsAlreadyFetched.clear();
-            return forceComplete();
+            // In case we have a remote fetch exception, we have already released locks for partitions which have potential
+            // local log read. We do not release locks for partitions which have a remote storage read because we need to
+            // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch
+            // request might get the locks for those partitions without this one getting complete.
+            if (remoteStorageFetchException.isEmpty()) {
+                releasePartitionLocks(topicPartitionData.keySet());
+                partitionsAcquired.clear();
+                localPartitionsAlreadyFetched.clear();
+                return forceComplete();
+            } else {
+                boolean completedByMe = forceComplete();
+                // If invocation of forceComplete is not successful, then that means the request is already completed
+                // hence release the acquired locks. This can occur in case of remote storage fetch if there is a thread that

Review Comment:
   > then that means the request is already completed hence release the acquired locks.
   
   then that means the request is already completed and hence the acquired locks are already released. Ditto above.
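
   Applied to the hunk above, the comment might read (sketch; the original comment's trailing sentence is truncated in this hunk and would continue unchanged):
   
   ```java
   boolean completedByMe = forceComplete();
   // If the invocation of forceComplete is not successful, then that means the request
   // is already completed and hence the acquired locks are already released.
   ```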



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
