junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1261801130


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
               // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is down and recovers.
               stoppedPartitions += topicPartition -> deletePartition
+              if (remoteLogManager.isDefined)
+                partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
   Hmm, it seems this case can occur during partition reassignment. In that 
case, we don't want to delete the remote data, right?
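
   To make the distinction concrete, here is a minimal sketch of a policy that deletes remote data only when the partition is stopped because its topic was deleted. `RemoteDeletionPolicy`, `StopReason`, and `shouldDeleteRemote` are hypothetical names for illustration. They are not part of this PR or of Kafka, and how the broker would actually learn the stop reason is left open here.

```java
// Hypothetical sketch, not the PR's code: remote data is removed only when
// the partition stops because its topic was deleted.
final class RemoteDeletionPolicy {
    // Illustrative reasons a broker might stop hosting a partition.
    enum StopReason { TOPIC_DELETED, REASSIGNED_AWAY }

    static boolean shouldDeleteRemote(StopReason reason, boolean remoteLogEnabled) {
        // After a reassignment the remote segments still belong to a live
        // topic, so they must be kept.
        return remoteLogEnabled && reason == StopReason.TOPIC_DELETED;
    }
}
```

   With such a check, a replica that is merely reassigned away would drop its local log but leave the remote segments untouched.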



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {

Review Comment:
   This process runs on every replica. So, will we be deleting the same remote 
segment multiple times?
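
   One way to picture the concern is a cleanup that is gated so only a single designated replica issues the remote deletes. The sketch below is an assumption-laden illustration: `LeaderGatedCleanup` and `FakeRemoteStorage` are hypothetical, and the boolean flag stands in for whatever mechanism would pick the one replica responsible (after a topic is deleted there may be no leader to rely on).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the PR's code: only the designated replica issues
// remote deletes; every other replica skips the work.
final class LeaderGatedCleanup {
    // Stand-in for remote storage that records each delete request it receives.
    static final class FakeRemoteStorage {
        final List<String> deleteCalls = new ArrayList<>();

        void deleteSegment(String segmentId) {
            deleteCalls.add(segmentId);
        }
    }

    private final boolean designated;
    private final FakeRemoteStorage storage;

    LeaderGatedCleanup(boolean designated, FakeRemoteStorage storage) {
        this.designated = designated;
        this.storage = storage;
    }

    // Returns the number of delete requests this replica issued.
    int cleanup(List<String> segmentIds) {
        if (!designated) {
            return 0; // non-designated replicas leave remote data alone
        }
        for (String segmentId : segmentIds) {
            storage.deleteSegment(segmentId);
        }
        return segmentIds.size();
    }
}
```

   With three replicas sharing one storage and only one of them designated, each segment receives exactly one delete request instead of three.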



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop                A map from a topic partition to a boolean indicating
+   *                                        whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote   A set of topic partitions that may need to delete
+   *                                        remote segments.
    *
-   * @return                    A map from partitions to exceptions which occurred.
-   *                            If no errors occurred, the map will be empty.
+   * @return                                A map from partitions to exceptions which occurred.
+   *                                        If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   It seems that the implementation doesn't support the KRaft controller. Do we 
plan to support that for the 3.6.0 release?



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
           checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
         }
         addLogToBeDeleted(removedLog)
+        if (deleteRemote && removedLog.remoteLogEnabled())
+          RemoteLogManager.addTopicIdToBeDeleted(removedLog.topicIdAsJava)

Review Comment:
   LogManager only manages local data. So, it's a bit weird to have it call 
RemoteLogManager.
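
   One possible way to keep the layers separate is for the local log layer to report that remote cleanup may be needed through a callback supplied by its owner. This is only a sketch under assumptions: `LocalLogDeleter` and its callback are hypothetical names, and topic IDs are plain strings here rather than Kafka's `Uuid`.

```java
import java.util.function.Consumer;

// Hypothetical sketch, not the PR's code: the local log layer signals that
// remote cleanup may be needed via a callback, so it never references the
// remote log manager directly.
final class LocalLogDeleter {
    private final Consumer<String> remoteCleanupRequested;

    LocalLogDeleter(Consumer<String> remoteCleanupRequested) {
        this.remoteCleanupRequested = remoteCleanupRequested;
    }

    void deleteLog(String topicId, boolean deleteRemote, boolean remoteLogEnabled) {
        // Local segment and index deletion would happen here.
        if (deleteRemote && remoteLogEnabled) {
            // The caller that wired the callback decides what to do remotely.
            remoteCleanupRequested.accept(topicId);
        }
    }
}
```

   The component that owns both managers would wire the callback, which keeps the decision about remote data outside the local log code.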



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
   Since this runs asynchronously after topic deletion completes, if every 
replica is restarted before all remote segments are deleted, we will never be 
able to remove the remaining remote segments for the deleted topics?
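
   The risk comes from tracking pending deletions only in memory. A minimal sketch of the alternative, assuming a simple local file is an acceptable place to record them, is shown below. `PendingRemoteDeletions` is a hypothetical class, topic IDs are plain strings, and the write is not atomic, so a real implementation would need something sturdier.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch, not the PR's code: pending remote deletions are
// written to a file so a restarted broker can resume unfinished cleanup.
final class PendingRemoteDeletions {
    private final Path file;
    private final Set<String> pending = new LinkedHashSet<>();

    PendingRemoteDeletions(Path file) {
        this.file = file;
        try {
            // On startup, reload whatever was still pending before the restart.
            if (Files.exists(file)) {
                for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                    if (!line.isEmpty()) {
                        pending.add(line);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void markPending(String topicId) {
        if (pending.add(topicId)) {
            flush();
        }
    }

    void markDone(String topicId) {
        if (pending.remove(topicId)) {
            flush();
        }
    }

    Set<String> pending() {
        return new LinkedHashSet<>(pending);
    }

    // Rewrites the whole file; simple but not atomic.
    private void flush() {
        try {
            Files.write(file, pending, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   A second instance created over the same file after a simulated restart would still see any topic that was marked pending but never marked done.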


