junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1270993960


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -119,6 +119,8 @@ public class RemoteLogManager implements Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
     private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
 
+    private static final Set<Uuid> deletedTopicIds = ConcurrentHashMap.newKeySet();

Review Comment:
   It's weird to have this field as a static member. Could we make it an instance member?
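
   For illustration only, a minimal sketch of what the instance-member variant might look like. This is not the PR's code: `RemoteLogManagerSketch` and its three methods are hypothetical names, and `java.util.UUID` stands in for Kafka's `Uuid` so the snippet compiles on its own. The point is that with a non-static field, each manager instance tracks its own set of deleted topic IDs instead of sharing one set across the JVM.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RemoteLogManager; only the field placement matters here.
class RemoteLogManagerSketch {
    // Instance member (not static): state is scoped to this manager instance.
    // java.util.UUID is an assumption standing in for org.apache.kafka.common.Uuid.
    private final Set<UUID> deletedTopicIds = ConcurrentHashMap.newKeySet();

    // Record that a topic was deleted so its remote segments can be cleaned up later.
    void markTopicDeleted(UUID topicId) {
        deletedTopicIds.add(topicId);
    }

    // Check whether cleanup is still pending for the given topic.
    boolean isTopicDeleted(UUID topicId) {
        return deletedTopicIds.contains(topicId);
    }

    // Forget the topic once cleanup has finished.
    void clearDeletedTopic(UUID topicId) {
        deletedTopicIds.remove(topicId);
    }
}
```

   With this shape, two managers in the same JVM (for example, in tests that start several brokers) would not see each other's deleted topics.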



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
   We could, but it adds its own overhead.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {
+            if (!isLeader())

Review Comment:
   This would still be a bit weird. It's possible for a partition to already be leaderless (for example, no replica in ISR) when a topic is deleted. How is that case handled?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop                A map from a topic partition to a boolean indicating
+   *                                        whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote   A set of topic partitions that may need to delete
+   *                                        remote segments.
   *
-   * @return                    A map from partitions to exceptions which occurred.
-   *                            If no errors occurred, the map will be empty.
+   * @return                                A map from partitions to exceptions which occurred.
+   *                                        If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   Could you file a jira to track that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
