[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14958995#comment-14958995 ]
Stefano Bortoli edited comment on FLINK-2800 at 10/15/15 3:42 PM:
------------------------------------------------------------------

Hi guys. I have been experimenting with the KryoSerializer class. I upgraded to Kryo 3.0.2, which offers a very convenient KryoPool with a customizable KryoFactory, and then I moved the instantiation of all the fields that were shared by the serialization and deserialization methods into the methods themselves. I also registered the classes explicitly (if classes are not registered, Kryo sometimes does not serialize them in the same order, which can cause problems).

I still have to evaluate whether all of these changes are necessary, but using the KryoPool, registering the classes, and moving the Input and Output fields into the methods made the exceptions reported in this issue go away. I will keep investigating. In any case, this shows that it was indeed a race condition, perhaps because the KryoSerializer is not cloned as expected along the execution chain.

[EDIT] I have also set the default instantiator strategy, with an Objenesis fallback, so that objects whose classes have no no-argument constructor can be serialized:
{code}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy; // nested class in Kryo 3.x
import com.twitter.chill.ScalaKryoInstantiator;
import org.objenesis.strategy.StdInstantiatorStrategy;
Kryo kryo = new ScalaKryoInstantiator().newKryo();
final DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy(); // tries no-arg constructors first
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); // otherwise instantiate via Objenesis
kryo.setInstantiatorStrategy(instantiatorStrategy);
{code}

> kryo serialization problem
> --------------------------
>
> Key: FLINK-2800
> URL: https://issues.apache.org/jira/browse/FLINK-2800
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Affects Versions: 0.10
> Environment: linux ubuntu 12.04 LTS, Java 7
> Reporter: Stefano Bortoli
>
> While performing a cross of two datasets of POJOs, I got the exception below.
> The first time I ran the process, there was no problem. When I ran it a
> second time, I got the exception. My guess is that it could be a race
> condition related to the reuse of the Kryo serializer object. However, it
> could also be "a bug where type registrations are not properly forwarded to
> all Serializers", as suggested by Stephan.
>
> ------------------------------------------------------------------------
> 2015-10-01 18:18:21 INFO JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>     at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>     at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>     at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>     at java.lang.Thread.run(Thread.java:745)
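
The workaround described in the comment (a Kryo 3.x KryoPool built from a custom KryoFactory, explicit class registration, and Input/Output objects created inside the serialize and deserialize methods) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the patch for this issue and not Flink's KryoSerializer: it assumes Kryo 3.0.x (with its Objenesis dependency) is on the classpath, it uses a plain new Kryo() instead of chill's ScalaKryoInstantiator to avoid the extra dependency, and the class names PooledKryoCodec and Point are hypothetical.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.pool.KryoCallback;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import java.util.ArrayList;
import org.objenesis.strategy.StdInstantiatorStrategy;

/**
 * Hypothetical helper, not Flink's KryoSerializer: each call borrows a Kryo
 * instance from a pool and allocates its own Input/Output, so concurrent
 * callers never share a Kryo instance or its buffers.
 */
public final class PooledKryoCodec {

    /** Hypothetical POJO with no no-arg constructor, so it needs the Objenesis fallback. */
    public static final class Point {
        public int x;
        public int y;

        public Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    // A single factory builds every pooled instance, so all of them register
    // the same classes in the same order and therefore agree on class IDs.
    private static final KryoFactory FACTORY = new KryoFactory() {
        @Override
        public Kryo create() {
            Kryo kryo = new Kryo();
            Kryo.DefaultInstantiatorStrategy strategy = new Kryo.DefaultInstantiatorStrategy();
            strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
            kryo.setInstantiatorStrategy(strategy);
            kryo.register(ArrayList.class);
            kryo.register(Point.class);
            return kryo;
        }
    };

    // softReferences(): idle pooled instances may be reclaimed by the GC under memory pressure.
    private static final KryoPool POOL = new KryoPool.Builder(FACTORY).softReferences().build();

    private PooledKryoCodec() {
    }

    public static byte[] serialize(final Object value) {
        // run() borrows an instance, invokes the callback, and releases the instance afterwards.
        return POOL.run(new KryoCallback<byte[]>() {
            @Override
            public byte[] execute(Kryo kryo) {
                // Fresh Output per call: 256-byte initial buffer, no maximum size (-1).
                Output output = new Output(256, -1);
                kryo.writeClassAndObject(output, value);
                return output.toBytes();
            }
        });
    }

    public static Object deserialize(final byte[] bytes) {
        return POOL.run(new KryoCallback<Object>() {
            @Override
            public Object execute(Kryo kryo) {
                // Fresh Input per call, wrapping only this record's bytes.
                return kryo.readClassAndObject(new Input(bytes));
            }
        });
    }
}
```

A round trip such as PooledKryoCodec.deserialize(PooledKryoCodec.serialize(new PooledKryoCodec.Point(3, 4))) returns a Point with the same field values. The pool mirrors the KryoPool/KryoFactory approach from the comment; a ThreadLocal<Kryo> would be another way to get the same property, namely that no Kryo instance or buffer is ever used by two threads at once.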