junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2049738720
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +592,308 @@ Lock lock() {
         return lock;
     }

+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups in that we can only perform a remote
+                // fetch for a single topic partition in a fetch request. Since the fetch logic is largely based on how consumer
+                // groups work, we follow the same logic here. This should be addressed as part of KAFKA-19133, which should let
+                // us fetch multiple remote-fetch topic partitions in a single share fetch request.
+                topicPartitionRemoteFetchInfoOpt = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return topicPartitionRemoteFetchInfoOpt;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) throws Exception {
+        Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // Topic partitions for which no fetch will happen in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                nonRemoteFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release the fetch lock for the topic partitions that were acquired but are not part of the remote fetch,
+        // and add them to the delayed actions queue.
+        releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
+        if (exceptionOpt.isPresent()) {
+            throw exceptionOpt.get();
+        }
+        // Check if the remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an optional containing an exception if a task for the RemoteStorageFetchInfo could not be
+     * scheduled successfully, else returns an empty optional.
+     *
+     * @param topicPartitionRemoteFetchInfo The remote storage fetch topic partition information.
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error, if any, in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            remoteStorageFetchException = Optional.of(e);
+            return Optional.of(e);

Review Comment:
   Could we just throw the exception and remove the return value?
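For illustration, a minimal sketch of the suggested shape, assuming the field and helper names shown in the diff above (the completion lambda and the storage of the fetch task are elided). The method records the failure for `onComplete` and rethrows, so the `Optional<Exception>` plumbing and the `if (exceptionOpt.isPresent()) throw exceptionOpt.get();` step in `maybeProcessRemoteFetch` both disappear:

```java
// Hypothetical sketch only, not the PR's code: throw instead of returning
// Optional<Exception>. maybeProcessRemoteFetch already declares `throws Exception`,
// so it can simply call processRemoteFetch(topicPartitionRemoteFetchInfo) directly.
private void processRemoteFetch(TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo) throws Exception {
    RemoteStorageFetchInfo remoteStorageFetchInfo =
        topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
    CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
    Future<Void> remoteFetchTask;
    try {
        remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
            remoteStorageFetchInfo,
            result -> { /* complete remoteFetchResult and the delayed request, as in the diff */ }
        );
    } catch (Exception e) {
        // Remember the failure so tryComplete/onComplete can clean up the remote-fetch
        // partitions, then propagate instead of returning the exception as a value.
        remoteStorageFetchException = Optional.of(e);
        throw e;
    }
    // ... wrap remoteFetchTask and remoteFetchResult into remoteFetchOpt, as in the diff ...
}
```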
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +592,308 @@ Lock lock() {
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error, if any, in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            remoteStorageFetchException = Optional.of(e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            remoteStorageFetchException = Optional.of(e);

Review Comment:
   Should we just keep this and remove `catch (RejectedExecutionException e)`?
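For context, `RejectedExecutionException` extends `RuntimeException`, so the second catch already subsumes the first; the only behavioral difference is the warn log. A sketch of the collapsed handler, assuming the log line is still wanted (`callback` stands in for the completion lambda shown in the diff):

```java
try {
    remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(remoteStorageFetchInfo, callback);
} catch (Exception e) {
    // Also covers RejectedExecutionException thrown when the remote-read task
    // cannot be scheduled, since it is a RuntimeException (and thus an Exception).
    log.warn("Unable to fetch data from remote storage", e);
    remoteStorageFetchException = Optional.of(e);
    return Optional.of(e);
}
```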
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,22 +323,39 @@ public boolean tryComplete() {
             return false;
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
-            releasePartitionLocks(topicPartitionData.keySet());
-            partitionsAcquired.clear();
-            partitionsAlreadyFetched.clear();
-            return forceComplete();
+            // In case we have a remote fetch exception, we have already released locks for partitions which have a potential
+            // local log read. We do not release locks for partitions which have a remote storage read, because we need to
+            // complete the share fetch request in onComplete; if we released the locks early here, some other DelayedShareFetch
+            // request might get the locks for those partitions without this one getting completed.
+            if (remoteStorageFetchException.isEmpty()) {
+                releasePartitionLocks(topicPartitionData.keySet());
+                partitionsAcquired.clear();
+                localPartitionsAlreadyFetched.clear();
+                return forceComplete();
+            } else {
+                boolean completedByMe = forceComplete();
+                // If invocation of forceComplete is not successful, then that means the request is already completed
+                // hence release the acquired locks. This can occur in case of remote storage fetch if there is a thread that

Review Comment:
   > then that means the request is already completed hence release the acquired locks.
   
   then that means the request is already completed and hence the acquired locks are already released. Ditto above.
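The suggested wording reflects an invariant of the purgatory framework: `forceComplete()` returns true only for the single caller that wins the completion race, and that caller's `onComplete()` is what releases the locks. A stripped-down sketch of that pattern, modeled on Kafka's `DelayedOperation` (class name hypothetical, timer cancellation elided):

```java
import java.util.concurrent.atomic.AtomicBoolean;

abstract class DelayedOperationSketch {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    // In DelayedShareFetch, onComplete() is where the partition locks get released.
    abstract void onComplete();

    // Exactly one caller wins the compare-and-set and runs onComplete(); a false
    // return means another thread already completed the request, so its locks
    // have already been released by that thread's onComplete().
    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }
}
```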
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org