@Flavio, doesn’t this look like the exception you often encountered a while back? If I remember correctly that was fixed by Kurt, right?
Best,
Aljoscha

> On 7. Jun 2017, at 18:11, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> Hi Andrea,
>
> I did some quick issue searching, and it seems like this is a frequently
> asked issue on Kryo: https://github.com/EsotericSoftware/kryo/issues/428
>
> I can’t be sure at the moment if the resolution / workaround mentioned in
> there makes sense; I’ll have to investigate a bit more.
>
> Also, to clarify: from the stack trace, it seems like you’re simply using
> whatever serializer Kryo defaults to (i.e. FieldSerializer), and not
> registering your own, is that correct?
>
> In the meantime, could you also try the following, rebuild Flink, and
> test to see if it works? At
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L349
> change setReferences to false.
>
> Cheers,
> Gordon
>
>
> On 7 June 2017 at 3:39:55 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:
>
>> Good afternoon dear Community,
>>
>> For a few days now I have been really struggling to understand the reason
>> behind this KryoException. Here is the stack trace.
>>
>> 2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at xbenchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>> Serialization trace:
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>> Serialization trace:
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>     ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>> Serialization trace:
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>> Serialization trace:
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>     at java.util.ArrayList.get(ArrayList.java:429)
>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     ... 11 more
>> 2017-06-07 10:18:52,594 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
>> 2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
>> 2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
>> 2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
>> 2017-06-07 10:18:52,841 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>> Serialization trace:
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>> Serialization trace:
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>     ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>> Serialization trace:
>> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>     at java.util.ArrayList.get(ArrayList.java:429)
>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     ... 11 more
>>
>>
>> What I'm doing is basically a product between matrices: I load the matrices
>> in COO format; the Block class is the following (heavily inspired by
>> https://issues.apache.org/jira/browse/FLINK-3920).
>>
>>
>> import breeze.linalg.{Matrix => BreezeMatrix}
>> import org.apache.flink.ml.math.Breeze._
>> import org.apache.flink.ml.math.{Matrix, SparseMatrix}
>>
>> class Block(val blockData: Matrix) extends MatrixLayout with Serializable {
>>
>>   def data: Matrix = blockData
>>
>>   def toBreeze: BreezeMatrix[Double] = blockData.asBreeze
>>
>>   def numRows: Int = data.numRows
>>
>>   def numCols: Int = data.numCols
>>
>>   def *(other: Block): Block = {
>>
>>     require(this.numCols == other.numRows)
>>
>>     Block((blockData.asBreeze * other.toBreeze).fromBreeze)
>>   }
>>
>>   def +(other: Block): Block =
>>     Block((blockData.asBreeze + other.toBreeze).fromBreeze)
>>
>>   def unary_+(other: Block): Block = this + other
>>
>>   override def equals(other: Any): Boolean = {
>>     other match {
>>       case block: Block => this.blockData.equalsMatrix(block.blockData)
>>       case _ => false
>>     }
>>   }
>>
>> }
>>
>> The block matrix is a matrix of blocks; the group reduce function involved
>> is the last step of the product function.
>>
>> class SumGroupOfBlocks(blockMapper: BlockMapper)
>>   extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {
>>
>>   override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
>>                       out: Collector[(BlockID, Block)]): Unit = {
>>
>>     val multipliedGroup: Seq[(Int, Int, Block)] = blocks.collect {
>>       case ((i, j, left), (x, y, right)) => (i, y, left * right)
>>     }.toSeq
>>
>>     val reducedGroup = multipliedGroup.reduce((left, right) => {
>>       val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)
>>
>>       (i, j, leftBlock + rightBlock)
>>     })
>>
>>     out.collect(blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3)
>>   }
>> }
>>
>> The exception described above happens when I increase the matrix sizes to
>> 2000x2000 (rows x cols) or more: my code works with 1000x1000 matrices, but
>> not with 2000x2000 matrices and above.
>>
>> I think it is also worth mentioning that the IndexOutOfBoundsException is
>> always looking for index 109 (across different matrix sizes), while the size
>> of the array varies within a range (5-7). It looks as if the serialized
>> messages are somehow truncated right before their delivery.
>>
>> I tried several solutions; in no particular order, here is what has not
>> worked:
>>
>> - employing flink-1.2.0, flink-1.3.0
>> - updating flink kryo library to 3.0.3
>> - running on parallelism 1
>> - explicitly registering my custom classes with Kryo
>> - varying the size of my blocks
>> - trying to increase akka.framesize
>>
>> I execute this job on a three-node, 2-vCPU cluster, two TM, two TS per TM.
>> 6GB task manager heap size.
>> 16384 numOfBuffers and 16384 networkBufferSize.
>>
>> If I run the code on my laptop on 2000x2000 matrices, it works, likely
>> because it skips remote serialization.
>>
>> I really hope someone can help here. It's becoming really painful...
>>
>> Thank you so much.
>>
>> Cheers, Andrea
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
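
A note on reading the trace: the innermost frames (MapReferenceResolver.getReadObject calling ArrayList.get) suggest that the reader was asked to resolve reference id 109 when only 5 objects had been registered so far. Below is a minimal sketch of that failure mode. It uses a made-up two-field record format, not Kryo's real encoding, and `ReferenceSketch`, `NewObject`, `BackRef` and `read` are hypothetical names introduced only for illustration.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

object ReferenceSketch {
  // Hypothetical toy wire format (NOT Kryo's actual encoding):
  //   NewObject, payload -> a new value; the reader registers it
  //   BackRef,   id      -> "the same as the id-th value registered so far"
  val NewObject = 0
  val BackRef = 1

  /** Decodes a stream of (tag, operand) pairs, resolving back-references. */
  def read(stream: Seq[Int]): Vector[Int] = {
    val seen = ArrayBuffer.empty[Int] // registered values, indexed by reference id
    stream.grouped(2).map {
      case Seq(NewObject, payload) =>
        seen += payload
        payload
      case Seq(BackRef, id) =>
        seen(id) // throws IndexOutOfBoundsException if id was never registered
      case other =>
        throw new IllegalArgumentException(s"malformed record: $other")
    }.toVector
  }

  def main(args: Array[String]): Unit = {
    // Well-formed: two new values, then a reference back to the first one.
    println(read(Seq(NewObject, 7, NewObject, 9, BackRef, 0)))
    // Damaged or misaligned: the reader is told to fetch id 109
    // after only one value has been registered.
    println(Try(read(Seq(NewObject, 7, BackRef, 109))))
  }
}
```

In this toy format, any bytes that are damaged or read from the wrong offset can be mistaken for a back-reference with an arbitrary id. With reference tracking switched off (the setReferences change suggested above), a reader of this kind never performs the lookup, which may be why that change is worth testing; whether it addresses the underlying cause in this job is not established.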