junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2052746487


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,22 +316,32 @@ public boolean tryComplete() {
             return false;
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
-            releasePartitionLocks(topicPartitionData.keySet());
-            partitionsAcquired.clear();
-            partitionsAlreadyFetched.clear();
-            return forceComplete();
+            // In case we have a remote fetch exception, we have already released locks for partitions which have potential
+            // local log read. We do not release locks for partitions which have a remote storage read because we need to
+            // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch
+            // request might get the locks for those partitions without this one getting complete.
+            if (remoteStorageFetchException.isEmpty()) {
+                releasePartitionLocks(topicPartitionData.keySet());
+                partitionsAcquired.clear();
+                localPartitionsAlreadyFetched.clear();
+                return forceComplete();

Review Comment:
   Should we just call `forceCompleteRequest()` here for consistency? We 
could optimize `releasePartitionLocksAndAddToActionQueue()` so that it doesn't 
add to the action queue if the acquired partition set is empty. 
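
   A rough sketch of the guard being suggested, not the actual `DelayedShareFetch` code. The class name, the `String` partition keys, the `released` set, and the list-backed action queue are all stand-ins assumed for illustration; only the method name comes from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for DelayedShareFetch; not the real Kafka class.
class ReleaseLocksSketch {
    // Assumption: the action queue is modelled as a plain list of runnables.
    final List<Runnable> actionQueue = new ArrayList<>();
    // Assumption: records which partition locks were released, for illustration only.
    final Set<String> released = new LinkedHashSet<>();

    void releasePartitionLocksAndAddToActionQueue(Set<String> acquiredPartitions) {
        // The suggested optimization: nothing was acquired, so there is nothing
        // to release and no reason to enqueue a follow-up action.
        if (acquiredPartitions.isEmpty()) {
            return;
        }
        released.addAll(acquiredPartitions);
        // Placeholder for the follow-up work the real method would enqueue.
        actionQueue.add(() -> { });
    }

    public static void main(String[] args) {
        ReleaseLocksSketch sketch = new ReleaseLocksSketch();
        sketch.releasePartitionLocksAndAddToActionQueue(Set.of());
        sketch.releasePartitionLocksAndAddToActionQueue(Set.of("topicA-0"));
        System.out.println("queued actions: " + sketch.actionQueue.size());
    }
}
```

   With a guard like this, the exception path could go through `forceCompleteRequest()` unconditionally, and an empty acquired set would simply be a no-op.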



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
