I figured it out, for those interested: I actually had an embedded record in my 
Avro schema, so my loop was incorrectly building a single-dimension map with a 
GenericRecord value, which was throwing off the map's serialization. After 
recursing through the embedded GenericRecords to build the fully realized 
multi-dimensional map, Kryo stopped choking.
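The recursive flattening described above might look roughly like the sketch below. `Rec` is a hypothetical stand-in for Avro's `GenericRecord` so the example is self-contained; with real Avro you would test `value instanceof GenericRecord` and iterate `record.getSchema().getFields()` instead.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for GenericRecord, for illustration only.
class Rec {
    final Map<String, Object> fields = new HashMap<>();
    Rec put(String name, Object value) { fields.put(name, value); return this; }
}

public class Flatten {
    // Recurse into embedded records so the result contains only plain
    // values and nested Maps, never raw record objects.
    static Map<String, Object> toMap(Rec record) {
        Map<String, Object> map = new HashMap<>();
        record.fields.forEach((name, value) ->
            map.put(name, value instanceof Rec ? toMap((Rec) value) : value));
        return map;
    }

    public static void main(String[] args) {
        Rec inner = new Rec().put("lat", 1.0).put("lon", 2.0);
        Rec outer = new Rec().put("id", "abc").put("location", inner);
        Map<String, Object> m = toMap(outer);
        System.out.println(m.get("location") instanceof Map); // prints "true"
    }
}
```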

Chris

From: "Slotterback, Chris" <chris_slotterb...@comcast.com>
Date: Sunday, August 23, 2020 at 2:17 AM
To: user <user@flink.apache.org>
Subject: Re: Not able to force Avro serialization

And here is the deserialization block where the Schema is used to generate a 
GenericRecord:

@Override
public Map<String, Object> deserialize(byte[] bytes) throws IOException {
  DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
  GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
  Map<String, Object> map = new HashMap<>();
  record.getSchema().getFields().forEach(field -> map.put(field.name(), record.get(field.name())));
  return map;
}

Chris

From: "Slotterback, Chris" <chris_slotterb...@comcast.com>
Date: Sunday, August 23, 2020 at 2:07 AM
To: user <user@flink.apache.org>
Subject: Not able to force Avro serialization

Hey guys,

I have been trying to get Avro deserialization to work, but I've run into an 
issue where Flink (1.10) is trying to serialize the Avro classes with Kryo:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
    […]
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) ~[?:1.8.0_265]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
    ... 29 more

I am enabling forced Avro serialization on the execution environment (running in 
a local cluster right now):

if (localCluster) {
  env = StreamExecutionEnvironment.createLocalEnvironment(parallelism);
} else {
  env = StreamExecutionEnvironment.getExecutionEnvironment();
}
env.getConfig().disableForceKryo();
env.getConfig().enableForceAvro();

and here is where I have the Avro schema defined in my AvroDeserializationSchema 
class:

private Schema schema;
public AvroDeserializationSchema(String schema) {
  this.schema = new Schema.Parser().parse(schema);
}

I have the flink-avro dependency added to the pom.

Any ideas why Kryo is still trying to serialize the Avro GenericData class?

Thanks
Chris
