junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1261801130
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
           // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
           // This could happen when topic is being deleted while broker is down and recovers.
           stoppedPartitions += topicPartition -> deletePartition
+          if (remoteLogManager.isDefined)
+            partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
   Hmm, it seems this case can occur during partition reassignment. In that case, we don't want to delete the remote data, right?

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
         }
     }

+    public void cleanupDeletedRemoteLogSegments() {

Review Comment:
   This process runs on every replica. So, won't we be deleting the same remote segment multiple times?

##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop A map from a topic partition to a boolean indicating
-   *                         whether the partition should be deleted.
+   * @param partitionsToStop              A map from a topic partition to a boolean indicating
+   *                                      whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote A set of topic partitions that may need to delete
+   *                                      remote segments.
    *
-   * @return A map from partitions to exceptions which occurred.
-   *         If no errors occurred, the map will be empty.
+   * @return A map from partitions to exceptions which occurred.
+   *         If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   It seems that the implementation doesn't support the KRaft controller. Do we plan to support that for the 3.6.0 release?
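To illustrate the "runs on every replica" concern above: one common way to avoid duplicate deletes is to gate remote-segment deletion on leadership, so only one replica issues the delete. The sketch below is hypothetical and not from this PR — the class `RemoteDeletionGuard` and its methods are invented for illustration; in Kafka itself, leadership state lives in `Partition`/`ReplicaManager`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch (not the PR's implementation): track which partitions this
// broker currently leads, and only allow remote-segment deletion from the leader,
// so multiple replicas do not each delete the same remote segment.
class RemoteDeletionGuard {
    private final Set<String> leaderPartitions = ConcurrentHashMap.newKeySet();

    void onBecomeLeader(String topicPartition) {
        leaderPartitions.add(topicPartition);
    }

    void onBecomeFollower(String topicPartition) {
        leaderPartitions.remove(topicPartition);
    }

    /** True only when this replica leads the partition and should delete remote data. */
    boolean shouldDeleteRemote(String topicPartition) {
        return leaderPartitions.contains(topicPartition);
    }
}
```

This only sketches the gating idea; it does not address leader changes racing with an in-flight delete, which a real implementation would need to handle.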
##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
           checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
         }
         addLogToBeDeleted(removedLog)
+        if (deleteRemote && removedLog.remoteLogEnabled())
+          RemoteLogManager.addTopicIdToBeDeleted(removedLog.topicIdAsJava)

Review Comment:
   LogManager only manages local data. So, it's a bit weird to have it call RemoteLogManager.

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
         }
     }

+    public void cleanupDeletedRemoteLogSegments() {
+        if (isCancelled())
+            return;
+
+        Uuid topicId = topicIdPartition.topicId();
+        if (deletedTopicIds.contains(topicId)) {
+            cleanupAllRemoteLogSegments();
+            cancelRLMtask();
+            deletedTopicIds.remove(topicId);
+        }
+    }
+
+    private void cleanupAllRemoteLogSegments() {

Review Comment:
   Since this runs asynchronously after topic deletion completes, if every replica is restarted before all remote segments are deleted, won't we lose the ability to remove the remaining remote segments for the deleted topics?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
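To illustrate the restart concern raised in the last comment: because `deletedTopicIds` appears to be in-memory state, a broker restart before cleanup finishes would forget which topics still have remote segments pending deletion. One conventional fix is to checkpoint the pending set to disk and reload it on startup. The sketch below is hypothetical and not from this PR — `PendingRemoteDeletions` and its file format are invented for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch (not the PR's implementation): persist the topic IDs whose
// remote segments still need deletion, one per line, so cleanup can resume after
// a broker restart instead of the pending set being lost with the process.
class PendingRemoteDeletions {
    private final Path checkpoint;
    private final Set<String> topicIds = new HashSet<>();

    PendingRemoteDeletions(Path checkpoint) {
        this.checkpoint = checkpoint;
        try {
            if (Files.exists(checkpoint))
                topicIds.addAll(Files.readAllLines(checkpoint)); // resume after restart
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    synchronized void add(String topicId) {
        topicIds.add(topicId);
        flush();
    }

    /** Call once all remote segments for the topic have been deleted. */
    synchronized void complete(String topicId) {
        topicIds.remove(topicId);
        flush();
    }

    synchronized Set<String> pending() {
        return new HashSet<>(topicIds);
    }

    private void flush() {
        try {
            Files.write(checkpoint, topicIds); // rewrite the checkpoint file
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A production version would also want atomic rename on flush and fencing against concurrent brokers, but the checkpoint-and-reload shape is the core of how Kafka's other recovery points (e.g. log-start-offset checkpoints) survive restarts.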