Hi Rommel,

I wonder why an Avro type would use Kryo as its serializer. Could you check
what kind of type information you get via TypeInformation.of(class) [1]?


[1] https://github.com/apache/flink/blob/cc3f85eb4cd3e5031a84321e62d01b3009a00577/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java#L208


Best
Yun Tang
________________________________
From: Rommel Holmes <rommelhol...@gmail.com>
Sent: Wednesday, June 23, 2021 13:43
To: user <user@flink.apache.org>
Subject: PoJo to Avro Serialization throw KryoException: java.lang.UnsupportedOperationException

My unit test ran fine under Flink 1.11.2 with parquet-avro 1.10.0. Once I
upgraded to 1.12.0 with parquet-avro 1.12.0, the unit test throws:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
...

Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) ~[?:1.8.0_282]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
    ... 27 more

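For reference, the "Caused by" part of this trace can be reproduced with the JDK alone. The following is a minimal sketch, not Kryo's actual code: it assumes, as the trace suggests, that the deserializer refills an existing collection through add(), and that the "reserved" member reached via Schema$Field is an unmodifiable collection. The class name UnmodifiableAddDemo and the method refill are made up for illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class UnmodifiableAddDemo {

    // Hypothetical stand-in for a generic collection deserializer:
    // it receives a collection instance and adds one element to it.
    public static String refill(Collection<String> target) {
        try {
            target.add("doc");
            return "ok";
        } catch (UnsupportedOperationException e) {
            // Unmodifiable JDK wrappers reject add() with this exception.
            return "UnsupportedOperationException";
        }
    }

    public static void main(String[] args) {
        Set<String> mutable = new HashSet<>();
        Set<String> readOnly = Collections.unmodifiableSet(new HashSet<>());

        System.out.println(refill(mutable));  // prints "ok"
        System.out.println(refill(readOnly)); // prints "UnsupportedOperationException"
    }
}
```

If this is indeed what happens, the exception comes from the generic Kryo fallback path rather than from the record data itself, which fits the question of why Kryo is picked for this type at all.
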
My unit test code snippet looks something like this:

private ImmutableList<PoJo> testData = ImmutableList.of(
            PoJo.build("123", "0.0.0.0", null),
            PoJo.build("123", "0.0.0.1", 2L)
    );

DataStream<PoJo> input = env
                .addSource(new TestSource(testData), PojoTypeInfo.of(PoJo.class))
                .assignTimestampsAndWatermarks(watermarkStrategy);

DataStream<GenericRecord> output = input
                .map(TestClass::convertPoJoToGenericRecord)
                .returns(new GenericRecordAvroTypeInfo(PoJo.getAvroSchema()));

output.addSink();

The function is something like

GenericRecord convertPoJoToGenericRecord(PoJo pojo) throws Exception {
        Schema schema = PoJo.getAvroSchema();
        GenericRecordBuilder builder = new GenericRecordBuilder(schema);
        for (Schema.Field field : schema.getFields()) {
            builder.set(field.name(), TestClass.getObjectField(field, pojo));
        }
        GenericRecord record = builder.build();
        return record;
    }

Can anyone help with this?

Thank you
