adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2046239261
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups in that we can only perform a remote fetch
+                // for a single topic partition in a fetch request. Since the fetch logic is largely based on how consumer groups work,
+                // we follow the same approach. This limitation should be addressed as part of KAFKA-19133, which should let us perform
+                // a fetch for multiple remote fetch topic partitions in a single share fetch request.
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // Topic partitions for which no fetch will happen in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release the acquisition lock for the topic partitions that were acquired but are not part of the remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if the remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an Optional containing an exception if a task for the RemoteStorageFetchInfo could not be scheduled
+     * successfully, else returns an empty Optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions.
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
+        LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error, if any, in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));

Review Comment:
   We are handling errors the same way as non-errors, i.e. via the instance variables `remoteStorageFetchException` and `remoteFetchOpt`. The flow has been explained in the above [comment](https://github.com/apache/kafka/pull/19437#discussion_r2046211183) for the error scenario. For the non-error scenario, we populate `remoteFetchOpt`, and once the request can be completed, we go to `onComplete`, check the value of `remoteFetchOpt`, and complete the remote share fetch request. In both cases, the share fetch request completes from `onComplete` using the instance variable values.
   ```
   if (remoteStorageFetchException.isPresent()) {
       completeErroneousRemoteShareFetchRequest();
   } else if (remoteFetchOpt.isPresent()) {
       completeRemoteStorageShareFetchRequest();
   } else {
       completeLocalLogShareFetchRequest();
   }
   ```
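   To make the flow concrete, here is a minimal, self-contained sketch of the scheduling and completion pattern described above. All names in it (`AsyncReader`, `ReadResult`, `tryScheduleRead`, the simplified `onComplete`) are hypothetical stand-ins for `RemoteLogManager.asyncRead` and the `DelayedShareFetch` instance variables, and it deliberately omits the purgatory re-trigger via `completeDelayedShareFetchRequest`:
   ```
   import java.util.Optional;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.RejectedExecutionException;
   import java.util.function.Consumer;

   public class RemoteFetchFlowSketch {

       record ReadResult(String data) { }

       // Stand-in for RemoteLogManager: runs a read on a thread pool and
       // invokes the supplied callback once the read finishes.
       static class AsyncReader {
           private final ExecutorService pool = Executors.newSingleThreadExecutor();

           void asyncRead(Consumer<ReadResult> callback) {
               pool.submit(() -> callback.accept(new ReadResult("remote-data")));
           }

           void shutdown() {
               pool.shutdown();
           }
       }

       // Instance state, mirroring remoteStorageFetchException and remoteFetchOpt.
       private Optional<Exception> fetchException = Optional.empty();
       private Optional<CompletableFuture<ReadResult>> fetchResultOpt = Optional.empty();

       // Mirrors processRemoteFetchOrException: a scheduling failure is returned
       // as a value instead of being thrown; on success the instance state is populated.
       Optional<Exception> tryScheduleRead(AsyncReader reader) {
           CompletableFuture<ReadResult> result = new CompletableFuture<>();
           try {
               // In Kafka the callback also re-triggers purgatory completion via
               // completeDelayedShareFetchRequest; here it only completes the future.
               reader.asyncRead(result::complete);
           } catch (RejectedExecutionException e) {
               return Optional.of(e); // the executor refused the task
           } catch (Exception e) {
               return Optional.of(e);
           }
           fetchResultOpt = Optional.of(result);
           return Optional.empty();
       }

       // Mirrors the onComplete dispatch quoted above: the same instance
       // variables pick the erroneous, remote, or local completion path.
       void onComplete() {
           if (fetchException.isPresent()) {
               System.out.println("erroneous completion: " + fetchException.get());
           } else if (fetchResultOpt.isPresent()) {
               System.out.println("remote completion: " + fetchResultOpt.get().join().data());
           } else {
               System.out.println("local log completion");
           }
       }

       public static void main(String[] args) {
           RemoteFetchFlowSketch sketch = new RemoteFetchFlowSketch();
           AsyncReader reader = new AsyncReader();
           sketch.tryScheduleRead(reader).ifPresent(e -> sketch.fetchException = Optional.of(e));
           sketch.onComplete(); // join() in the remote branch waits for the async read
           reader.shutdown();
       }
   }
   ```
   The design choice worth noting is that a scheduling failure is returned as a value rather than thrown, so the error and success paths converge on the same `onComplete` dispatch.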
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org