junrao commented on code in PR #19437: URL: https://github.com/apache/kafka/pull/19437#discussion_r2052746487
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -277,22 +316,32 @@ public boolean tryComplete() { return false; } catch (Exception e) { log.error("Error processing delayed share fetch request", e); - releasePartitionLocks(topicPartitionData.keySet()); - partitionsAcquired.clear(); - partitionsAlreadyFetched.clear(); - return forceComplete(); + // In case we have a remote fetch exception, we have already released locks for partitions which have potential + // local log read. We do not release locks for partitions which have a remote storage read because we need to + // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch + // request might get the locks for those partitions without this one getting complete. + if (remoteStorageFetchException.isEmpty()) { + releasePartitionLocks(topicPartitionData.keySet()); + partitionsAcquired.clear(); + localPartitionsAlreadyFetched.clear(); + return forceComplete(); Review Comment: Should we just call `forceCompleteRequest()` here to for consistency? We could optimize `releasePartitionLocksAndAddToActionQueue()` so that if doesn't add to the action queue if the acquired partition set is empty. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org