adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2043830088
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -152,58 +176,68 @@ public void onExpiration() {
     @Override
     public void onComplete() {
         // We are utilizing lock so that onComplete doesn't do a dirty read for instance variables -
-        // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
+        // partitionsAcquired and localPartitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
         lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, member {}, " +
             "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), partitionsAcquired.keySet());
         try {
-            LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
-            // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-            if (partitionsAcquired.isEmpty()) {
-                topicPartitionData = acquirablePartitions();
-                // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks
-                // for the share partition, hence if no partitions are yet acquired by tryComplete,
-                // we record the metric here. Do not check if the request has successfully acquired any
-                // partitions now or not, as then the upper bound of request timeout shall be recorded
-                // for the metric.
-                updateAcquireElapsedTimeMetric();
+            if (remoteStorageFetchException.isPresent()) {
+                completeErroneousRemoteShareFetchRequest();
+            } else if (remoteFetchOpt.isPresent()) {
+                completeRemoteStorageShareFetchRequest();
             } else {
-                // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-                topicPartitionData = partitionsAcquired;
+                completeLocalLogShareFetchRequest();
             }
-
-            if (topicPartitionData.isEmpty()) {
-                // No locks for share partitions could be acquired, so we complete the request with an empty response.
-                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
-                shareFetch.maybeComplete(Map.of());
-                return;
-            } else {
-                // Update metric to record acquired to requested partitions.
-                double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size();
-                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
-            }
-            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
-                topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
-
-            completeShareFetchRequest(topicPartitionData);
         } finally {
             lock.unlock();
         }
     }
 
-    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
+    private void completeLocalLogShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
+        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
+        if (partitionsAcquired.isEmpty()) {
+            topicPartitionData = acquirablePartitions(sharePartitions);
+            // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks
+            // for the share partition, hence if no partitions are yet acquired by tryComplete,
+            // we record the metric here. Do not check if the request has successfully acquired any
+            // partitions now or not, as then the upper bound of request timeout shall be recorded
+            // for the metric.
+            updateAcquireElapsedTimeMetric();
+        } else {
+            // tryComplete invoked forceComplete, so we can use the data from tryComplete.
+            topicPartitionData = partitionsAcquired;
+        }
+
+        if (topicPartitionData.isEmpty()) {
+            // No locks for share partitions could be acquired, so we complete the request with an empty response.
+            shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
+            shareFetch.maybeComplete(Map.of());
+            return;
+        } else {
+            // Update metric to record acquired to requested partitions.
+            double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size();
+            shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
+        }
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
+            topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
+
+        processAcquiredTopicPartitions(topicPartitionData);
+    }
+
+    private void processAcquiredTopicPartitions(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {

Review Comment:
   I have renamed the function to `processAcquiredTopicPartitionsForLocalLogFetch`.

##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -242,8 +268,14 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> top
      */
     @Override
     public boolean tryComplete() {
-        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions();
+        // Check to see if the remote fetch is in flight. If there is an in flight remote fetch we want to resolve it first.
+        // This will help to prevent starving remote storage partitions and wasting the significant upfront work involved with
+        // kicking off a fetch from remote storage.

Review Comment:
   The point I was trying to convey was that a remote fetch would take more resources than a local log fetch. So, if we've identified that a given share fetch request has an in-flight remote storage topic partition fetch, we should prioritize it over the local log fetch. I guess the word "starve" makes it a little unclear. I've changed the comment to `Check to see if the remote fetch is in flight. If there is an in flight remote fetch we want to resolve it first.`
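For readers following the thread, the ordering under discussion amounts to an early exit at the top of `tryComplete()` before any local-log acquisition is attempted. Below is a minimal, self-contained sketch of that control flow, not the PR's actual code: the helper `maybeCompletePendingRemoteFetch`, the placeholder bodies, and the `String`-keyed map standing in for `TopicIdPartition` are all illustrative assumptions; only the priority order (pending remote fetch first, then local-log acquisition) reflects the review discussion.

    import java.util.LinkedHashMap;
    import java.util.Optional;

    // Sketch of a delayed operation that resolves an in-flight remote fetch
    // before attempting local-log partition acquisition.
    class DelayedShareFetchSketch {
        // Stand-ins for the operation's state; the real class tracks
        // TopicIdPartition keys and a remote-fetch holder instead.
        private final Optional<Object> remoteFetchOpt;
        private final LinkedHashMap<String, Long> sharePartitions;

        DelayedShareFetchSketch(Optional<Object> remoteFetchOpt,
                                LinkedHashMap<String, Long> sharePartitions) {
            this.remoteFetchOpt = remoteFetchOpt;
            this.sharePartitions = sharePartitions;
        }

        boolean tryComplete() {
            // An in-flight remote fetch is resolved first: the broker has
            // already paid the upfront cost of kicking it off, so it takes
            // priority over starting a cheaper local-log pass.
            if (remoteFetchOpt.isPresent()) {
                return maybeCompletePendingRemoteFetch();
            }
            // Otherwise fall back to the local-log path: acquire partitions
            // and force-complete only if something was actually acquired.
            LinkedHashMap<String, Long> topicPartitionData = acquirablePartitions(sharePartitions);
            return !topicPartitionData.isEmpty() && forceComplete();
        }

        // Hypothetical helpers, simplified to keep the sketch self-contained.
        private boolean maybeCompletePendingRemoteFetch() {
            return false; // e.g. complete only once the remote read has finished
        }

        private LinkedHashMap<String, Long> acquirablePartitions(LinkedHashMap<String, Long> partitions) {
            return new LinkedHashMap<>(partitions); // placeholder for lock acquisition
        }

        private boolean forceComplete() {
            return true; // placeholder for DelayedOperation#forceComplete
        }
    }

The point of the early exit is visible in the control flow: once `remoteFetchOpt` is non-empty, the operation never re-enters the acquisition path, so the significant upfront work of kicking off the remote storage fetch is not thrown away in favor of a cheaper local pass.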