Hey guys, I have been trying to get avro deserialization to work, but I’ve run into the issue where flink (1.10) is trying to serialize the avro classes with kryo:
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: reserved (org.apache.avro.Schema$Field) fieldMap (org.apache.avro.Schema$RecordSchema) schema (org.apache.avro.generic.GenericData$Record) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[kryo-2.24.0.jar:?] at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?] […] Caused by: java.lang.UnsupportedOperationException at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) ~[?:1.8.0_265] at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.24.0.jar:?] at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:?] at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?] at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?] ... 29 more I am setting forced avro serialization on the execution env (running in local cluster right now): if(localCluster){ env = StreamExecutionEnvironment.createLocalEnvironment(parallelism); }else{ env = StreamExecutionEnvironment.getExecutionEnvironment(); } env.getConfig().disableForceKryo(); env.getConfig().enableForceAvro(); and here is where I have the avro schema defined in my AvroSerializationSchema class: private Schema schema; public AvroDeserializationSchema(String schema) { this.schema = new Schema.Parser().parse(schema); } I have the flink-avro dependency added to the pom. Any ideas why kryo is still trying to serialize the avro GenericData class? Thanks Chris