Hi Rommel,

I wonder why an Avro type would use Kryo as its serializer. Could you check what kind of type information you get from TypeInformation.of(class) [1]?

[1] https://github.com/apache/flink/blob/cc3f85eb4cd3e5031a84321e62d01b3009a00577/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java#L208

Best
Yun Tang
________________________________
From: Rommel Holmes <rommelhol...@gmail.com>
Sent: Wednesday, June 23, 2021 13:43
To: user <user@flink.apache.org>
Subject: PoJo to Avro Serialization throw KryoException: java.lang.UnsupportedOperationException

My unit test ran fine under Flink 1.11.2 with parquet-avro 1.10.0. After I upgraded to 1.12.0 with parquet-avro 1.12.0, the unit test throws:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    ...
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) ~[?:1.8.0_282]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
    ... 27 more

My unit test code looks something like this:

    private ImmutableList<PoJo> testData = ImmutableList.of(
        PoJo.build("123", "0.0.0.0", null),
        PoJo.build("123", "0.0.0.1", 2L)
    );

    DataStream<PoJo> input = env
        .addSource(new TestSource(testData), PojoTypeInfo.of(PoJo.class))
        .assignTimestampsAndWatermarks(watermarkStrategy);

    DataStream<GenericRecord> output = input
        .map(TestClass::convertPoJoToGenericRecord)
        .returns(new GenericRecordAvroTypeInfo(PoJo.getAvroSchema()));

    output.addSink();

The conversion function looks something like this:

    GenericRecord convertPoJoToGenericRecord(PoJo pojo) throws Exception {
        Schema schema = PoJo.getAvroSchema();
        GenericRecordBuilder builder = new GenericRecordBuilder(schema);
        for (Schema.Field field : schema.getFields()) {
            builder.set(field.name(), TestClass.getObjectField(field, pojo));
        }
        GenericRecord record = builder.build();
        return record;
    }

Can anyone help with this? Thank you.
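
For reference, the "Caused by" part of the trace above shows CollectionSerializer.read ending up in Collections$UnmodifiableCollection.add. The sketch below reproduces only that last step with plain JDK classes. It does not use Kryo, Flink, or Avro, and the class UnmodifiableAddDemo and the helper rebuildByAdding are hypothetical names made up for illustration. It assumes the failing read repopulates a collection by calling add() on it, which is what the trace suggests.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

public class UnmodifiableAddDemo {

    // Hypothetical helper: refill `target` by calling add() once per element,
    // mimicking the add() call that fails in the stack trace.
    // Returns false if the target rejects add().
    public static <T> boolean rebuildByAdding(Collection<T> target, Collection<T> source) {
        try {
            for (T element : source) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b");

        // A plain HashSet accepts add().
        Collection<String> modifiable = new HashSet<>();

        // An unmodifiable wrapper throws UnsupportedOperationException on add().
        Collection<String> unmodifiable =
                Collections.unmodifiableCollection(new HashSet<String>());

        System.out.println("modifiable target rebuilt: " + rebuildByAdding(modifiable, values));
        System.out.println("unmodifiable target rebuilt: " + rebuildByAdding(unmodifiable, values));
    }
}
```

This prints true for the modifiable target and false for the unmodifiable one. If the same thing is happening inside the job, the failure is a consequence of the record (and the Schema it carries) going through Kryo at all, which is why checking the resolved type information is a reasonable first step.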