Thank you, Gna, for opening the ticket.

I looked into the AvroInputFormat code and, inspired by it, wrote a
GenericAvroInputFormat. The code is very similar to the original
AvroInputFormat (and hence redundant), so it would be a good idea to modify
AvroInputFormat in Flink to support GenericRecord.

Anyway, I am pasting the code here for anyone who wants to use it (until
your change is part of a stable Flink release):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
        implements ResultTypeQueryable<GenericRecord> {

    private static final long serialVersionUID = 1L;

    // Kept as JSON text so it survives serialization of the format object.
    // A transient Schema field would be null by the time open() runs on a worker.
    private final String schemaString;
    private boolean reuseAvroValue = true;

    private transient long end;
    private transient FileReader<GenericRecord> fileReader;

    public GenericAvroInputFormat(Path filePath, Schema schema) {
        super(filePath);
        this.schemaString = schema.toString();
    }

    public void setReuseAvroValue(boolean reuseAvroValue) {
        this.reuseAvroValue = reuseAvroValue;
    }

    public void setUnsplittable(boolean unsplittable) {
        this.unsplittable = unsplittable;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        SeekableInput sin = new FSDataInputStreamWrapper(stream,
                split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
        // The supplied schema acts as the reader schema; the writer schema
        // is taken from the file header when the reader is opened.
        Schema schema = new Schema.Parser().parse(schemaString);
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        fileReader = DataFileReader.openReader(sin, reader);
        // Skip ahead to the first sync marker at or after the split start.
        fileReader.sync(split.getStart());
        this.end = split.getStart() + split.getLength();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !fileReader.hasNext() || fileReader.pastSync(end);
    }

    @Override
    public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
        if (reachedEnd()) {
            return null;
        }
        // GenericRecord is an interface and cannot be instantiated reflectively;
        // passing null makes Avro allocate a fresh record instead.
        return fileReader.next(reuseAvroValue ? reuseValue : null);
    }
}

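
A note on the sync()/pastSync() calls in open() and reachedEnd(): as far as I
understand Avro's container format, they are what lets several splits read the
same file without reading any block twice. A block is read by the split in
which its preceding sync marker begins. Below is a minimal stdlib-only sketch
of that rule. It is a simplified model, not Avro's code; the class name, the
method name and the marker offsets are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitOwnershipSketch {

    /**
     * Returns the indices of the blocks that a split [start, start + length)
     * would read, given the byte offset at which the sync marker preceding
     * each block begins. Simplified model of sync(start) + pastSync(end).
     */
    static List<Integer> blocksForSplit(long[] markerOffsets, long start, long length) {
        long end = start + length;
        List<Integer> owned = new ArrayList<>();
        for (int i = 0; i < markerOffsets.length; i++) {
            // sync(start) skips every marker that begins before the split start;
            // pastSync(end) stops once a marker begins at or beyond the split end.
            if (markerOffsets[i] >= start && markerOffsets[i] < end) {
                owned.add(i);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        long[] markerOffsets = {100, 4100, 8300, 12050}; // hypothetical offsets
        long fileLength = 16000;
        long splitLength = 5000;
        for (long start = 0; start < fileLength; start += splitLength) {
            long length = Math.min(splitLength, fileLength - start);
            System.out.println("split@" + start + " reads blocks "
                    + blocksForSplit(markerOffsets, start, length));
        }
    }
}
```

With these made-up offsets every block lands in exactly one split, and the
last split (starting at 15000) reads nothing because no marker begins in it.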
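Usage:

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env =
            ExecutionEnvironment.getExecutionEnvironment();
    final Path inPath = new Path(args[0]);

    Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
    DataSet<GenericRecord> dataSet =
            env.createInput(new GenericAvroInputFormat(inPath, schema));

    dataSet.map(new MapFunction<GenericRecord, Tuple2<Long, String>>() {
        @Override
        public Tuple2<Long, String> map(GenericRecord record) {
            // The cast assumes "id" is declared as long in the schema.
            Long id = (Long) record.get("id");
            // Avro string fields normally come back as Utf8, not String,
            // so call toString() rather than casting.
            String someString = record.get("somestring").toString();
            return new Tuple2<>(id, someString);
        }
    }).writeAsText(args[1]);

    env.execute();
}
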
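
One caveat when handing a format object to env.createInput(): Flink ships it
to the workers using Java serialization, so any field marked transient arrives
there as null. As far as I know, Avro's Schema class is not Serializable in the
1.7.x line, so a common workaround is to carry the schema as its JSON string
and re-parse it on the worker. Here is a minimal stdlib-only sketch of that
effect. ParsedSchema and FormatStub are hypothetical stand-ins I made up for
the illustration, not Avro or Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

    /** Hypothetical stand-in for a non-serializable parsed schema. */
    static class ParsedSchema {
        final String json;
        ParsedSchema(String json) { this.json = json; }
    }

    /** Hypothetical stand-in for an input format that is shipped to workers. */
    static class FormatStub implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient ParsedSchema parsed; // dropped by Java serialization
        private final String schemaJson;       // plain String, survives the trip

        FormatStub(ParsedSchema schema) {
            this.parsed = schema;
            this.schemaJson = schema.json;
        }

        boolean hasParsedSchema() { return parsed != null; }

        /** Rebuilds the transient field on the receiving side, e.g. from open(). */
        void restore() {
            if (parsed == null) {
                parsed = new ParsedSchema(schemaJson);
            }
        }
    }

    /** Serializes and deserializes the object, as shipping it to a worker would. */
    static FormatStub roundTrip(FormatStub format) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(format);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FormatStub) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FormatStub copy = roundTrip(new FormatStub(new ParsedSchema("{\"type\":\"string\"}")));
        System.out.println("parsed schema after shipping: " + copy.hasParsedSchema());
        copy.restore();
        System.out.println("parsed schema after restore:  " + copy.hasParsedSchema());
    }
}
```

The first line prints false and the second true: the transient field is lost
in transit and has to be rebuilt from the string on the receiving side.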
-Tarandeep
On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
[email protected]> wrote:
> Tarandeep,
>
> There isn't a way yet, but I am proposing to do one:
> https://issues.apache.org/jira/browse/FLINK-3691
>
> -Gna
>
> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <[email protected]>
> wrote:
>
>> Hi,
>>
>> Can someone please point me to an example of creating DataSet using Avro
>> Generic Records?
>>
>> I tried this code -
>>
>> final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> final Path iPath = new Path(args[0]);
>>
>> DataSet<GenericRecord> dataSet = env.createInput(new
>> AvroInputFormat<>(iPath, GenericRecord.class));
>> dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
>> @Override
>> public Tuple2<Integer,String> map(GenericRecord record) {
>> Integer id = (Integer) record.get("id");
>> String userAgent = (String) record.get("user_agent");
>> return new Tuple2<>(id, userAgent);
>> }
>> }).writeAsText(args[1]);
>>
>> env.execute();
>>
>> But I got an exception-
>>
>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
>> interface org.apache.avro.generic.GenericRecord
>> at
>> org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>> at
>> org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>> at
>> org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>> at
>> org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>> at
>> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>> at
>> org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> By looking at StackTrace, I get that AvroInputFormat tries to read Avro
>> file as SpecificRecords. Is there a way to read Avro file as GenericRecords?
>>
>>
>> Thanks,
>> Tarandeep
>>
>
>
>
> --
>
>
> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
> Applied Research Chapter
> 770 Broadway, 5th Floor, New York, NY 10003
> o: 212.402.4871 // m: 917.373.7363
> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>
> * <http://www.aolplatforms.com>*
>