Hi Ryan, Flink automatically infers the data type. If the type cannot be inferred, Flink falls back to Kryo for serialization and deserialization.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#general-class-types

I tested this and can reproduce the problem. My guess is that one of the classes does not meet Flink's POJO serialization requirements. I am also looking for a solution.

On 02/10/2021 04:32, Ryan Blue <rb...@netflix.com.INVALID> wrote:

Hi Liu,

Looks like you're trying to use Kryo. Can you try using Java serialization? I wasn't aware that Kryo was an option in Flink, but we usually assume that serialization is happening with Java serialization because Kryo requires extra compatibility that is difficult to inject.

On Sun, Feb 7, 2021 at 12:38 AM 1 <liubo1022...@126.com> wrote:

Hi all,

I'm very happy about the 0.11 release, that's great! I tested the new features immediately, such as Flink CDC, Flink rewriteDataFiles, and Flink streaming read. But when I write a row-level delete, something goes wrong.

I tested Flink CDC (following https://github.com/apache/iceberg/blob/master/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java ) and Flink rewriteDataFiles on Iceberg 0.11. Everything is fine when appending records (+I,1,aaa,20210128), but after writing a row-level delete file by id (-D,1,20210128), rewriteDataFiles throws an exception. The same exception is thrown when a DataStream streaming read job starts. I opened an issue for it: https://github.com/apache/iceberg/issues/2219

The exception is below; it is the same for rewriteDataFiles and for the streaming read job start.

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
nullValueCounts (org.apache.iceberg.GenericDataFile)
file (org.apache.iceberg.BaseFileScanTask)
fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
tasks (org.apache.iceberg.BaseCombinedScanTask)
task (org.apache.iceberg.flink.source.FlinkInputSplit)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)

Thx
Liu

--
Ryan Blue
Software Engineer
Netflix
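
Note on the root cause shown in the trace: the innermost frames are `MapSerializer.read` calling `Collections$UnmodifiableMap.put`, which suggests Kryo is trying to fill a read-only map entry by entry while deserializing `nullValueCounts`. The rejected `put` can be reproduced with the JDK alone, without Flink, Kryo, or Iceberg. The following is only a minimal sketch of that behavior; the class name `UnmodifiableMapPutDemo`, the helper `putIsRejected`, and the sample values are made up for illustration and are not part of any of those projects.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapPutDemo {

    // Hypothetical helper: returns true if put() on the given map is rejected
    // with UnsupportedOperationException, the innermost exception in the trace.
    public static boolean putIsRejected(Map<Integer, Long> map) {
        try {
            map.put(1, 0L);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> counts = new HashMap<>();
        counts.put(1, 0L);

        // A read-only view of the same kind named in the trace
        // (java.util.Collections$UnmodifiableMap).
        Map<Integer, Long> readOnly = Collections.unmodifiableMap(counts);
        System.out.println("unmodifiable view rejects put: " + putIsRejected(readOnly));

        // Copying the entries into a plain HashMap yields a map that accepts put.
        Map<Integer, Long> copy = new HashMap<>(readOnly);
        System.out.println("HashMap copy rejects put: " + putIsRejected(copy));
    }
}
```

This only shows why the `put` call fails. Whether the right remedy is a different serializer for this type, Java serialization as Ryan suggests, or a change in how the map is stored is not something the sketch settles.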