Oscar Westra van Holthe - Kind created AVRO-4006: ----------------------------------------------------
Summary: [Java] DataFileReader does not correctly identify last sync marker when reading/skipping blocks
Key: AVRO-4006
URL: https://issues.apache.org/jira/browse/AVRO-4006
Project: Apache Avro
Issue Type: Bug
Components: java
Affects Versions: 1.11.3
Reporter: Oscar Westra van Holthe - Kind

The following code demonstrates the problem:

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableFileInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;

import java.io.File;
import java.io.IOException;

public class AvroTest {
    public static void main(String[] args) throws IOException {
        File avroFile = new File("test.avro");
        GenericData model = GenericData.get();

        Schema simple = SchemaBuilder.record("TestRecord").fields().requiredString("text").endRecord();
        Schema.Field textField = simple.getField("text");
        try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>(null, model)).create(simple, avroFile)) {
            for (int i = 1; i <= 1000; i++) {
                Object record = model.newRecord(null, simple);
                model.setField(record, textField.name(), textField.pos(), "i = " + i);
                writer.append(record);
                if (i % 100 == 0) {
                    long syncPos = writer.sync();
                    System.out.printf("Synced %d records; file position %d%n", i, syncPos);
                }
            }
        }

        IndexedRecord result;
        DatumReader<IndexedRecord> datumReader = new GenericDatumReader<>(simple, simple, model);
        try (SeekableFileInput sfi = new SeekableFileInput(avroFile);
             MyDataFileReader<IndexedRecord> reader = new MyDataFileReader<>(sfi, datumReader)) {
            // Find the start of the last block reading the entire file, WITHOUT decoding any records.
            // Note that this does decompress the data, but that's so fast these days that it hardly affects reading speed.
            long lastSyncPos = reader.previousSync();
            while (reader.hasNext()) {
                lastSyncPos = reader.previousSync();
                System.out.printf("Sync marker at %d%n", lastSyncPos);
                // Mark the block as read, so hasNext() will read the next block
                reader.nextBlock();
            }
            System.out.printf("Sync marker at %d%n", reader.previousSync());

            reader.seek(lastSyncPos);
            IndexedRecord lastRecord1 = null;
            int decoded = 0;
            while (reader.hasNext()) {
                lastRecord1 = reader.next(lastRecord1);
                decoded++;
            }
            System.out.printf("Decoded %d records%n", decoded);
            result = lastRecord1;
        }
        Object lastRecord = result;
        System.out.printf("Last record: %s%n", lastRecord);
    }

    private static class MyDataFileReader<T> extends DataFileReader<T> {
        public MyDataFileReader(SeekableFileInput sfi, DatumReader<T> datumReader) throws IOException {
            super(sfi, datumReader);
        }

        @Override
        public void blockFinished() throws IOException {
            super.blockFinished();
        }
    }
}
{code}

The output:

{noformat}
Synced 100 records; file position 828
Synced 200 records; file position 1648
Synced 300 records; file position 2468
Synced 400 records; file position 3288
Synced 500 records; file position 4108
Synced 600 records; file position 4928
Synced 700 records; file position 5748
Synced 800 records; file position 6568
Synced 900 records; file position 7388
Synced 1000 records; file position 8209
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Decoded 1000 records
Last record: {"text": "i = 1000"}
{noformat}

In the expected output, the detected sync markers should progress, and only 100 records should be decoded:

{noformat}
Synced 100 records; file position 828
Synced 200 records; file position 1648
Synced 300 records; file position 2468
Synced 400 records; file position 3288
Synced 500 records; file position 4108
Synced 600 records; file position 4928
Synced 700 records; file position 5748
Synced 800 records; file position 6568
Synced 900 records; file position 7388
Synced 1000 records; file position 8209
Sync marker at 116
Sync marker at 828
Sync marker at 1648
Sync marker at 2468
Sync marker at 3288
Sync marker at 4108
Sync marker at 4928
Sync marker at 5748
Sync marker at 6568
Sync marker at 7388
Sync marker at 8209
Decoded 100 records
Last record: {"text": "i = 1000"}
{noformat}

--
This message was sent by Atlassian Jira (v8.20.10#820010)