junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2052697705
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -252,13 +291,19 @@ public boolean tryComplete() {
             // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
             // those topic partitions.
             LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+            // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
+            Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
+
+            if (topicPartitionRemoteFetchInfoOpt.isPresent()) {
+                return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get());
+            }
             maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
             if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData,
                     partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
                 partitionsAcquired = topicPartitionData;
-                partitionsAlreadyFetched = replicaManagerReadResponse;
+                localPartitionsAlreadyFetched = replicaManagerReadResponse;
                 boolean completedByMe = forceComplete();
                 // If invocation of forceComplete is not successful, then that means the request is already completed
-                // hence release the acquired locks.
+                // hence the acquired locks are already released.

Review Comment:
   This comment is still not quite accurate. If `forceComplete()` returns false, it actually means that the locks haven't been released yet. How about the following?

   `If the delayed operation is completed by me, the acquired locks are already released in onComplete(). Otherwise, need to release the acquired locks.`

   Also, we call `forceComplete()` in 4 different places and there is quite a bit of duplicated code/comment.
   Could we introduce a private method and reuse the code in all those places?

##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,22 +322,39 @@ public boolean tryComplete() {
             return false;
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
-            releasePartitionLocks(topicPartitionData.keySet());
-            partitionsAcquired.clear();
-            partitionsAlreadyFetched.clear();
-            return forceComplete();
+            // In case we have a remote fetch exception, we have already released locks for partitions which have potential
+            // local log read. We do not release locks for partitions which have a remote storage read because we need to
+            // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch
+            // request might get the locks for those partitions without this one getting complete.
+            if (remoteStorageFetchException.isEmpty()) {
+                releasePartitionLocks(topicPartitionData.keySet());
+                partitionsAcquired.clear();
+                localPartitionsAlreadyFetched.clear();
+                return forceComplete();
+            } else {
+                boolean completedByMe = forceComplete();
+                // If invocation of forceComplete is not successful, then that means the request is already completed
+                // hence the acquired locks are already released. This can occur in case of remote storage fetch if there is a thread that
+                // completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
+                if (!completedByMe) {
+                    releasePartitionLocks(partitionsAcquired.keySet());

Review Comment:
   When we release the locks here, should we trigger a purgatory check, since some other pending delayed operations may be waiting on the locks?
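To illustrate the refactoring the reviewer is asking for, here is a minimal, self-contained sketch of a private helper that consolidates the repeated forceComplete()-then-maybe-release-locks pattern. All names in it (`DelayedShareFetchSketch`, `completeOrReleaseLocks`, the boolean flags) are hypothetical stand-ins for this illustration, not the actual Kafka classes; it only models the contract discussed in the review, namely that the thread whose `forceComplete()` returns true releases the locks in `onComplete()`, and any other thread must release them itself:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Kafka code: models a delayed operation where the
// completing caller releases partition locks in onComplete().
public class DelayedShareFetchSketch {
    final Map<String, Long> partitionsAcquired = new LinkedHashMap<>();
    boolean completed = false;
    boolean locksReleased = false;

    DelayedShareFetchSketch() {
        partitionsAcquired.put("topicA-0", 10L);
    }

    // Stand-in for DelayedOperation.forceComplete(): returns true only for the
    // one caller that transitions the operation to completed.
    boolean forceComplete() {
        if (completed) {
            return false;
        }
        completed = true;
        onComplete();
        return true;
    }

    void onComplete() {
        // The completing thread releases the acquired locks here.
        releasePartitionLocks(partitionsAcquired.keySet());
    }

    void releasePartitionLocks(Set<String> partitions) {
        locksReleased = true;
    }

    // The proposed shared helper (would be private in the real class): if the
    // operation was completed by me, onComplete() already released the locks;
    // otherwise, release the locks I am still holding.
    boolean completeOrReleaseLocks() {
        boolean completedByMe = forceComplete();
        if (!completedByMe) {
            releasePartitionLocks(partitionsAcquired.keySet());
        }
        return completedByMe;
    }

    public static void main(String[] args) {
        DelayedShareFetchSketch op = new DelayedShareFetchSketch();
        System.out.println(op.completeOrReleaseLocks()); // first caller completes the operation
        System.out.println(op.completeOrReleaseLocks()); // a later caller only releases locks
    }
}
```

Each of the four call sites in the PR could then collapse to a single `completeOrReleaseLocks()` call carrying the corrected comment once, instead of duplicating the `completedByMe` check and its explanation.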