Hi all, sorry if this is a silly question, but I can't figure it out.

I'm using an AvroInputFormat to read an Avro file like this:

val textInputFormat = new AvroInputFormat[GenericRecord](infile, classOf[GenericRecord])
val lines = env.readFile(textInputFormat, path)

This works fine in local mode, but when I submit it to a Flink cluster I get
serialisation errors like this:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$StringSchema
Serialization trace:
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
elementType (org.apache.avro.Schema$ArraySchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
        ... 7 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$StringSchema with modifiers "public"
        at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
        at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
        at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:413)
        at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
        at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
        ... 37 more



I realise this issue is mentioned in the documentation, but since the
problem seems to be with a class inside the AvroInputFormat that can't be
serialised, I'm not sure what the best solution is.
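
For what it's worth, the innermost "Caused by" looks like a plain Java reflection failure. Judging from the trace, chill's `normalJava` instantiator calls `Constructor.newInstance` on `org.apache.avro.Schema$StringSchema` (which Kryo reaches through the `schema` field of each `GenericData$Record`, per the serialization trace), and the JVM refuses because the nested class itself is not accessible, even though its constructor is `public`. Below is a minimal stand-alone sketch of that failure mode. It uses the JDK's private nested class `java.util.Collections$EmptyList` purely as a stand-in for the Avro class (an assumption, chosen so that no Avro or Flink dependency is needed), and `ReflectiveInstantiationDemo` / `instantiateWithoutSetAccessible` are hypothetical names.

```scala
object ReflectiveInstantiationDemo {

  // Hypothetical helper: instantiates a class the way a "plain" reflective
  // instantiator would, i.e. WITHOUT calling setAccessible(true) first.
  // Returns Left with the IllegalAccessException if the JVM refuses access.
  def instantiateWithoutSetAccessible(className: String): Either[IllegalAccessException, Any] =
    try {
      val ctor = Class.forName(className).getDeclaredConstructor()
      Right(ctor.newInstance())
    } catch {
      case e: IllegalAccessException => Left(e)
    }

  def main(args: Array[String]): Unit = {
    // Public class with a public no-arg constructor: access is allowed.
    println(instantiateWithoutSetAccessible("java.util.ArrayList"))

    // java.util.Collections$EmptyList is a private nested class (standing in
    // for org.apache.avro.Schema$StringSchema), so newInstance is refused.
    instantiateWithoutSetAccessible("java.util.Collections$EmptyList") match {
      case Left(e)  => println(s"refused: ${e.getMessage}")
      case Right(v) => println(s"unexpectedly created: $v")
    }
  }
}
```

The public class is created normally, while the private nested one fails with the same exception type as in the trace. Calling `setAccessible(true)` on the constructor first would avoid this particular check for ordinary classpath classes, although on newer JDKs module encapsulation can still block JDK-internal stand-ins like the one used here.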

This works fine if I specify a concrete (non-generic) class instead, i.e.:

val textInputFormat = new AvroInputFormat[Example](infile, classOf[Example])
val lines = env.readFile(textInputFormat, path)

However, I can't get this to run in local mode with a nested case class
`Example`, which I need because the Avro files have deeply nested fields.
