Thank you Gna for opening the ticket. I looked into the AvroInputFormat code and, inspired by it, wrote a GenericAvroInputFormat. The code is awfully similar to (and hence largely redundant with) the original AvroInputFormat, so it would be a good idea to modify AvroInputFormat in Flink to support GenericRecord directly.
Anyway, I am pasting the code here for anyone who wants to use it (until your change is part of a Flink stable release). One fix relative to my first draft: GenericRecord is an interface, so it cannot be instantiated reflectively; when reuse is disabled, a fresh GenericData.Record is created from the file's schema instead.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
        implements ResultTypeQueryable<GenericRecord> {

    private static final long serialVersionUID = 1L;

    private transient long end;
    private transient Schema schema;
    private transient FileReader<GenericRecord> fileReader;
    private boolean reuseAvroValue = true;

    public GenericAvroInputFormat(Path filePath, Schema schema) {
        super(filePath);
        this.schema = schema;
    }

    public void setReuseAvroValue(boolean reuseAvroValue) {
        this.reuseAvroValue = reuseAvroValue;
    }

    public void setUnsplittable(boolean unsplittable) {
        this.unsplittable = unsplittable;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        SeekableInput sin = new FSDataInputStreamWrapper(stream,
                split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        fileReader = DataFileReader.openReader(sin, reader);
        // Skip to the first sync point at or after the start of this split.
        fileReader.sync(split.getStart());
        this.end = split.getStart() + split.getLength();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !fileReader.hasNext() || fileReader.pastSync(end);
    }

    @Override
    public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
        if (reachedEnd()) {
            return null;
        }
        if (!reuseAvroValue) {
            // GenericRecord is an interface and cannot be instantiated via
            // reflection; build a fresh record from the file's schema instead.
            reuseValue = new GenericData.Record(fileReader.getSchema());
        }
        reuseValue = fileReader.next(reuseValue);
        return reuseValue;
    }
}

Usage:

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final Path inPath = new Path(args[0]);
    Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));

    DataSet<GenericRecord> dataSet = env.createInput(new GenericAvroInputFormat(inPath, schema));
    dataSet.map(new MapFunction<GenericRecord, Tuple2<Long, String>>() {
        @Override
        public Tuple2<Long, String> map(GenericRecord record) {
            Long id = (Long) record.get("id");
            String someString = record.get("somestring").toString();
            return new Tuple2<>(id, someString);
        }
    }).writeAsText(args[1]);

    env.execute();
}

-Tarandeep

On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <gna.phetsar...@teamaol.com> wrote:

> Tarandeep,
>
> There isn't a way yet, but I am proposing to do one:
> https://issues.apache.org/jira/browse/FLINK-3691
>
> -Gna
>
> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <tarand...@gmail.com> wrote:
>
>> Hi,
>>
>> Can someone please point me to an example of creating a DataSet using
>> Avro GenericRecords?
>>
>> I tried this code -
>>
>> final ExecutionEnvironment env =
>>     ExecutionEnvironment.getExecutionEnvironment();
>> final Path iPath = new Path(args[0]);
>>
>> DataSet<GenericRecord> dataSet = env.createInput(new
>>     AvroInputFormat<>(iPath, GenericRecord.class));
>> dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer, String>>() {
>>     @Override
>>     public Tuple2<Integer, String> map(GenericRecord record) {
>>         Integer id = (Integer) record.get("id");
>>         String userAgent = (String) record.get("user_agent");
>>         return new Tuple2<>(id, userAgent);
>>     }
>> }).writeAsText(args[1]);
>>
>> env.execute();
>>
>> But I got an exception -
>>
>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
>> interface org.apache.avro.generic.GenericRecord
>>     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>>     at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Looking at the stack trace, I gather that AvroInputFormat tries to read
>> the Avro file as SpecificRecords. Is there a way to read an Avro file as
>> GenericRecords?
>>
>> Thanks,
>> Tarandeep
>
> --
>
> *Gna Phetsarath*
> System Architect // AOL Platforms // Data Services // Applied Research Chapter
> 770 Broadway, 5th Floor, New York, NY 10003
> o: 212.402.4871 // m: 917.373.7363
> vvmr: 8890237  aim: sphetsarath20  t: @sourigna
>
> <http://www.aolplatforms.com>
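P.S. For anyone trying the usage example above: the `/path/to/schemafile.avsc` it parses is not shown in this thread. A minimal schema matching the two fields the map function reads ("id" as a long, "somestring" as a string) might look like the following - the record name and namespace here are my own placeholders, not from the original code:

```json
{
  "type": "record",
  "name": "ExampleRecord",
  "namespace": "com.example.avro",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "somestring", "type": "string"}
  ]
}
```

Note the cast in the map function assumes "id" is an Avro long; if your schema declares it as an int, cast to Integer instead (as in the original question's code).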