gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570371982


##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {

Review Comment:
   > I guess cleaner.abortAndPauseCleaning is added because we call resumeCleaning later, and it will cause error if we don't call abortAndPauseCleaning here?
   
   That's correct if the cleaning operation hasn't started. Cleaning is scheduled on a separate thread, so we cannot be sure whether the `inProgress` map in `LogCleanerManager` has a key for the given `topicPartition` at the time we iterate over these logs.
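
To make the ordering concern concrete, here is a toy model of the pause bookkeeping. It is only a sketch under stated assumptions, not the actual `LogCleanerManager`: `ToyCleanerManager`, `startCleaning`, `state`, the `String` partition key, and the two-state `CleaningState` are all hypothetical, and the abort handshake with the cleaner thread is collapsed into a single step. It shows that a resume fails when the partition has no paused entry, and succeeds once a pause has been recorded, whether or not cleaning had already started.

```scala
import scala.collection.mutable

// Hypothetical, simplified states; the real manager tracks more than this.
sealed trait CleaningState
case object InProgress extends CleaningState
final case class Paused(count: Int) extends CleaningState

// Toy stand-in for the bookkeeping discussed above (NOT Kafka's class).
final class ToyCleanerManager {
  private val inProgress = mutable.Map.empty[String, CleaningState]

  // What a cleaner thread would do when it picks up a partition (assumed).
  def startCleaning(tp: String): Unit = synchronized {
    if (!inProgress.contains(tp)) inProgress(tp) = InProgress
  }

  // Always leaves a Paused entry behind, whether or not cleaning had started.
  def abortAndPauseCleaning(tp: String): Unit = synchronized {
    inProgress.get(tp) match {
      case Some(Paused(n)) => inProgress(tp) = Paused(n + 1)
      case _               => inProgress(tp) = Paused(1)
    }
  }

  // Refuses to resume a partition that has no Paused entry.
  def resumeCleaning(tp: String): Unit = synchronized {
    inProgress.get(tp) match {
      case Some(Paused(1)) => inProgress -= tp
      case Some(Paused(n)) => inProgress(tp) = Paused(n - 1)
      case _ =>
        throw new IllegalStateException(s"$tp cannot be resumed: not paused")
    }
  }

  // Read-only view of the current entry, for inspection.
  def state(tp: String): Option[CleaningState] = synchronized(inProgress.get(tp))
}
```

In this model, calling `resumeCleaning` on a fresh partition throws, while pausing first makes the later resume safe regardless of whether `startCleaning` ran in between. That is the reason to pause unconditionally before iterating rather than trying to inspect the map.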



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
