junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2052697705


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -252,13 +291,19 @@ public boolean tryComplete() {
                 // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
                 // those topic partitions.
                 LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
+                Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
+
+                if (topicPartitionRemoteFetchInfoOpt.isPresent()) {
+                    return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get());
+                }
                 maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
                 if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
                     partitionsAcquired = topicPartitionData;
-                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    localPartitionsAlreadyFetched = replicaManagerReadResponse;
                     boolean completedByMe = forceComplete();
                     // If invocation of forceComplete is not successful, then that means the request is already completed
-                    // hence release the acquired locks.
+                    // hence the acquired locks are already released.

Review Comment:
   This comment is still not quite accurate. If `forceComplete()` returns false, it actually means that the locks haven't been released yet. How about the following?
   
   `If the delayed operation is completed by me, the acquired locks are already released in onComplete(). Otherwise, we need to release the acquired locks.`
   
   Also, we call `forceComplete()` in 4 different places, and there is quite a bit of duplicated code and commentary. Could we introduce a private method and reuse it in all those places?
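One way to act on the refactoring suggestion is sketched below under simplifying assumptions: partitions are plain strings, `SketchDelayedOperation` and `SketchShareFetch` are hypothetical stand-ins rather than Kafka classes, and `forceCompleteRequest` is an illustrative name for the proposed private method. It also encodes the corrected comment: when `forceComplete()` returns `true`, `onComplete()` has already released the locks; when it returns `false`, the caller still holds them and must release them itself.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for a delayed operation (not Kafka's DelayedOperation class).
abstract class SketchDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    // Only the thread that flips `completed` runs onComplete() and gets `true` back.
    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    abstract void onComplete();
}

// Hypothetical share fetch that holds partition locks; partitions are plain strings here.
class SketchShareFetch extends SketchDelayedOperation {
    final Set<String> lockedPartitions = new HashSet<>();
    Map<String, String> partitionsAcquired = new LinkedHashMap<>();

    void releasePartitionLocks(Set<String> partitions) {
        lockedPartitions.removeAll(partitions);
    }

    @Override
    void onComplete() {
        // The completing thread releases the locks recorded in partitionsAcquired.
        releasePartitionLocks(partitionsAcquired.keySet());
    }

    // Hypothetical helper that each of the forceComplete() call sites could share.
    boolean forceCompleteRequest(Map<String, String> acquired) {
        partitionsAcquired = acquired;
        boolean completedByMe = forceComplete();
        // If the delayed operation is completed by me, the acquired locks are already
        // released in onComplete(). Otherwise, we need to release the acquired locks.
        if (!completedByMe) {
            releasePartitionLocks(acquired.keySet());
        }
        return completedByMe;
    }
}
```

Centralizing the call this way keeps the lock-release rule and its comment in one place, so the four call sites cannot drift apart again.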



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,22 +322,39 @@ public boolean tryComplete() {
             return false;
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
-            releasePartitionLocks(topicPartitionData.keySet());
-            partitionsAcquired.clear();
-            partitionsAlreadyFetched.clear();
-            return forceComplete();
+            // In case we have a remote fetch exception, we have already released locks for partitions which have potential
+            // local log read. We do not release locks for partitions which have a remote storage read because we need to
+            // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch
+            // request might get the locks for those partitions without this one getting complete.
+            if (remoteStorageFetchException.isEmpty()) {
+                releasePartitionLocks(topicPartitionData.keySet());
+                partitionsAcquired.clear();
+                localPartitionsAlreadyFetched.clear();
+                return forceComplete();
+            } else {
+                boolean completedByMe = forceComplete();
+                // If invocation of forceComplete is not successful, then that means the request is already completed
+                // hence the acquired locks are already released. This can occur in case of remote storage fetch if there is a thread that
+                // completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
+                if (!completedByMe) {
+                    releasePartitionLocks(partitionsAcquired.keySet());

Review Comment:
   When we release the locks here, should we trigger a purgatory check since some other pending delayed operations may be waiting on the locks?
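A minimal sketch of what the suggested purgatory check could look like, assuming a toy lock table and a toy purgatory keyed by partition. `SketchLocks`, `SketchWaiter`, `SketchPurgatory`, and `releaseLocksAndCheckPurgatory` are all hypothetical names; real code would go through Kafka's own purgatory and partition-lock handling, whose exact calls are not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical lock table: a partition is locked while its name is in `held`.
class SketchLocks {
    private final Set<String> held = new HashSet<>();

    synchronized boolean tryAcquire(String partition) {
        return held.add(partition);
    }

    synchronized void release(String partition) {
        held.remove(partition);
    }
}

// Hypothetical waiting operation that completes once it can take its partition lock.
class SketchWaiter {
    private final SketchLocks locks;
    private final String partition;

    SketchWaiter(SketchLocks locks, String partition) {
        this.locks = locks;
        this.partition = partition;
    }

    boolean tryComplete() {
        return locks.tryAcquire(partition);
    }
}

// Hypothetical miniature purgatory keyed by partition (not Kafka's DelayedOperationPurgatory).
class SketchPurgatory {
    private final Map<String, List<SketchWaiter>> watchers = new HashMap<>();

    synchronized void watch(String partition, SketchWaiter op) {
        watchers.computeIfAbsent(partition, k -> new ArrayList<>()).add(op);
    }

    // Re-run tryComplete() on operations watching this key and drop those that complete.
    synchronized int checkAndComplete(String partition) {
        List<SketchWaiter> ops = watchers.get(partition);
        if (ops == null) {
            return 0;
        }
        int completed = 0;
        Iterator<SketchWaiter> it = ops.iterator();
        while (it.hasNext()) {
            if (it.next().tryComplete()) {
                it.remove();
                completed++;
            }
        }
        return completed;
    }
}

class SketchLockRelease {
    // Release the locks, then give waiters on those partitions an immediate retry
    // instead of leaving them parked until their own timeout fires.
    static int releaseLocksAndCheckPurgatory(Set<String> partitions, SketchLocks locks, SketchPurgatory purgatory) {
        for (String partition : partitions) {
            locks.release(partition);
        }
        int completed = 0;
        for (String partition : partitions) {
            completed += purgatory.checkAndComplete(partition);
        }
        return completed;
    }
}
```

The point of the check is that an operation blocked only on a lock otherwise has no trigger to retry until it expires. Whether the check should run inline, as here, or be deferred so it does not re-enter operation completion while the caller is still inside `tryComplete()` is a design choice this sketch does not settle.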



-- 
This is an automated message from the Apache Git Service.