Hi,
Can someone please point me to an example of creating a DataSet from Avro
GenericRecords?
I tried this code:
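import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final Path iPath = new Path(args[0]);
DataSet<GenericRecord> dataSet =
        env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer, String>>() {
    @Override
    public Tuple2<Integer, String> map(GenericRecord record) {
        Integer id = (Integer) record.get("id");
        // Generic Avro readers return string fields as org.apache.avro.util.Utf8
        // by default, so convert with toString() rather than casting to String.
        Object ua = record.get("user_agent");
        String userAgent = (ua == null) ? null : ua.toString();
        return new Tuple2<>(id, userAgent);
    }
}).writeAsText(args[1]);
env.execute();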
But I got this exception:
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
	at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
	at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
	at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
	at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
	at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
	at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
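From the stack trace, it looks like AvroInputFormat tries to read the Avro
file as SpecificRecords: it builds a ReflectDatumReader and derives the schema
from the class I pass in, which fails for the GenericRecord interface. Is there
a way to read an Avro file as GenericRecords?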
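For reference, plain Avro (outside Flink) can read a container file into GenericRecords without any generated class, because GenericDatumReader takes the schema from the file header. Below is a minimal sketch of that, assuming Avro 1.7.x on the classpath. The class name GenericAvroReadSketch, the record schema "Visit" and the sample values are hypothetical, chosen only to match the "id" and "user_agent" fields used above. It writes a small sample file first so it is self-contained. This is not Flink's API.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class GenericAvroReadSketch {

    // Hypothetical schema with the two fields used in the question.
    static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Visit\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"user_agent\",\"type\":\"string\"}]}");

    /** Reads every record as a GenericRecord; the schema comes from the file header. */
    static List<String> readAsGeneric(File file) throws IOException {
        List<String> rows = new ArrayList<>();
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord record : reader) {
                Integer id = (Integer) record.get("id");
                // String fields come back as org.apache.avro.util.Utf8, so use toString().
                String userAgent = record.get("user_agent").toString();
                rows.add(id + "," + userAgent);
            }
        }
        return rows;
    }

    /** Writes a one-record sample file so the sketch runs on its own. */
    static File writeSample() throws IOException {
        File file = File.createTempFile("visits", ".avro");
        file.deleteOnExit();
        try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(SCHEMA))) {
            writer.create(SCHEMA, file);
            GenericRecord r = new GenericData.Record(SCHEMA);
            r.put("id", 1);
            r.put("user_agent", "curl/7.43");
            writer.append(r);
        }
        return file;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAsGeneric(writeSample()));
    }
}
```

Wrapping a GenericDatumReader like this in a custom input format might be one workaround, but that is an untested idea, not something AvroInputFormat is known to support in this version.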
Thanks,
Tarandeep