Xiaobing Fang created KAFKA-17436:
-------------------------------------

             Summary: Follower index dump mismatch
                 Key: KAFKA-17436
                 URL: https://issues.apache.org/jira/browse/KAFKA-17436
             Project: Kafka
          Issue Type: Bug
          Components: core
            Reporter: Xiaobing Fang
            Assignee: Xiaobing Fang
         Attachments: image-2024-08-28-14-42-44-152.png
When a follower appends fetched data, a single index entry is written for the whole append: the starting physical position and the last offset of the fetched data. When the index is dumped, however, each entry is verified against the starting position and the last offset of an individual batch. So if the follower fetches multiple batches in one request, the resulting index entries do not line up with batch boundaries and DumpLogSegments reports mismatches (a minimal sketch of the failing check is at the end of this description).

Reproduction: unit test in DumpLogSegmentsTest:
{code:java}
def addMultiRecordBatches(): Unit = {
  var recordsCount = 0L
  val batches = new ArrayBuffer[BatchInfo]
  val now = System.currentTimeMillis()

  val firstBatchRecords = (0 until 10).map { i =>
    new SimpleRecord(now + i * 2, s"message key $i".getBytes, s"message value $i".getBytes)
  }
  batches += BatchInfo(firstBatchRecords, true, true)

  val secondBatchRecords = (10 until 30).map { i =>
    new SimpleRecord(now + i * 3, s"message key $i".getBytes, null)
  }
  batches += BatchInfo(secondBatchRecords, true, false)

  val thirdBatchRecords = (30 until 50).map { i =>
    new SimpleRecord(now + i * 5, null, s"message value $i".getBytes)
  }
  batches += BatchInfo(thirdBatchRecords, false, true)

  val fourthBatchRecords = (50 until 60).map { i =>
    new SimpleRecord(now + i * 7, null)
  }
  batches += BatchInfo(fourthBatchRecords, false, false)

  batches.foreach { batchInfo =>
    // Build several record batches (groups of 5 records) into one buffer and
    // append them with a single appendAsFollower call, mimicking a follower
    // fetch response that carries multiple batches.
    val buf = ByteBuffer.allocate(2048)
    batchInfo.records.grouped(5).foreach { records =>
      val builder = MemoryRecords.builder(buf, RecordBatch.MAGIC_VALUE_V1, Compression.NONE,
        TimestampType.CREATE_TIME, recordsCount)
      records.foreach(builder.append)
      builder.close()
      recordsCount += records.size.toLong
    }
    buf.flip()
    log.appendAsFollower(MemoryRecords.readableRecords(buf.slice()))
  }
  // Flush, but don't close so that the indexes are not trimmed and contain some zero entries
  log.flush(false)
}

@Test
def testDumpIndexMismatches2(): Unit = {
  log = createTestLog
  addMultiRecordBatches()
  val offsetMismatches = mutable.Map[String, List[(Long, Long)]]()
  DumpLogSegments.dumpIndex(new File(indexFilePath), indexSanityOnly = false, verifyOnly = true,
    offsetMismatches, Int.MaxValue)
  assertEquals(Map.empty, offsetMismatches)
}
{code}
System test:
# start the broker and produce records
# stop the follower and delete all of its log data
# restart the follower so that it re-fetches the data from the leader

Result:
* the follower's index file is smaller than the leader's
* dumping the follower's index reports offset mismatch errors

!image-2024-08-28-14-42-44-152.png!

Possible solutions:
# Fix DumpLogSegments so that it does not report these entries as errors.
# After the follower fetches multiple batches, write one index entry per batch instead of a single entry for the whole append (see the second sketch below).

Which solution is preferred? I can submit a patch for it.
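For illustration, here is a minimal, self-contained Scala sketch of why the verification fires. Batch and IndexEntry are illustrative stand-ins, not Kafka classes; the assumption (matching the behaviour described above) is that the dump tool compares each index entry's offset with the last offset of the batch found at the indexed position:
{code:java}
// Illustrative stand-ins, NOT Kafka classes.
final case class Batch(baseOffset: Long, lastOffset: Long, position: Int)
final case class IndexEntry(offset: Long, position: Int)

object IndexMismatchSketch extends App {
  // Three batches fetched by the follower in a single fetch response.
  val batches = Seq(
    Batch(baseOffset = 0L, lastOffset = 9L, position = 0),
    Batch(baseOffset = 10L, lastOffset = 19L, position = 512),
    Batch(baseOffset = 20L, lastOffset = 29L, position = 1024)
  )

  // The follower writes ONE entry for the whole append: the physical position
  // where the append starts and the last offset of the fetched data.
  val entry = IndexEntry(offset = batches.last.lastOffset, position = batches.head.position)

  // The dump tool resolves the entry's position to the batch starting there
  // and expects that batch's last offset to equal the indexed offset.
  val batchAtPosition = batches.find(_.position == entry.position).get
  if (batchAtPosition.lastOffset != entry.offset)
    println(s"offset mismatch: indexOffset=${entry.offset} batchLastOffset=${batchAtPosition.lastOffset}")
  // prints: offset mismatch: indexOffset=29 batchLastOffset=9
}
{code}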
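And a hedged sketch of what solution 2 would mean, reusing the illustrative model above. entriesPerBatch is a hypothetical helper, not the real append path, which would additionally have to honour log.index.interval.bytes:
{code:java}
// Hypothetical helper: index each fetched batch individually, so every
// entry's offset is the last offset of the batch located at its position.
def entriesPerBatch(batches: Seq[Batch]): Seq[IndexEntry] =
  batches.map(b => IndexEntry(offset = b.lastOffset, position = b.position))

// For the three batches above this yields entries (9, 0), (19, 512), (29, 1024),
// each of which passes the dump tool's per-batch verification.
{code}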