Hi Liu,

Looks like you're trying to use Kryo. Can you try using Java serialization?
I wasn't aware that Kryo was an option in Flink, but we usually assume that
serialization is happening with Java serialization because Kryo requires
extra compatibility that is difficult to inject.

On Sun, Feb 7, 2021 at 12:38 AM 1 <liubo1022...@126.com> wrote:

> Hi, All:
>
> I'm very happy for 0.11's release, that’s great!
> I test the new features Immediately, like flink cdc, flink rewriteDataFiles,
> flink streaming read
> But when I write a row level delete, there is something badly.
>
> I test flink cdc(just like
> https://github.com/apache/iceberg/blob/master/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
>  ) and flink rewriteDataFiles on iceberg 0.11,  everything is ok when 
> appending
> records (+I,1,aaa,20210128),  but when write a row level delete file by id
> (-D,1,20210128), rewriteDataFiles throw an exception, the same to
> DataStream streaming read job start, I open an issue for it
> https://github.com/apache/iceberg/issues/2219
>
> Exception is below, rewriteDataFiles and streaming read job start are
> same for it
> Caused by: com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> Serialization trace:
> nullValueCounts (org.apache.iceberg.GenericDataFile)
> file (org.apache.iceberg.BaseFileScanTask)
> fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
> tasks (org.apache.iceberg.BaseCombinedScanTask)
> task (org.apache.iceberg.flink.source.FlinkInputSplit)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
> at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>
> Thx
> Liu
>
>

-- 
Ryan Blue
Software Engineer
Netflix

Reply via email to