Tarandeep,

Thanks for pasting your code!
I have a PR ready that extends AvroInputFormat and will submit it soon; I'm still waiting for AOL's legal team to approve it.

-Gna

On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh <tarand...@gmail.com> wrote:
> Thank you, Gna, for opening the ticket.
>
> I looked into the AvroInputFormat code and, inspired by it, wrote a
> GenericAvroInputFormat. The code is very similar to (and hence redundant
> with) the original AvroInputFormat, so it would be a good idea to modify
> AvroInputFormat in Flink to support GenericRecord.
>
> Anyway, I am pasting the code here for anyone who wants to use it (until
> your code is part of a stable Flink release):
>
> import java.io.IOException;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.file.FileReader;
> import org.apache.avro.file.SeekableInput;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.DatumReader;
> import org.apache.flink.api.avro.FSDataInputStreamWrapper;
> import org.apache.flink.api.common.io.FileInputFormat;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> import org.apache.flink.core.fs.FileInputSplit;
> import org.apache.flink.core.fs.Path;
>
> public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
>         implements ResultTypeQueryable<GenericRecord> {
>
>     private transient long end;
>     // Not used by open(): GenericDatumReader takes the writer schema from the
>     // file header. Being transient, it is not shipped to the parallel tasks.
>     private transient Schema schema;
>     private transient FileReader<GenericRecord> fileReader;
>     private boolean reuseAvroValue = true;
>
>     private static final long serialVersionUID = 1L;
>
>     public GenericAvroInputFormat(Path filePath, Schema schema) {
>         super(filePath);
>         this.schema = schema;
>     }
>
>     public void setReuseAvroValue(boolean reuseAvroValue) {
>         this.reuseAvroValue = reuseAvroValue;
>     }
>
>     public void setUnsplittable(boolean unsplittable) {
>         this.unsplittable = unsplittable;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeExtractor.getForClass(GenericRecord.class);
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
>         SeekableInput sin = new FSDataInputStreamWrapper(stream,
>                 split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
>         // A GenericDatumReader yields GenericRecords, unlike the reflect-based
>         // reader that AvroInputFormat creates (see the stack trace below).
>         DatumReader<GenericRecord> reader = new GenericDatumReader<>();
>         fileReader = DataFileReader.openReader(sin, reader);
>         // Move to the next sync marker from the split start; reachedEnd()
>         // stops once the reader is past the sync marker after the split end.
>         fileReader.sync(split.getStart());
>         this.end = split.getStart() + split.getLength();
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return !fileReader.hasNext() || fileReader.pastSync(end);
>     }
>
>     @Override
>     public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
>         if (reachedEnd()) {
>             return null;
>         }
>         // GenericRecord is an interface and cannot be instantiated, so when
>         // reuse is disabled, let next() allocate a fresh record instead.
>         return reuseAvroValue ? fileReader.next(reuseValue) : fileReader.next();
>     }
> }
>
>
> Usage:
>
> import java.io.File;
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.core.fs.Path;
>
> public static void main(String[] args) throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     final Path inPath = new Path(args[0]);
>
>     Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
>     DataSet<GenericRecord> dataSet =
>             env.createInput(new GenericAvroInputFormat(inPath, schema));
>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Long, String>>() {
>         @Override
>         public Tuple2<Long, String> map(GenericRecord record) {
>             Long id = (Long) record.get("id");
>             // Avro returns strings as CharSequence (Utf8), hence toString()
>             String someString = record.get("somestring").toString();
>             return new Tuple2<>(id, someString);
>         }
>     }).writeAsText(args[1]);
>
>     env.execute();
> }
>
>
> -Tarandeep
>
> On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <gna.phetsar...@teamaol.com> wrote:
>
>> Tarandeep,
>>
>> There isn't a way yet, but I am proposing one:
>> https://issues.apache.org/jira/browse/FLINK-3691
>>
>> -Gna
>>
>> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <tarand...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Can someone please point me to an example of creating a DataSet using
>>> Avro GenericRecords?
>>>
>>> I tried this code:
>>>
>>> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> final Path iPath = new Path(args[0]);
>>>
>>> DataSet<GenericRecord> dataSet =
>>>         env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
>>> dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer, String>>() {
>>>     @Override
>>>     public Tuple2<Integer, String> map(GenericRecord record) {
>>>         Integer id = (Integer) record.get("id");
>>>         String userAgent = (String) record.get("user_agent");
>>>         return new Tuple2<>(id, userAgent);
>>>     }
>>> }).writeAsText(args[1]);
>>>
>>> env.execute();
>>>
>>> But I got an exception:
>>>
>>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>>>     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>>>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>>>     at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> Looking at the stack trace, I see that AvroInputFormat tries to read the
>>> Avro file as SpecificRecords. Is there a way to read an Avro file as
>>> GenericRecords?
>>>
>>> Thanks,
>>> Tarandeep
>>>
>>
>> --
>> *Gna Phetsarath* System Architect // AOL Platforms // Data Services //
>> Applied Research Chapter
>> 770 Broadway, 5th Floor, New York, NY 10003
>> o: 212.402.4871 // m: 917.373.7363
>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>> <http://www.aolplatforms.com>
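In the GenericAvroInputFormat code quoted above, open() calls fileReader.sync(split.getStart()) and reachedEnd() checks fileReader.pastSync(end); together these let parallel readers share one Avro file without reading any block twice. Avro's DataFileReader documents sync(position) as moving to the next synchronization point after a position, and pastSync(position) as returning true once the reader is past the next synchronization point after that position. What follows is a minimal, dependency-free model of the resulting ownership rule, assuming (from our reading of DataFileReader) that each data block belongs to the split in which the 16-byte sync marker preceding it starts. The class SplitOwnershipSketch, the marker offsets, the file length and the split size are all hypothetical; no Avro or Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Avro or Flink code) of how sync()/pastSync()
 * divide one Avro file between parallel splits. The file is described only
 * by the ascending byte offsets at which the sync marker preceding each
 * data block starts.
 */
public class SplitOwnershipSketch {

    static final int SYNC_SIZE = 16; // length of an Avro sync marker in bytes

    /** Indices of the blocks a reader of [start, start + length) would emit. */
    static List<Integer> blocksForSplit(long[] markerOffsets, long start, long length) {
        long end = start + length;
        List<Integer> owned = new ArrayList<>();
        for (int i = 0; i < markerOffsets.length; i++) {
            // Like sync(start): markers that begin before the split are skipped.
            if (markerOffsets[i] < start) {
                continue;
            }
            // First byte after the marker, i.e. where block i begins.
            long blockStart = markerOffsets[i] + SYNC_SIZE;
            // Assumed pastSync(end) rule: stop once the marker began at or after end.
            if (blockStart >= end + SYNC_SIZE) {
                break;
            }
            owned.add(i);
        }
        return owned;
    }

    public static void main(String[] args) {
        long[] markers = {84, 1100, 2150, 3190, 4230}; // hypothetical offsets
        long fileLength = 5300;                          // hypothetical file size
        long splitSize = 2000;                           // hypothetical split size
        int[] timesRead = new int[markers.length];
        for (long start = 0; start < fileLength; start += splitSize) {
            List<Integer> owned = blocksForSplit(markers, start, splitSize);
            System.out.println("split starting at " + start + " reads blocks " + owned);
            for (int block : owned) {
                timesRead[block]++;
            }
        }
        // Each block must be emitted by exactly one split: no gaps, no duplicates.
        for (int i = 0; i < timesRead.length; i++) {
            if (timesRead[i] != 1) {
                throw new IllegalStateException("block " + i + " read " + timesRead[i] + " times");
            }
        }
        System.out.println("every block was read exactly once");
    }
}
```

With 2000-byte splits, the split starting at 0 reads blocks 0 and 1, the split at 2000 reads blocks 2 and 3, and the split at 4000 reads block 4. Block 1 begins at byte 1116 and runs past the first split's end at 2000, yet only the first split reads it, because its marker starts inside that split.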