[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14959143#comment-14959143 ]
Stefano Bortoli commented on FLINK-2800:
----------------------------------------

Ok, I have run a couple more tests, and the process completed without the KryoPool. However, when I reverted to the global variables for Input and Output, I could reproduce the error.

{quote}
10/15/2015 18:05:59 Job execution switched to status FAILED.
2015-10-15 18:05:59 INFO JobManager:137 - Status of job 1b05e39a7ea019d0a57702eb2a06d64a (Flink Java Job at Thu Oct 15 18:03:33 CEST 2015) changed to FAILED.
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 115
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:667)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:778)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:153)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:787)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:787)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:251)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:1)
	at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
	at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
	at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
	at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
2015-10-15 18:05:59 INFO JobClient:200 - Job execution failed
{quote}

This version of the method does not work:

{code}
@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
    // Reuse the Input stored in a field as long as the same source instance is passed in.
    if (source != previousIn) {
        previousIn = source;
        DataInputViewStream inputStream = new DataInputViewStream(source);
        input = new NoFetchingInput(inputStream);
    }
    // DataInputViewStream inputStream = new DataInputViewStream(source);
    // Input input = new NoFetchingInput(inputStream);
    checkKryoPoolInitialization();
    // Kryo kryo = kryoPool.borrow();
    try {
        return (T) kryo.readClassAndObject(input);
    } catch (KryoException ke) {
        // Surface an end-of-input condition as EOFException; rethrow anything else.
        Throwable cause = ke.getCause();
        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {
        // kryoPool.release(kryo);
    }
}
{code}

whereas this one worked:

{code}
@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
    if (source != previousIn) {
        previousIn = source;
        // DataInputViewStream inputStream = new DataInputViewStream(source);
        // input = new NoFetchingInput(inputStream);
    }
    // Create a fresh, local Input on every call instead of reusing the field.
    DataInputViewStream inputStream = new DataInputViewStream(source);
    Input input = new NoFetchingInput(inputStream);
    checkKryoPoolInitialization();
    // Kryo kryo = kryoPool.borrow();
    try {
        return (T) kryo.readClassAndObject(input);
    } catch (KryoException ke) {
        // Surface an end-of-input condition as EOFException; rethrow anything else.
        Throwable cause = ke.getCause();
        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {
        // kryoPool.release(kryo);
    }
}
{code}

I hope this helps either to solve the problem or to find its cause.
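The only difference between the two variants is whether the Input wrapper is cached in a field (keyed on the identity of the source) or created fresh on every call. As a purely illustrative sketch, and not a diagnosis of this issue, the following standalone Java example shows how a cached wrapper that holds internal state can get out of sync with a source that is rewound while its identity stays the same. The class name StaleWrapperSketch and its methods are hypothetical, and java.io.BufferedInputStream is only a stand-in for a stateful wrapper: unlike NoFetchingInput, it prefetches. Whether anything similar happens inside KryoSerializer is an assumption that the tests above do not establish.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch: none of these names exist in Flink or Kryo.
final class StaleWrapperSketch {

    private InputStream previousIn; // last source seen, compared by identity
    private InputStream cached;     // wrapper reused while the source is the same object

    // Mirrors the "cached" variant: rebuild the wrapper only when the source object changes.
    int readCached(InputStream source) throws IOException {
        if (source != previousIn) {
            previousIn = source;
            cached = new BufferedInputStream(source, 4);
        }
        return cached.read();
    }

    // Mirrors the "fresh" variant: a new wrapper, and therefore no carried-over state, per call.
    int readFresh(InputStream source) throws IOException {
        return new BufferedInputStream(source, 4).read();
    }

    // Reads one byte, rewinds the source, and reads again with each strategy.
    static int[] demo() {
        try {
            StaleWrapperSketch s = new StaleWrapperSketch();
            ByteArrayInputStream source =
                    new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});

            int first = s.readCached(source);       // wrapper buffers bytes 1..4
            source.reset();                         // rewind to position 0; same object identity
            int afterRewindCached = s.readCached(source); // served from the stale buffer
            source.reset();
            int afterRewindFresh = s.readFresh(source);   // fresh wrapper sees the rewound source
            return new int[] {first, afterRewindCached, afterRewindFresh};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "1 2 1"
    }
}
```

In this sketch the identity check `source != previousIn` cannot notice the rewind, so the cached wrapper returns 2 where a reader of the rewound source would expect 1. The fresh wrapper returns 1.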
> kryo serialization problem
> --------------------------
>
> Key: FLINK-2800
> URL: https://issues.apache.org/jira/browse/FLINK-2800
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Affects Versions: 0.10
> Environment: linux ubuntu 12.04 LTS, Java 7
> Reporter: Stefano Bortoli
>
> While performing a cross of two datasets of POJOs, I got the exception below.
> The first time I ran the process, there was no problem. When I ran it the
> second time, I got the exception. My guess is that it could be a race
> condition related to the reuse of the Kryo serializer object. However, it
> could also be "a bug where type registrations are not properly forwarded to
> all Serializers", as suggested by Stephan.
> ------------------------------------------------------------------------
> 2015-10-01 18:18:21 INFO JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> 	at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
> 	at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
> 	at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
> 	at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
> 	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
> 	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> 	at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)