junrao commented on code in PR #13947: URL: https://github.com/apache/kafka/pull/13947#discussion_r1270993960
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -119,6 +119,8 @@ public class RemoteLogManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class); private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader"; + private static final Set<Uuid> deletedTopicIds = ConcurrentHashMap.newKeySet(); Review Comment: It's weird to have this field as a static member. Could we make it an instance member? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException } } + public void cleanupDeletedRemoteLogSegments() { + if (isCancelled()) + return; + + Uuid topicId = topicIdPartition.topicId(); + if (deletedTopicIds.contains(topicId)) { + cleanupAllRemoteLogSegments(); + cancelRLMtask(); + deletedTopicIds.remove(topicId); + } + } + + private void cleanupAllRemoteLogSegments() { Review Comment: We could, but it adds its own overhead. ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException } } + public void cleanupDeletedRemoteLogSegments() { + if (isCancelled()) + return; + + Uuid topicId = topicIdPartition.topicId(); + if (deletedTopicIds.contains(topicId)) { + cleanupAllRemoteLogSegments(); + cancelRLMtask(); + deletedTopicIds.remove(topicId); + } + } + + private void cleanupAllRemoteLogSegments() { + if (!isLeader()) Review Comment: This would still be a bit weird. It's possible for a partition to be already leaderless (for example, no replica in ISR) when a topic is deleted. How is that case handled? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig, /** * Stop the given partitions. * - * @param partitionsToStop A map from a topic partition to a boolean indicating - * whether the partition should be deleted. + * @param partitionsToStop A map from a topic partition to a boolean indicating + * whether the partition should be deleted. + * @param partitionsMaybeToDeleteRemote A set of topic partitions that may need to delete + * remote segments. * - * @return A map from partitions to exceptions which occurred. - * If no errors occurred, the map will be empty. + * @return A map from partitions to exceptions which occurred. + * If no errors occurred, the map will be empty. */ protected def stopPartitions( - partitionsToStop: Map[TopicPartition, Boolean] + partitionsToStop: Map[TopicPartition, Boolean], Review Comment: Could you file a jira to track that? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org