kowshik commented on a change in pull request #10478: URL: https://github.com/apache/kafka/pull/10478#discussion_r611450200
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2586,11 +1999,15 @@ object Log {
             logDirFailureChannel: LogDirFailureChannel,
             lastShutdownClean: Boolean = true,
             topicId: Option[Uuid],
-            keepPartitionMetadataFile: Boolean): Log = {
+            keepPartitionMetadataFile: Boolean = true): Log = {
+    // create the log directory if it doesn't exist
+    Files.createDirectories(dir.toPath)
     val topicPartition = Log.parseTopicPartitionName(dir)
-    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
-    new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,
-      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel, lastShutdownClean, topicId, keepPartitionMetadataFile)
+    val logLoader = new LogLoader(dir, topicPartition, config, scheduler, time, logDirFailureChannel)

Review comment:
I tried this, but I ran into the following problems:

1. A test intercepts the recovery logic by overriding `def recoverLog` in `LogLoader`, e.g. `LogLoaderTest.testLogRecoveryIsCalledUponBrokerCrash`.
2. Some tests inject a custom `ProducerStateManager`, e.g. `LogLoaderTest.testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown`.
3. Some tests inject a custom `LogSegments`, e.g. `LogLoaderTest.testProducerSnapshotsRecoveryAfterUncleanShutdownCurrentMessageFormat`.

I couldn't simplify (1) because the test logic seems to be very specific and dependent on the details of the interception. When `LogLoader` is a class, such interception is easy. If we make it an object instead, interception becomes trickier. If we can simplify (1) to avoid the interception, then (2) and (3) can be simplified as well, because we can always pass the objects into `load()` as parameters. But I haven't found a better way to handle (1) so far.
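To make the class-versus-object trade-off concrete, here is a minimal sketch. The names `SketchLogLoader`, `SketchLogLoaderObject`, and `LoadedOffsets`, and the placeholder offsets, are hypothetical stand-ins rather than the actual Kafka code; only `recoverLog` and `load()` echo names used in the comment above.

```scala
// Hypothetical, heavily simplified stand-ins; NOT the real Kafka classes.
final case class LoadedOffsets(logStartOffset: Long, recoveryPoint: Long)

// Class variant: the recovery step is an overridable method,
// so a test can intercept it with an anonymous subclass.
class SketchLogLoader(lastShutdownClean: Boolean) {
  // Placeholder for the real recovery logic (assumption).
  protected def recoverLog(): Long = 0L

  def load(): LoadedOffsets = {
    // Recovery only runs after an unclean shutdown in this sketch.
    val recoveryPoint = if (lastShutdownClean) 100L else recoverLog()
    LoadedOffsets(logStartOffset = 0L, recoveryPoint = recoveryPoint)
  }
}

// Object variant: nothing can be overridden, so anything a test wants
// to replace has to be passed into load() as a parameter.
object SketchLogLoaderObject {
  def load(lastShutdownClean: Boolean,
           recoverLog: () => Long = () => 0L): LoadedOffsets = {
    val recoveryPoint = if (lastShutdownClean) 100L else recoverLog()
    LoadedOffsets(logStartOffset = 0L, recoveryPoint = recoveryPoint)
  }
}

// How a test might intercept recovery in the class variant.
object InterceptionExample {
  def recoveryWasCalled(): Boolean = {
    var called = false
    val loader = new SketchLogLoader(lastShutdownClean = false) {
      override protected def recoverLog(): Long = { called = true; 42L }
    }
    loader.load()
    called
  }
}
```

In the object variant, every interception point becomes part of the `load()` signature, which works well for injected collaborators as in (2) and (3) but is more awkward for an internal step like (1).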