adixitconfluent commented on code in PR #19437: URL: https://github.com/apache/kafka/pull/19437#discussion_r2046211183
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -529,8 +582,300 @@ Lock lock() { return lock; } + // Visible for testing. + RemoteFetch remoteFetch() { + return remoteFetchOpt.orElse(null); + } + // Visible for testing. Meter expiredRequestMeter() { return expiredRequestMeter; } + + private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse + ) { + Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty(); + for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) { + TopicIdPartition topicIdPartition = entry.getKey(); + LogReadResult logReadResult = entry.getValue(); + if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) { + // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for + // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work, + // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform + // fetch for multiple remote fetch topic partition in a single share fetch request + remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get())); + partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition)); + break; + } + } + return remoteStorageFetchMetadataMap; + } + + private boolean maybeProcessRemoteFetch( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo, + LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse + ) throws Exception { + topicPartitionData.keySet().forEach(topicIdPartition -> { + // topic partitions for which fetch would not be happening in this share fetch request. + if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) { + // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch. + releasePartitionLocks(Set.of(topicIdPartition)); + } + }); + Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse); + if (exceptionOpt.isPresent()) { + remoteStorageFetchException = exceptionOpt; + throw exceptionOpt.get(); Review Comment: We are not throwing exception to the caller. We are force completing the request with the error. When we throw an error in `maybeProcessRemoteFetch`, we handle it in the [catch](https://github.com/apache/kafka/pull/19437/files#diff-d835cdc01e77905316584ce9e6e21a060cb3d36efa717d4b822b16744e4d713aR324) in `tryComplete`. This catch calls `forceComplete`. Once we enter `onComplete`, it goes to this [line](https://github.com/apache/kafka/pull/19437/files#diff-d835cdc01e77905316584ce9e6e21a060cb3d36efa717d4b822b16744e4d713aR197) and calls `completeErroneousRemoteShareFetchRequest`. The function `completeErroneousRemoteShareFetchRequest` completes the share fetch request with exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org