Hi all - sorry this seems like a silly question, but I can't figure it out.
I'm using an AvroInputFormat in order to read an Avro file like this: val textInputFormat = new AvroInputFormat[GenericRecord](infile, classOf[GenericRecord]) val lines = env.readFile(textInputFormat, path) This works fine in local mode, but when submitted to a flink cluster I get serialisation errors that look like this: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325) Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$StringSchema Serialization trace: schema (org.apache.avro.Schema$Field) fieldMap (org.apache.avro.Schema$RecordSchema) elementType (org.apache.avro.Schema$ArraySchema) schema (org.apache.avro.Schema$Field) fieldMap (org.apache.avro.Schema$RecordSchema) schema (org.apache.avro.generic.GenericData$Record) at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126) at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547) ... 7 more Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$StringSchema with modifiers "public" at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296) at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288) at java.lang.reflect.Constructor.newInstance(Constructor.java:413) at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160) at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123) ... 37 more I realise this is an issue that is mentioned in the documentation, but given that it looks like it is a problem with some class insider the AvroInputFormat that is having trouble being serialised, I'm not sure on what he best solution is. This works fine if I specify the class not to be generic - i.e val textInputFormat = new AvroInputFormat[GenericRecord](infile, classOf[Example]) val lines = env.readFile(textInputFormat, path However I can't get this to run in local mode with a case class `Example` that is nested, which is required as the Avro files have very nested fields.