Hi Ryan,

I received the mail about 0.11.1, so could you please take a look at 
https://github.com/apache/iceberg/issues/2219 for that release? Thanks.


Kryo is the default serializer in Flink, so users can write only their job 
logic without manually specifying a serializer. However, Kryo cannot serialize 
and deserialize an UnmodifiableCollection. Thanks to Jun Zhang for submitting 
PR 2258 to fix this, but we don’t know whether changing the UnmodifiableMap 
will cause other problems. If we can’t change the UnmodifiableMap, we must 
define the serializer ourselves in Flink.


Thx


liubo07199
liubo07...@hellobike.com

On 02/10/2021 04:32, Ryan Blue <rb...@netflix.com.INVALID> wrote:
Hi Liu,



Looks like you're trying to use Kryo. Can you try using Java serialization? I 
wasn't aware that Kryo was an option in Flink, but we usually assume that 
serialization is happening with Java serialization because Kryo requires extra 
compatibility that is difficult to inject.
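
As a rough illustration of the Java serialization path mentioned above, here is a minimal sketch that uses only the JDK. It is not Iceberg or Flink code: the class name, the `roundTrip` helper, and the `sampleCounts` map (a stand-in for a field like nullValueCounts) are all assumptions made for the example. It round-trips a map wrapped by `Collections.unmodifiableMap` through `ObjectOutputStream` and `ObjectInputStream`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class JavaSerializationRoundTrip {

    // Serialize a value with plain Java serialization and read it back.
    // Checked exceptions are wrapped so callers do not need a throws clause.
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }

    // Hypothetical stand-in for a per-column counts map; not taken from Iceberg.
    static Map<Integer, Long> sampleCounts() {
        Map<Integer, Long> counts = new HashMap<>();
        counts.put(1, 0L);
        counts.put(2, 3L);
        return Collections.unmodifiableMap(counts);
    }

    public static void main(String[] args) {
        Map<Integer, Long> original = sampleCounts();
        Object copy = roundTrip(original);
        // Compare the restored map with the original by content.
        System.out.println(original.equals(copy));
    }
}
```

The unmodifiable wrapper returned by the JDK is itself Serializable when its backing map is, so this path restores the wrapper as a whole and never calls put() on it.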


On Sun, Feb 7, 2021 at 12:38 AM 1 <liubo1022...@126.com> wrote:

Hi, All: 


I'm very happy about the 0.11 release, that’s great!
I tested the new features immediately, such as Flink CDC, Flink 
rewriteDataFiles, and Flink streaming read.
But when I write a row-level delete, something goes wrong.


I tested Flink CDC (similar to 
https://github.com/apache/iceberg/blob/master/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
 ) and Flink rewriteDataFiles on Iceberg 0.11. Everything is OK when appending 
records (+I,1,aaa,20210128), but after writing a row-level delete file by 
id (-D,1,20210128), rewriteDataFiles throws an exception. The same exception 
occurs when a DataStream streaming read job starts. I opened an issue for it: 
https://github.com/apache/iceberg/issues/2219 


The exception is below; it is the same for rewriteDataFiles and for the 
streaming read job start:
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
nullValueCounts (org.apache.iceberg.GenericDataFile)
file (org.apache.iceberg.BaseFileScanTask)
fileScanTask (org.apache.iceberg.BaseFileScanTask$SplitScanTask)
tasks (org.apache.iceberg.BaseCombinedScanTask)
task (org.apache.iceberg.flink.source.FlinkInputSplit)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
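
The innermost frames of the trace show MapSerializer.read calling put() on a Collections$UnmodifiableMap. The following JDK-only sketch imitates that pattern in a simplified form. It is not Kryo's actual code, and the class name, `refill`, and `sampleEntries` are hypothetical names chosen for the illustration. It shows that refilling a map entry by entry works for a HashMap but is rejected by an unmodifiable wrapper.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapPutFailure {

    // Simplified imitation of a deserializer that rebuilds a map through put().
    // Returns true if every put() succeeded, false if the target rejected writes.
    static boolean refill(Map<Integer, Long> target, Map<Integer, Long> entries) {
        try {
            for (Map.Entry<Integer, Long> entry : entries.entrySet()) {
                target.put(entry.getKey(), entry.getValue());
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Same exception type as the root cause in the trace above.
            return false;
        }
    }

    // Hypothetical sample data standing in for the serialized entries.
    static Map<Integer, Long> sampleEntries() {
        Map<Integer, Long> entries = new HashMap<>();
        entries.put(1, 0L);
        entries.put(2, 3L);
        return entries;
    }

    public static void main(String[] args) {
        Map<Integer, Long> entries = sampleEntries();
        // A plain HashMap accepts the entries.
        System.out.println(refill(new HashMap<>(), entries));
        // An unmodifiable wrapper rejects the first put().
        System.out.println(refill(Collections.unmodifiableMap(new HashMap<>()), entries));
    }
}
```

This suggests why a serializer that rebuilds the target map through put() cannot restore such a field, and why either a mutable field type or a custom serializer for the wrapper would be needed on the Kryo path.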


Thx
Liu






--

Ryan Blue
Software Engineer
Netflix
