divijvaidya commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1252976430


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {

Review Comment:
   As an optimization, we also want to remove the index entries for this partition from the RemoteIndexCache.


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -615,6 +661,9 @@ public void run() {
             try {
                 Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());
+                // CleanUp/delete deleted remote log segments
+                cleanupDeletedRemoteLogSegments();

Review Comment:
   We do not want to block the existing archival in order to clean up the deleted segments. Cleanup for deletion should be done asynchronously with RemotePartitionRemover, as specified by KIP-405.


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
     /**
      * Deletes the internal topic partition info if delete flag is set as true.
      *
-     * @param topicPartition topic partition to be stopped.
+     * @param topicPartitions topic partitions that needs to be stopped.
      * @param delete flag to indicate whether the given topic partitions to be deleted or not.
      */
-    public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+    public void stopPartitions(Set<TopicPartition> topicPartitions,
+                               boolean delete,
+                               BiConsumer<TopicPartition, Throwable> errorHandler) {
+        LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete);
+        Set<TopicIdPartition> topicIdPartitions = topicPartitions.stream()
+                .filter(topicIdByPartitionMap::containsKey)
+                .map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp))
+                .collect(Collectors.toSet());
+
+        topicIdPartitions.forEach(tpId -> {
+            try {
+                RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+                if (task != null) {
+                    LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+                    task.cancel();
+                }
+                if (delete) {
+                    LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
+                    deleteRemoteLogPartition(tpId);

Review Comment:
   Downside of the current implementation: `replicaManager.stopPartitions` has a dependency on RSM availability. If the remote storage is unavailable, stopPartitions will block.

   This downside can be removed if, instead of deleting the segments synchronously in the stopPartitions path, we leave it up to the RLMM plugin to perform the cleanup when RLMM.onStopPartitions() is called.

   The proposed alternative has the following benefits:
   1. Deletion can be implemented asynchronously by the RLMM plugin, since the plugin can choose to delete data from remote storage asynchronously when stopPartitions is called.
   2. Availability of remote storage does not impact the critical path, i.e. if the remote store is intermittently unavailable, stopPartitions can still go through.


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
     /**
      * Deletes the internal topic partition info if delete flag is set as true.
      *
-     * @param topicPartition topic partition to be stopped.
+     * @param topicPartitions topic partitions that needs to be stopped.
      * @param delete flag to indicate whether the given topic partitions to be deleted or not.
      */
-    public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+    public void stopPartitions(Set<TopicPartition> topicPartitions,

Review Comment:
   Please consider invalidating the segments belonging to this partition in the RemoteIndexCache when we are stopping partitions.


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -471,6 +472,9 @@ class ReplicaManager(val config: KafkaConfig,
           case HostedPartition.Online(partition) =>
             val currentLeaderEpoch = partition.getLeaderEpoch
             val requestLeaderEpoch = partitionState.leaderEpoch
+
+            if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete && remoteLogManager.isDefined)

Review Comment:
   Everywhere that you are using remoteLogManager.isDefined, please add a check for `log.remoteLogEnabled()`. The remote storage feature may be ON in the cluster, so remoteLogManager is present, while it is not turned ON for that particular topic/partition.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
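
Illustrative sketch for the stopPartitions comment above: one possible shape for keeping remote deletion off the critical path. This is not code from the PR and not a Kafka API. `AsyncPartitionStopper`, `remoteDeleter`, and the use of plain strings for partitions are all hypothetical stand-ins, chosen only to show that stopping a partition can return before any remote storage call is made.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch; none of these names exist in Kafka. */
final class AsyncPartitionStopper {
    // Single background thread that performs remote deletions.
    private final ExecutorService deletionExecutor = Executors.newSingleThreadExecutor();
    // Stand-in for whatever actually deletes remote data (assumption).
    private final Consumer<String> remoteDeleter;
    // Partitions whose local task has been stopped.
    private final List<String> stopped = new CopyOnWriteArrayList<>();

    AsyncPartitionStopper(Consumer<String> remoteDeleter) {
        this.remoteDeleter = remoteDeleter;
    }

    /** Stops each partition immediately; remote deletion is only queued. */
    void stopPartitions(Set<String> partitions, boolean delete) {
        for (String partition : partitions) {
            // Local, non-blocking part of the stop path.
            stopped.add(partition);
            if (delete) {
                // Remote part runs later, so an unavailable remote store
                // cannot block the caller.
                deletionExecutor.submit(() -> {
                    try {
                        remoteDeleter.accept(partition);
                    } catch (RuntimeException e) {
                        System.err.println("Remote delete failed for " + partition + ": " + e);
                    }
                });
            }
        }
    }

    List<String> stoppedPartitions() {
        return stopped;
    }

    /** Stops accepting work and waits for queued deletions; false on timeout or interrupt. */
    boolean shutdownAndAwait(long timeoutMs) {
        deletionExecutor.shutdown();
        try {
            return deletionExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch a failed remote delete is only logged. A real implementation would need retries or persisted deletion state, which is the part the review suggests leaving to RemotePartitionRemover or the RLMM plugin.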