gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570371982

##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {

Review Comment:
> I guess cleaner.abortAndPauseCleaning is added because we call resumeCleaning later, and it will cause error if we don't call abortAndPauseCleaning here?

That's correct if the cleaning operation hasn't started. The cleaning operation is scheduled on a separate thread, so we cannot be sure whether the `inProgress` map in `LogCleanerManager` has a key for the given topicPartition at the time we iterate over these logs.
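
To make the ordering concern concrete, here is a minimal sketch of why pausing first keeps the later resume safe in both cases. It is a toy model, not the actual `LogCleanerManager`: `ToyCleanerManager`, `CleaningState`, `startCleaning` and `state` are hypothetical names, partitions are plain strings, and an in-progress clean is assumed to move straight to paused without the waiting the real cleaner would do.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the cleaner's per-partition state.
sealed trait CleaningState
case object InProgress extends CleaningState
final case class Paused(count: Int) extends CleaningState

final class ToyCleanerManager {
  // Assumed shape: a partition has an entry only while it is being cleaned or paused.
  private val inProgress = mutable.Map.empty[String, CleaningState]

  // Simulates the cleaner thread picking up a partition at some unknown time.
  def startCleaning(tp: String): Unit = synchronized {
    if (!inProgress.contains(tp)) inProgress(tp) = InProgress
  }

  // Works whether or not cleaning has started: the partition always ends up paused.
  def abortAndPauseCleaning(tp: String): Unit = synchronized {
    inProgress.get(tp) match {
      case None | Some(InProgress) => inProgress(tp) = Paused(1)
      case Some(Paused(n))         => inProgress(tp) = Paused(n + 1)
    }
  }

  // Only valid from a paused state; anything else is treated as a caller error.
  def resumeCleaning(tp: String): Unit = synchronized {
    inProgress.get(tp) match {
      case Some(Paused(1)) => inProgress.remove(tp)
      case Some(Paused(n)) => inProgress(tp) = Paused(n - 1)
      case other =>
        throw new IllegalStateException(s"$tp cannot be resumed from state $other")
    }
  }

  // Read-only view of the current state, used here only for inspection.
  def state(tp: String): Option[CleaningState] = synchronized(inProgress.get(tp))
}
```

In this model, calling `resumeCleaning` on a partition the cleaner thread has not yet touched throws, while `abortAndPauseCleaning` followed by `resumeCleaning` succeeds regardless of whether `startCleaning` ran first. That is the property the unconditional pause is meant to give us.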