Thank you, Gna, for opening the ticket.

I looked into the AvroInputFormat code and, inspired by it, wrote a
GenericAvroInputFormat. The code is very similar to the original
AvroInputFormat (and hence redundant), so it would be a good idea to modify
AvroInputFormat in Flink to support GenericRecord.

Anyway, I am pasting the code here for anyone who wants to use it (until
your code is part of a stable Flink release):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;

public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
        implements ResultTypeQueryable<GenericRecord> {

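    private transient long end;
    // Not used when reading: the reader takes the writer schema from the file
    // header. Being transient, this field is also null after deserialization.
    private transient Schema schema;
    private transient FileReader<GenericRecord> fileReader;
    private boolean reuseAvroValue = true;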

    private static final long serialVersionUID = 1L;

    public GenericAvroInputFormat(Path filePath, Schema schema) {
        super(filePath);
        this.schema = schema;
    }

    public void setReuseAvroValue(boolean reuseAvroValue) {
        this.reuseAvroValue = reuseAvroValue;
    }

    public void setUnsplittable(boolean unsplittable) {
        this.unsplittable = unsplittable;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        SeekableInput sin = new FSDataInputStreamWrapper(stream,
                split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        fileReader = DataFileReader.openReader(sin, reader);
        // Skip to the first sync point after the start of this split.
        fileReader.sync(split.getStart());
        this.end = split.getStart() + split.getLength();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !fileReader.hasNext() || fileReader.pastSync(end);
    }

    @Override
    public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
        if (reachedEnd()) {
            return null;
        }

        // GenericRecord is an interface, so it cannot be instantiated
        // reflectively; passing null lets Avro allocate a fresh record.
        return fileReader.next(reuseAvroValue ? reuseValue : null);
    }
}
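
A note on the transient schema field above: Flink ships input formats to the
workers using Java serialization, so a transient field is null by the time
open() runs there. If the schema is needed on the worker (for example as a
reader schema), one option is to carry it as a JSON string and re-parse it
lazily. The sketch below shows that pattern with plain JDK classes only.
ParsedSchema and SchemaCarryingFormat are hypothetical stand-ins, not Avro or
Flink classes; with real Avro the string would come from Schema.toString()
and be parsed with new Schema.Parser().parse(json).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a schema type that is not Serializable.
final class ParsedSchema {
    final String json;

    private ParsedSchema(String json) {
        this.json = json;
    }

    static ParsedSchema parse(String json) {
        return new ParsedSchema(json);
    }
}

// Hypothetical format: the JSON string travels with the object, while the
// parsed schema is rebuilt on first use after deserialization.
final class SchemaCarryingFormat implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String schemaJson;        // serialized with the object
    private transient ParsedSchema schema;  // dropped by Java serialization

    SchemaCarryingFormat(ParsedSchema schema) {
        this.schema = schema;
        this.schemaJson = schema.json;
    }

    // True if the transient field currently holds a parsed schema.
    boolean hasCachedSchema() {
        return schema != null;
    }

    // Re-parses the schema from its JSON form if the transient field is empty.
    ParsedSchema getSchema() {
        if (schema == null) {
            schema = ParsedSchema.parse(schemaJson);
        }
        return schema;
    }

    // Serializes and deserializes the format in memory, mimicking what
    // happens when the object is shipped to another JVM.
    static SchemaCarryingFormat roundTrip(SchemaCarryingFormat format) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(format);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SchemaCarryingFormat) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("In-memory round trip failed", e);
        }
    }
}
```

After roundTrip(...), hasCachedSchema() is false on the copy until getSchema()
is called, which is the same situation the transient Schema field is in on a
worker.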


Usage:

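import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env =
            ExecutionEnvironment.getExecutionEnvironment();
    final Path inPath = new Path(args[0]);

    Schema schema = new Schema.Parser().parse(
            new File("/path/to/schemafile.avsc"));
    DataSet<GenericRecord> dataSet = env.createInput(
            new GenericAvroInputFormat(inPath, schema));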
    dataSet.map(new MapFunction<GenericRecord, Tuple2<Long,String>>() {
        @Override
        public Tuple2<Long,String> map(GenericRecord record) {
            Long id = (Long) record.get("id");
            String someString = record.get("somestring").toString();
            return new Tuple2<>(id, someString);
        }
    }).writeAsText(args[1]);

    env.execute();
}
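
The casts in the usage above depend on the schema: a field declared as int
comes back as an Integer and one declared as long as a Long, while Avro's
generic reader returns string fields as a CharSequence (typically its own
Utf8 class) rather than java.lang.String, which is why toString() is used
instead of a (String) cast. A minimal sketch of helpers that tolerate these
cases is below; FieldValues and its method names are hypothetical and not
part of Avro or Flink.

```java
// Hypothetical helpers for values returned by GenericRecord.get(...).
final class FieldValues {
    private FieldValues() {
    }

    // Works for String as well as other CharSequence types; null stays null.
    static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    // Accepts Integer or Long (any Number) and widens it to long.
    static long asLong(Object value) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        throw new IllegalArgumentException(
                "Expected a numeric value but got: " + value);
    }
}
```

Inside the map function this would read, for example,
FieldValues.asLong(record.get("id")) and
FieldValues.asString(record.get("somestring")).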


-Tarandeep







On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
gna.phetsar...@teamaol.com> wrote:

> Tarandeep,
>
> There isn't a way yet, but I am proposing to add one:
> https://issues.apache.org/jira/browse/FLINK-3691
>
> -Gna
>
> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <tarand...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Can someone please point me to an example of creating a DataSet from Avro
>> GenericRecords?
>>
>> I tried this code -
>>
>>     final ExecutionEnvironment env =
>>         ExecutionEnvironment.getExecutionEnvironment();
>>     final Path iPath = new Path(args[0]);
>>
>>     DataSet<GenericRecord> dataSet = env.createInput(
>>         new AvroInputFormat<>(iPath, GenericRecord.class));
>>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
>>         @Override
>>         public Tuple2<Integer,String> map(GenericRecord record) {
>>             Integer id = (Integer) record.get("id");
>>             String userAgent = (String) record.get("user_agent");
>>             return new Tuple2<>(id, userAgent);
>>         }
>>     }).writeAsText(args[1]);
>>
>>     env.execute();
>>
>> But I got an exception-
>>
>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>>     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>>     at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> From the stack trace, it looks like AvroInputFormat tries to read the Avro
>> file as SpecificRecords. Is there a way to read an Avro file as
>> GenericRecords?
>>
>>
>> Thanks,
>> Tarandeep
>>
>
>
>
> --
>
>
> *Gna Phetsarath*
> System Architect // AOL Platforms // Data Services //
> Applied Research Chapter
> 770 Broadway, 5th Floor, New York, NY 10003
> o: 212.402.4871 // m: 917.373.7363
> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>
> * <http://www.aolplatforms.com>*
>
