adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2043830088


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -152,58 +176,68 @@ public void onExpiration() {
     @Override
     public void onComplete() {
         // We are utilizing lock so that onComplete doesn't do a dirty read 
for instance variables -
-        // partitionsAcquired and partitionsAlreadyFetched, since these 
variables can get updated in a different tryComplete thread.
+        // partitionsAcquired and localPartitionsAlreadyFetched, since these 
variables can get updated in a different tryComplete thread.
         lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, 
member {}, "
             + "topic partitions {}", shareFetch.groupId(), 
shareFetch.memberId(),
             partitionsAcquired.keySet());
 
         try {
-            LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
-            // tryComplete did not invoke forceComplete, so we need to check 
if we have any partitions to fetch.
-            if (partitionsAcquired.isEmpty()) {
-                topicPartitionData = acquirablePartitions();
-                // The TopicPartitionsAcquireTimeMs metric signifies the 
tension when acquiring the locks
-                // for the share partition, hence if no partitions are yet 
acquired by tryComplete,
-                // we record the metric here. Do not check if the request has 
successfully acquired any
-                // partitions now or not, as then the upper bound of request 
timeout shall be recorded
-                // for the metric.
-                updateAcquireElapsedTimeMetric();
+            if (remoteStorageFetchException.isPresent()) {
+                completeErroneousRemoteShareFetchRequest();
+            } else if (remoteFetchOpt.isPresent()) {
+                completeRemoteStorageShareFetchRequest();
             } else {
-                // tryComplete invoked forceComplete, so we can use the data 
from tryComplete.
-                topicPartitionData = partitionsAcquired;
+                completeLocalLogShareFetchRequest();
             }
-
-            if (topicPartitionData.isEmpty()) {
-                // No locks for share partitions could be acquired, so we 
complete the request with an empty response.
-                
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
-                shareFetch.maybeComplete(Map.of());
-                return;
-            } else {
-                // Update metric to record acquired to requested partitions.
-                double requestTopicToAcquired = (double) 
topicPartitionData.size() / shareFetch.topicIdPartitions().size();
-                
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) 
(requestTopicToAcquired * 100));
-            }
-            log.trace("Fetchable share partitions data: {} with groupId: {} 
fetch params: {}",
-                topicPartitionData, shareFetch.groupId(), 
shareFetch.fetchParams());
-
-            completeShareFetchRequest(topicPartitionData);
         } finally {
             lock.unlock();
         }
     }
 
-    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, 
Long> topicPartitionData) {
+    private void completeLocalLogShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
+        // tryComplete did not invoke forceComplete, so we need to check if we 
have any partitions to fetch.
+        if (partitionsAcquired.isEmpty()) {
+            topicPartitionData = acquirablePartitions(sharePartitions);
+            // The TopicPartitionsAcquireTimeMs metric signifies the tension 
when acquiring the locks
+            // for the share partition, hence if no partitions are yet 
acquired by tryComplete,
+            // we record the metric here. Do not check if the request has 
successfully acquired any
+            // partitions now or not, as then the upper bound of request 
timeout shall be recorded
+            // for the metric.
+            updateAcquireElapsedTimeMetric();
+        } else {
+            // tryComplete invoked forceComplete, so we can use the data from 
tryComplete.
+            topicPartitionData = partitionsAcquired;
+        }
+
+        if (topicPartitionData.isEmpty()) {
+            // No locks for share partitions could be acquired, so we complete 
the request with an empty response.
+            
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
+            shareFetch.maybeComplete(Map.of());
+            return;
+        } else {
+            // Update metric to record acquired to requested partitions.
+            double requestTopicToAcquired = (double) topicPartitionData.size() 
/ shareFetch.topicIdPartitions().size();
+            
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) 
(requestTopicToAcquired * 100));
+        }
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}",
+            topicPartitionData, shareFetch.groupId(), 
shareFetch.fetchParams());
+
+        processAcquiredTopicPartitions(topicPartitionData);
+    }
+
+    private void 
processAcquiredTopicPartitions(LinkedHashMap<TopicIdPartition, Long> 
topicPartitionData) {

Review Comment:
   I have renamed the function to 
`processAcquiredTopicPartitionsForLocalLogFetch`



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -242,8 +268,14 @@ private void 
completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> top
      */
     @Override
     public boolean tryComplete() {
-        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = 
acquirablePartitions();
+        // Check to see if the remote fetch is in flight. If there is an in 
flight remote fetch we want to resolve it first.
+        // This will help to prevent starving remote storage partitions and 
wasting the significant upfront work involved with
+        // kicking off a fetch from remote storage.

Review Comment:
   The point I was trying to convey was that a remote fetch takes more 
resources than a local log fetch. So, if we've identified that a given share 
fetch request has an in-flight remote storage fetch for some of its topic 
partitions, we should prioritize resolving it over a local log fetch. I guess 
the word "starve" makes it a little unclear. I've changed the comment to 
`Check to see if the remote fetch is in flight. If there is an in flight 
remote fetch we want to resolve it first.`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to