adixitconfluent commented on code in PR #19437: URL: https://github.com/apache/kafka/pull/19437#discussion_r2043841030
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -529,8 +575,315 @@ Lock lock() { return lock; } + // Visible for testing. + RemoteFetch remoteFetch() { + return remoteFetchOpt.orElse(null); + } + // Visible for testing. Meter expiredRequestMeter() { return expiredRequestMeter; } + + private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemoteStorageFetchInfo( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse + ) { + LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchMetadataMap = new LinkedHashMap<>(); + replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> { + if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) { + remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()); + partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition)); + localPartitionsAlreadyFetched.put(topicIdPartition, logReadResult); + } + }); + return remoteStorageFetchMetadataMap; + } + + private boolean maybeProcessRemoteFetch( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap, + LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse + ) throws Exception { + // topic partitions for which fetching would be happening from local log and not remote storage. + Set<TopicIdPartition> localFetchTopicPartitions = new LinkedHashSet<>(); + topicPartitionData.keySet().forEach(topicIdPartition -> { + if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) { + localFetchTopicPartitions.add(topicIdPartition); + } + }); + // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch. 
+ releasePartitionLocks(localFetchTopicPartitions); + Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse); + if (exceptionOpt.isPresent()) { + remoteStorageFetchException = exceptionOpt; + throw exceptionOpt.get(); + } + // Check if remote fetch can be completed. + return maybeCompletePendingRemoteFetch(); Review Comment: Hmm, `tryComplete()` should return true or false depending on whether we can move on to `forceComplete()`. If we decide not to call `maybeCompletePendingRemoteFetch()` here, we would have to change this line to `return false`, which I don't think is ideal behaviour. What are your thoughts, @junrao? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org