Perhaps we can get our hands on it during Flink Forward. :-D I have updated the ticket description: I found out that the issue is triggered by performing a join right after the cross. See you in Berlin!
Regards,
Stefano

2015-10-06 9:39 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Stefano,
>
> we'll definitely look into it once Flink Forward is over and we've
> finished the current release work. Thanks for reporting the issue.
>
> Cheers,
> Till
>
> On Tue, Oct 6, 2015 at 9:21 AM, Stefano Bortoli <bort...@okkam.it> wrote:
>
>> Hi guys, I managed to complete the process by crossing byte arrays that I
>> deserialize within the group function. However, I think this workaround is
>> only feasible for relatively simple processes. Is there any idea or plan
>> for fixing the serialization problem?
>>
>> Regards,
>> Stefano
>>
>> Stefano Bortoli, PhD
>> ENS Technical Director
>> OKKAM Srl - www.okkam.it
>>
>> Email: bort...@okkam.it
>> Phone nr: +39 0461 1823913
>>
>> Headquarters: Trento (Italy), Via Trener 8
>> Registered office: Trento (Italy), via Segantini 23
>>
>> 2015-10-02 12:05 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>
>>> I don't know whether it is the same issue, but after switching from my
>>> POJOs to BSONObject I have run into a race-condition issue with Kryo
>>> serialization.
>>> I could complete the process using the byte[], but at this point I
>>> actually need the POJO. I truly believe it is related to the reuse of the
>>> Kryo instance, which is not thread safe.
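Stefano's hypothesis is that a single Kryo instance is reused across calls and that, because Kryo is not thread safe, concurrent use corrupts its internal state; elsewhere in the thread he mentions having solved similar problems with a KryoFactory backed by a pool. The Java sketch below illustrates that pooling pattern generically, using only the JDK. The `InstancePool` class is hypothetical (it is not part of Flink or Kryo), and any factory can stand in for Kryo. Each caller borrows an instance for exclusive use and hands it back afterwards, so no two threads touch the same instance at once. Whether concurrency is really the cause here is not settled in the thread; Stephan's reply points to missing type registrations instead.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper illustrating the pooling pattern; not part of Flink or Kryo.
// Each borrowed instance is used by one caller at a time, so a non-thread-safe
// object (for example a Kryo instance) is never shared between concurrent threads.
final class InstancePool<T> {
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    InstancePool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Takes an idle instance, or creates a fresh one when none is available.
    T borrow() {
        T instance = idle.poll();
        return instance != null ? instance : factory.get();
    }

    // Hands an instance back so that a later caller can reuse it.
    void release(T instance) {
        idle.offer(instance);
    }

    // Borrows an instance, applies the action, and releases it even on failure.
    <R> R run(Function<? super T, ? extends R> action) {
        T instance = borrow();
        try {
            return action.apply(instance);
        } finally {
            release(instance);
        }
    }

    // Number of instances currently waiting in the pool.
    int idleCount() {
        return idle.size();
    }
}
```

With Kryo on the classpath, the factory would be something like `() -> new Kryo()` plus whatever registrations the job needs. Every pooled instance must receive the same registrations, otherwise the pool would reintroduce the kind of mismatch Stephan describes.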
>>>
>>> ------------------------------------------------------------------------------------------------------
>>> 2015-10-02 11:55:26 INFO JobClient:161 - 10/02/2015 11:55:26 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched to FAILED
>>> java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>>>     at java.util.ArrayList.get(ArrayList.java:411)
>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>     at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>>>     at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>>>     at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>>>     at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> 2015-10-02 9:46 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>>
>>>> Here it is: https://issues.apache.org/jira/browse/FLINK-2800
>>>>
>>>> Regards,
>>>> Stefano
>>>>
>>>> 2015-10-01 18:50 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>>>
>>>>> This looks to me like a bug where type registrations are not properly
>>>>> forwarded to all serializers.
>>>>>
>>>>> Can you open a JIRA ticket for this?
>>>>>
>>>>> On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli <s.bort...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>>
>>>>>> I hit a Kryo exception while running a process 'crossing' POJO
>>>>>> datasets. I am using 0.10-milestone-1.
>>>>>> Checking the serializer:
>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>>>>
>>>>>> I have noticed that the Kryo instance is reused across serialization
>>>>>> calls (e.g. line 187). However, Kryo is not thread safe, so I think it
>>>>>> may cause the problem through a possible race condition. We have solved
>>>>>> this type of issue before with a KryoFactory implementing a pool. Perhaps
>>>>>> it should just be a matter of calling the
>>>>>>
>>>>>> What should I do? Open a ticket?
>>>>>>
>>>>>> Thanks a lot guys for the great job!
>>>>>> >>>>>> saluti, >>>>>> Stefano >>>>>> >>>>>> ----------------------------------------- >>>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered >>>>>> class ID: 114 >>>>>> at >>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) >>>>>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) >>>>>> at >>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) >>>>>> at >>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210) >>>>>> at >>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127) >>>>>> at >>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >>>>>> at >>>>>> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180) >>>>>> at >>>>>> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111) >>>>>> at >>>>>> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309) >>>>>> at >>>>>> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162) >>>>>> at >>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489) >>>>>> at >>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354) >>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) >>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>> >>>>> >>>>> >>>> >>> >> >
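Stephan's reading, that type registrations are not forwarded to every serializer, fits the `Encountered unregistered class ID: 114` error. Roughly, when a class is registered, Kryo writes a small integer ID instead of the full class name, so the reading side can decode the record only if it holds the same registrations (and, when IDs are assigned automatically, in the same order). The toy Java model below illustrates that failure mode with the JDK alone; the `ClassRegistry` class is hypothetical and is not Kryo's or Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of ID-based class registration; not Kryo's code.
// A registered class is encoded as a small integer, so a reader can only
// decode it if it was given the same registrations as the writer.
final class ClassRegistry {
    private final Map<Class<?>, Integer> idsByClass = new HashMap<>();
    private final Map<Integer, Class<?>> classesById = new HashMap<>();
    private int nextId = 0;

    // Assigns the next free ID to a class, so IDs depend on registration order.
    void register(Class<?> type) {
        if (idsByClass.containsKey(type)) {
            return;
        }
        idsByClass.put(type, nextId);
        classesById.put(nextId, type);
        nextId++;
    }

    // Writer side: the class is replaced by its numeric ID.
    int writeClass(Class<?> type) {
        Integer id = idsByClass.get(type);
        if (id == null) {
            throw new IllegalArgumentException("Class is not registered: " + type.getName());
        }
        return id;
    }

    // Reader side: the numeric ID must map back to a class registered here.
    Class<?> readClass(int id) {
        Class<?> type = classesById.get(id);
        if (type == null) {
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return type;
    }
}
```

In this model, a reader that is missing a registration fails loudly, much like the second trace. A reader that registers the same classes in a different order would instead resolve the ID to the wrong class without any error.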