kowshik commented on a change in pull request #10896: URL: https://github.com/apache/kafka/pull/10896#discussion_r655788102
########## File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala ########## @@ -1535,4 +1534,122 @@ class LogLoaderTest { assertTrue(onlySegment.lazyOffsetIndex.file.exists()) assertTrue(onlySegment.lazyTimeIndex.file.exists()) } + + @Test + def testCorruptedLogRecoveryDoesNotDeleteProducerStateSnapshotsPostRecovery(): Unit = { + val logConfig = LogTestUtils.createLogConfig() + var log = createLog(logDir, logConfig) + // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-] + // |---> logStartOffset |---> active segment (empty) + // |---> logEndOffset + for (i <- 0 until 9) { + val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) + log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) + log.roll() + } + assertEquals(10, log.logSegments.size) + assertEquals(0, log.logStartOffset) + assertEquals(9, log.activeSegment.baseOffset) + assertEquals(9, log.logEndOffset) + for (offset <- 1 until 10) { + val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset) + assertTrue(snapshotFileBeforeDeletion.isDefined) + assertTrue(snapshotFileBeforeDeletion.get.file.exists) + } + + // Increment the log start offset to 4. + // After this step, the segments should be: + // |---> logStartOffset + // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-] + // |---> active segment (empty) + // |---> logEndOffset + val newLogStartOffset = 4 + log.updateHighWatermark(log.logEndOffset) + log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion) + assertEquals(4, log.logStartOffset) + assertEquals(9, log.logEndOffset) + + // Append garbage to a segment at baseOffset 1, which is below the current log start offset 4. 
+ // After this step, the segments should be: + // + // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-] + // | |---> logStartOffset |---> active segment (empty) + // | |---> logEndOffset + // corrupt record inserted + // + val segmentToForceTruncation = log.logSegments.take(2).last + assertEquals(1, segmentToForceTruncation.baseOffset) + val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file)) + bw.write("corruptRecord") + bw.close() + log.close() + + // Reopen the log. This will do the following: + // - Truncate the segment above to which we appended garbage and will schedule async deletion of all other + // segments from base offsets 2 to 9. + // - The remaining segments at base offsets 0 and 1 will be lower than the current logStartOffset 4. + // This will cause async deletion of both remaining segments. Finally a single, active segment is created + // starting at logStartOffset 4. + // + // Expected segments after the log is opened again: + // [4-] + // |---> active segment (empty) + // |---> logStartOffset + // |---> logEndOffset + log = createLog(logDir, logConfig, logStartOffset = newLogStartOffset, lastShutdownClean = false) + assertEquals(1, log.logSegments.size) + assertEquals(4, log.logStartOffset) + assertEquals(4, log.activeSegment.baseOffset) + assertEquals(4, log.logEndOffset) + + // Append records, roll the segments and check that the producer state snapshots are defined. 
+ // The expected segments and producer state snapshots, after the appends are complete and segments are rolled, + // is as shown below: + // [4-4], [5-5], [6-6], [7-7], [8-8], [9-] + // | | | | | |---> active segment (empty) + // | | | | | |---> logEndOffset + // | | | | | | + // | |------.------.------.------.-----> producer state snapshot files are DEFINED for each offset in: [5-9] + // |----------------------------------------> logStartOffset + for (i <- 0 until 5) { + val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) + log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) + log.roll() + } + assertEquals(9, log.activeSegment.baseOffset) + assertEquals(9, log.logEndOffset) + for (offset <- 5 until 10) { + val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset) + assertTrue(snapshotFileBeforeDeletion.isDefined) + assertTrue(snapshotFileBeforeDeletion.get.file.exists) + } + + val offsetsWithSnapshotFiles = (1 until 5) Review comment: Hmm, this check needs to be split into two steps, because the files are not actually deleted until `mockTime.sleep(logConfig.fileDeleteDelayMs)` is called later. 1. Here, we need to check that the entries for offsets `1-4` have been removed from the `ProducerStateManager` map, and that their snapshot files have only been renamed with the `.deleted` suffix (not yet deleted). 2. Below, after the call to `mockTime.sleep(logConfig.fileDeleteDelayMs)`, we need to additionally check that the snapshot files for offsets `1-4` have been deleted as expected. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org