gongxuanzhang opened a new pull request, #19419:
URL: https://github.com/apache/kafka/pull/19419

   issue link https://issues.apache.org/jira/browse/KAFKA-15371
   
   ## conclusion
   
   This issue isn’t caused by differences between the `log` file and the 
`checkpoint` file, but rather by the order in which asynchronous events occur.
   
   
   ## reliably reproduce
   In the current version, you can reliably reproduce this issue by adding a 
small sleep in `SnapshotFileReader#handleNextBatch` , like this:
   ```
    private void handleNextBatch() {
           if (!batchIterator.hasNext()) {
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
               beginShutdown("done");
               return;
           }
           FileChannelRecordBatch batch = batchIterator.next();
           if (batch.isControlBatch()) {
               handleControlBatch(batch);
           } else {
               handleMetadataBatch(batch);
           }
           scheduleHandleNextBatch();
           lastOffset = batch.lastOffset();
       }
   ```
   
   you can download a test file [test checkpoint 
file](https://github.com/user-attachments/files/19659636/00000000000000007169-0000000001.checkpoint.log)
 
   
   ⚠️: Please remove the .log extension after downloading, since GitHub doesn’t 
allow uploading checkpoint files directly.
   
   After change code  and gradle build ,  you can run 
`bin/kafka-metadata-shell.sh --snapshot   ${your file path}`
   
   You will only see a loading message in the console like this:
   <img width="248" alt="image" 
src="https://github.com/user-attachments/assets/fe4b4eba-7a6a-4cee-9b56-c82a5fa02c89";
 />
   
    
   
   ## Cause of the Bug
   After the `SnapshotFileReader startup`, it will enqueue the iterator’s 
events to its own kafkaQueue.
   The impontent method is: `SnapshotFileReader#scheduleHandleNextBatch`
   
   When processing each batch of the iterator, it adds metadata events for the 
batch to the kafkaQueue(different from the SnapshotFileReader.) of the 
metadataLoader.
   The impontent method is `SnapshotFileReader#handleMetadataBatch` and 
`MetadataLoader#handleCommit`
   
   When the MetadataLoader processes a MetadataDelta, it checks whether the 
high watermark has been updated. If not, it skips processing  
   The impontent method is `MetadataLoader#maybePublishMetadata` and 
`maybePublishMetadata#stillNeedToCatchUp`
   
   The crucial high watermark update happens after the SnapshotFileReader’s 
iterator finishes reading, using the cleanup task of its kafkaQueue.
   
   So, if the MetadataLoader finishes processing all batches before the high 
watermark is updated, the main thread will keep waiting.
   <img width="1088" alt="image" 
src="https://github.com/user-attachments/assets/03daa288-ff39-49a3-bbc7-e7b5831a858b";
 />
   
   
   
   
   
   <img width="867" alt="image" 
src="https://github.com/user-attachments/assets/fc0770dd-de54-4f69-b669-ab4e696bd2a7";
 />
   
   
   ## Solution
   If we’ve reached the last batch in the iteration, we update the high 
watermark first before adding events to the MetadataLoader, ensuring that 
MetadataLoader runs at least once after the watermark is updated.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to