Oscar Westra van Holthe - Kind created AVRO-4006:
----------------------------------------------------

             Summary: [Java] DataFileReader does not correctly identify last 
sync marker when reading/skipping blocks
                 Key: AVRO-4006
                 URL: https://issues.apache.org/jira/browse/AVRO-4006
             Project: Apache Avro
          Issue Type: Bug
          Components: java
    Affects Versions: 1.11.3
            Reporter: Oscar Westra van Holthe - Kind


The following code demonstrates the problem:

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableFileInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;

import java.io.File;
import java.io.IOException;

public class AvroTest {

    public static void main(String[] args) throws IOException {
        File avroFile = new File("test.avro");

        GenericData model = GenericData.get();
        Schema simple = 
SchemaBuilder.record("TestRecord").fields().requiredString("text").endRecord();
        Schema.Field textField = simple.getField("text");
        try (DataFileWriter<Object> writer = new DataFileWriter<>(new 
GenericDatumWriter<>(null, model)).create(simple, avroFile)) {
            for (int i = 1; i <= 1000; i++) {
                Object record = model.newRecord(null, simple);
                model.setField(record, textField.name(), textField.pos(), "i = 
" + i);
                writer.append(record);
                if (i % 100 == 0) {
                    long syncPos = writer.sync();
                    System.out.printf("Synced %d records; file position %d%n", 
i, syncPos);
                }
            }
        }

        IndexedRecord result;
        DatumReader<IndexedRecord> datumReader = new 
GenericDatumReader<>(simple, simple, model);
        try (SeekableFileInput sfi = new SeekableFileInput(avroFile);
             MyDataFileReader<IndexedRecord> reader = new 
MyDataFileReader<>(sfi, datumReader)) {
            // Find the start of the last block reading the entire file, 
WITHOUT decoding any records.
            // Note that this does decompress the data, but that's so fast 
these days that it hardly affects reading speed.
            long lastSyncPos = reader.previousSync();
            while (reader.hasNext()) {
                lastSyncPos = reader.previousSync();
                System.out.printf("Sync marker at %d%n", lastSyncPos);
                // Mark the block as read, so hasNext() will read the next block
                reader.nextBlock();
            }
            System.out.printf("Sync marker at %d%n", reader.previousSync());
            reader.seek(lastSyncPos);
            IndexedRecord lastRecord1 = null;
            int decoded = 0;
            while (reader.hasNext()) {
                lastRecord1 = reader.next(lastRecord1);
                decoded++;
            }
            System.out.printf("Decoded %d records%n", decoded);
            result = lastRecord1;
        }
        Object lastRecord = result;
        System.out.printf("Last record: %s%n", lastRecord);
    }

    private static class MyDataFileReader<T> extends DataFileReader<T> {
        public MyDataFileReader(SeekableFileInput sfi, DatumReader<T> 
datumReader) throws IOException {
            super(sfi, datumReader);
        }

        @Override
        public void blockFinished() throws IOException {
            super.blockFinished();
        }
    }
}
{code}

The output:

{noformat}
Synced 100 records; file position 828
Synced 200 records; file position 1648
Synced 300 records; file position 2468
Synced 400 records; file position 3288
Synced 500 records; file position 4108
Synced 600 records; file position 4928
Synced 700 records; file position 5748
Synced 800 records; file position 6568
Synced 900 records; file position 7388
Synced 1000 records; file position 8209
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Decoded 1000 records
Last record: {"text": "i = 1000"}
{noformat}

In the expected output, the detected sync markers should advance with each
block, and only the 100 records of the last block should be decoded:

{noformat}
Synced 100 records; file position 828
Synced 200 records; file position 1648
Synced 300 records; file position 2468
Synced 400 records; file position 3288
Synced 500 records; file position 4108
Synced 600 records; file position 4928
Synced 700 records; file position 5748
Synced 800 records; file position 6568
Synced 900 records; file position 7388
Synced 1000 records; file position 8209
Sync marker at 116
Sync marker at 828
Sync marker at 1648
Sync marker at 2468
Sync marker at 3288
Sync marker at 4108
Sync marker at 4928
Sync marker at 5748
Sync marker at 6568
Sync marker at 7388
Sync marker at 8209
Decoded 100 records
Last record: {"text": "i = 1000"}
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to