rohitshekhar29 commented on a change in pull request #8936: URL: https://github.com/apache/kafka/pull/8936#discussion_r451260203
##########
File path: core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
##########
@@ -148,5 +148,39 @@ class TimeIndexTest {
     idx.close()
   }
-}
+  /**
+   * In certain cases, index files fail to have their pre-allocated zero bytes trimmed from the tail
+   * when a new segment is rolled. This causes a silent failure at the next startup, where every
+   * retention window is treated as breached and data is purged whether or not the window was
+   * actually exceeded. See KAFKA-10207.
+   */
+  @Test
+  def testLoadingUntrimmedIndex(): Unit = {
+    // A larger index size must be specified, or the starting offset will round down,
+    // preventing this issue from being reproduced. Configs default to 10 MB.
+    val max1MbEntryCount = 100000
+    // Create a file that will exist on disk and be removed when we are done
+    val file = nonExistantTempFile()
+    file.deleteOnExit()
+    // Create an index that can hold up to 100000 entries (12 bytes each), about 1 MB
+    var idx2 = new TimeIndex(file, baseOffset = 0, max1MbEntryCount * 12)
+    // Append fewer than the maximum number of entries, leaving zero bytes padding the end
+    for (i <- 1 until max1MbEntryCount)
+      idx2.maybeAppend(i, i)
+
+    idx2.flush()
+    // jvm 1.8.0_191 fails to always flush a shrinking resize to a zfs disk
+    // jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws
+    // Explicitly call the close handler to unmap the file and prevent the index from being trimmed on close
+    idx2.closeHandler()
+    // The next read of the index data from disk will have 12 zero bytes at the tail; when reading,
+    // the buffer position starts at the end and the index is assumed to be full because it was
+    // supposed to be trimmed before the last save.
+    idx2 = new TimeIndex(file, baseOffset = 0, maxIndexSize = max1MbEntryCount * 12)
+    // This index should fail the sanity check, as the last timestamp in the file is 0, which is
+    // less than the first timestamp in the file.
+    intercept[CorruptIndexException](idx2.sanityCheck())
+    idx2.close()
+  }
+}

Review comment:
   @Johnny-Malizia does it make sense to strengthen the existing sanity check? For example, for `OffsetIndex` the sanity check could also be extended to require a non-zero, strictly increasing file position once there is more than one entry:
   ```scala
   override def sanityCheck(): Unit = {
     if (_entries != 0 && _lastOffset < baseOffset)
   ```
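   A minimal sketch of what that strengthened `OffsetIndex.sanityCheck` might look like, not the change that was merged. It assumes the `_entries`, `entrySize`, `length`, `mmap`, and `physical(buffer, n)` members from `OffsetIndex`/`AbstractIndex`; the loop at the end is the hypothetical addition:
   ```scala
   // Hypothetical sketch (not merged code): OffsetIndex.sanityCheck extended per the review
   // suggestion. The first two checks mirror the existing ones; the loop is the proposed addition.
   override def sanityCheck(): Unit = {
     if (_entries != 0 && _lastOffset < baseOffset)
       throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath}) has " +
         s"non-zero size but the last offset is ${_lastOffset} and the base offset is $baseOffset.")
     if (length % entrySize != 0)
       throw new CorruptIndexException(s"Index file ${file.getAbsolutePath} is corrupt, found $length bytes " +
         s"which is neither positive nor a multiple of $entrySize.")
     // Proposed addition: once there is more than one entry, each entry's physical file position
     // must strictly increase. Untrimmed zero padding decodes to position 0 and fails this check,
     // even though its relative offset of 0 slips past the _lastOffset < baseOffset test above.
     for (n <- 1 until _entries) {
       if (physical(mmap, n) <= physical(mmap, n - 1))
         throw new CorruptIndexException(s"Index file ${file.getAbsolutePath} is corrupt, entry $n " +
           s"has file position ${physical(mmap, n)}, not greater than the previous entry's position.")
     }
   }
   ```
   A full scan makes the startup sanity check O(entries) per index file; comparing only the last two entries would still catch an untrimmed zero tail at constant cost.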