Tarandeep,

Also, in your code example, when *reuseAvroValue* is *false* the code will fail with this message:
java.lang.RuntimeException: The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is no proper class, it is either abstract, an interface, or a primitive type.
	at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:222)
	at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:147)
	at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:122)
	at ...

I had encountered this when I was writing the PR.

-Gna

On Thu, Apr 7, 2016 at 11:08 AM, Sourigna Phetsarath <gna.phetsar...@teamaol.com> wrote:

> Tarandeep,
>
> Thanks for pasting your code!
>
> I have a PR ready that extends AvroInputFormat and will submit it soon.
> Still waiting for the legal team at AOL to approve it.
>
> -Gna
>
> On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh <tarand...@gmail.com> wrote:
>
>> Thank you, Gna, for opening the ticket.
>>
>> I looked into the AvroInputFormat code and, inspired by it, I wrote a
>> GenericAvroInputFormat. The code is awfully similar (and hence redundant)
>> to the original AvroInputFormat, so it would be a good idea to modify
>> AvroInputFormat in Flink to support GenericRecord.
>>
>> Anyway, I am pasting the code here for anyone who wants to use it (till
>> your code is part of a Flink stable release):
>>
>> import java.io.IOException;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.file.DataFileReader;
>> import org.apache.avro.file.FileReader;
>> import org.apache.avro.file.SeekableInput;
>> import org.apache.avro.generic.GenericDatumReader;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.io.DatumReader;
>> import org.apache.flink.api.avro.FSDataInputStreamWrapper;
>> import org.apache.flink.api.common.io.FileInputFormat;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>> import org.apache.flink.api.java.typeutils.TypeExtractor;
>> import org.apache.flink.core.fs.FileInputSplit;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.util.InstantiationUtil;
>>
>> public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
>>         implements ResultTypeQueryable<GenericRecord> {
>>
>>     private transient long end;
>>     private transient Schema schema;
>>     private transient FileReader<GenericRecord> fileReader;
>>     private boolean reuseAvroValue = true;
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     public GenericAvroInputFormat(Path filePath, Schema schema) {
>>         super(filePath);
>>         this.schema = schema;
>>     }
>>
>>     public void setReuseAvroValue(boolean reuseAvroValue) {
>>         this.reuseAvroValue = reuseAvroValue;
>>     }
>>
>>     public void setUnsplittable(boolean unsplittable) {
>>         this.unsplittable = unsplittable;
>>     }
>>
>>     @Override
>>     public TypeInformation<GenericRecord> getProducedType() {
>>         return TypeExtractor.getForClass(GenericRecord.class);
>>     }
>>
>>     @Override
>>     public void open(FileInputSplit split) throws IOException {
>>         super.open(split);
>>         SeekableInput sin = new FSDataInputStreamWrapper(stream,
>>                 split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
>>         DatumReader<GenericRecord> reader = new GenericDatumReader<>();
>>         fileReader = DataFileReader.openReader(sin, reader);
>>         fileReader.sync(split.getStart());
>>         this.end = split.getStart() + split.getLength();
>>     }
>>
>>     @Override
>>     public boolean reachedEnd() throws IOException {
>>         return !fileReader.hasNext() || fileReader.pastSync(end);
>>     }
>>
>>     @Override
>>     public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
>>         if (reachedEnd()) {
>>             return null;
>>         }
>>
>>         if (!reuseAvroValue) {
>>             reuseValue = InstantiationUtil.instantiate(GenericRecord.class, Object.class);
>>         }
>>
>>         reuseValue = fileReader.next(reuseValue);
>>         return reuseValue;
>>     }
>> }
>>
>>
>> Usage:
>>
>> public static void main(String[] args) throws Exception {
>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>     final Path inPath = new Path(args[0]);
>>
>>     Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
>>     DataSet<GenericRecord> dataSet = env.createInput(new GenericAvroInputFormat(inPath, schema));
>>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Long, String>>() {
>>         @Override
>>         public Tuple2<Long, String> map(GenericRecord record) {
>>             Long id = (Long) record.get("id");
>>             String someString = record.get("somestring").toString();
>>             return new Tuple2<>(id, someString);
>>         }
>>     }).writeAsText(args[1]);
>>
>>     env.execute();
>> }
>>
>>
>> -Tarandeep
>>
>> On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <gna.phetsar...@teamaol.com> wrote:
>>
>>> Tarandeep,
>>>
>>> There isn't a way yet, but I am proposing to do one:
>>> https://issues.apache.org/jira/browse/FLINK-3691
>>>
>>> -Gna
>>>
>>> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <tarand...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Can someone please point me to an example of creating a DataSet using
>>>> Avro GenericRecords?
>>>>
>>>> I tried this code:
>>>>
>>>> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>> final Path iPath = new Path(args[0]);
>>>>
>>>> DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
>>>> dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer, String>>() {
>>>>     @Override
>>>>     public Tuple2<Integer, String> map(GenericRecord record) {
>>>>         Integer id = (Integer) record.get("id");
>>>>         String userAgent = (String) record.get("user_agent");
>>>>         return new Tuple2<>(id, userAgent);
>>>>     }
>>>> }).writeAsText(args[1]);
>>>>
>>>> env.execute();
>>>>
>>>> But I got an exception:
>>>>
>>>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>>>> 	at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>>>> 	at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>>> 	at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>>>> 	at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>>>> 	at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>>>> 	at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>>>> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> From the stack trace, I gather that AvroInputFormat tries to read the Avro
>>>> file as SpecificRecords. Is there a way to read an Avro file as
>>>> GenericRecords?
>>>>
>>>> Thanks,
>>>> Tarandeep
>>>
>>>
>>> --
>>>
>>> *Gna Phetsarath*
>>> System Architect // AOL Platforms // Data Services // Applied Research Chapter
>>> 770 Broadway, 5th Floor, New York, NY 10003
>>> o: 212.402.4871 // m: 917.373.7363
>>> vvmr: 8890237  aim: sphetsarath20  t: @sourigna
>>>
>>> <http://www.aolplatforms.com>
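
The failure Gna describes at the top of the thread comes from the fact that `GenericRecord` is an interface: `InstantiationUtil.instantiate(GenericRecord.class, ...)` ultimately attempts reflective construction, which can never succeed for an interface. Here is a minimal JDK-only sketch of that failure mode; `GenericRecordLike` is a hypothetical stand-in for `org.apache.avro.generic.GenericRecord`, so this runs without Flink or Avro on the classpath:

```java
// Demonstrates why reflectively instantiating an interface always fails,
// which is the root cause of the RuntimeException quoted above.
public class InterfaceInstantiationDemo {

    // Hypothetical stand-in for org.apache.avro.generic.GenericRecord.
    interface GenericRecordLike {}

    static boolean canInstantiate(Class<?> clazz) {
        try {
            // Mirrors the kind of reflective construction a utility like
            // Flink's InstantiationUtil performs under the hood.
            clazz.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // Interfaces declare no constructors, so this path is always
            // taken for an interface class object.
            return false;
        }
    }

    public static void main(String[] args) {
        // prints "interface instantiable: false"
        System.out.println("interface instantiable: "
                + canInstantiate(GenericRecordLike.class));
    }
}
```

Given this, one plausible fix for the `reuseAvroValue == false` branch in the pasted `nextRecord` would be to construct a concrete record from the schema, e.g. `new org.apache.avro.generic.GenericData.Record(schema)`, instead of asking `InstantiationUtil` to instantiate the interface; this is a suggestion based on the Avro API, not a confirmed part of the PR discussed above.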