Hi Andrea,

I did a quick search, and this appears to be a frequently reported issue on Kryo: https://github.com/EsotericSoftware/kryo/issues/428.
I can’t be sure yet whether the resolution / workaround mentioned there makes sense; I’ll have to investigate a bit more.

Also, to clarify: from the stack trace, it seems you’re simply using whatever serializer Kryo defaults to (i.e. FieldSerializer) and not registering your own. Is that correct?

In the meantime, could you also try the following, rebuild Flink, and test whether it works? At https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L349, change setReferences to false.

Cheers,
Gordon

On 7 June 2017 at 3:39:55 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:

Good afternoon dear Community,

For a few days now I have been really struggling to understand the reason behind this KryoException. Here is the stack trace.

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at xbenchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 11 more
2017-06-07 10:18:52,594 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
2017-06-07 10:18:52,841 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 11 more

What I'm doing is basically a product between matrices: I load the matrices in COO format; the Block class is the following (heavily inspired by https://issues.apache.org/jira/browse/FLINK-3920).

    import breeze.linalg.{Matrix => BreezeMatrix}
    import org.apache.flink.ml.math.Breeze._
    import org.apache.flink.ml.math.{Matrix, SparseMatrix}

    class Block(val blockData: Matrix) extends MatrixLayout with Serializable {

      def data: Matrix = blockData

      def toBreeze: BreezeMatrix[Double] = blockData.asBreeze

      def numRows: Int = data.numRows

      def numCols: Int = data.numCols

      // Block product; the inner dimensions must agree.
      def *(other: Block): Block = {
        require(this.numCols == other.numRows)
        Block((blockData.asBreeze * other.toBreeze).fromBreeze)
      }

      // Element-wise block sum.
      def +(other: Block): Block =
        Block((blockData.asBreeze + other.toBreeze).fromBreeze)

      def unary_+(other: Block): Block = this + other

      override def equals(other: Any): Boolean = {
        other match {
          case block: Block => this.blockData.equalsMatrix(block.blockData)
          case _ => false
        }
      }
    }

The block matrix is a matrix of blocks; the group reduce function involved is the last step of the product function.

    class SumGroupOfBlocks(blockMapper: BlockMapper)
        extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {

      override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
                          out: Collector[(BlockID, Block)]): Unit = {

        // Multiply each joined pair of blocks.
        val multipliedGroup: Seq[(Int, Int, Block)] = blocks.collect {
          case ((i, j, left), (x, y, right)) => (i, y, left * right)
        }.toSeq

        // Sum the partial products of the group.
        val reducedGroup = multipliedGroup.reduce((left, right) => {
          val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)
          (i, j, leftBlock + rightBlock)
        })

        // Emit the reduced block under its block ID.
        out.collect(blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3)
      }
    }

The exception described above occurs when I increase the matrix sizes to 2000x2000 (rows x cols) or larger: my code works with 1000x1000 matrices, but not with 2000x2000 matrices and above.
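To make the multiply-then-sum step of the group reduce easier to follow outside Flink, here is a minimal, self-contained sketch. It is not the code from this thread: `DenseBlock`, `BlockProductSketch` and `sumGroupOfBlocks` are hypothetical names, the blocks are plain row-major vectors instead of flink-ml matrices, and the group is passed as an ordinary `Seq` rather than a Flink iterable.

```scala
// Hypothetical stand-in for the thread's Block: a dense, row-major block.
final case class DenseBlock(rows: Int, cols: Int, values: Vector[Double]) {
  require(values.length == rows * cols, "values must hold rows * cols entries")

  def apply(r: Int, c: Int): Double = values(r * cols + c)

  // Block product; the inner dimensions must agree.
  def *(other: DenseBlock): DenseBlock = {
    require(cols == other.rows, "inner dimensions must match")
    val out = for {
      r <- 0 until rows
      c <- 0 until other.cols
    } yield (0 until cols).map(k => this(r, k) * other(k, c)).sum
    DenseBlock(rows, other.cols, out.toVector)
  }

  // Element-wise sum; the shapes must agree.
  def +(other: DenseBlock): DenseBlock = {
    require(rows == other.rows && cols == other.cols, "shapes must match")
    DenseBlock(rows, cols, values.zip(other.values).map { case (a, b) => a + b })
  }
}

object BlockProductSketch {
  // (block row, block column, block), as in the COO-style tuples of the thread.
  type Coo = (Int, Int, DenseBlock)

  // Multiply each joined pair, then sum the partial products of the group
  // into the single output block for that group.
  def sumGroupOfBlocks(group: Seq[(Coo, Coo)]): Coo = {
    val multiplied = group.map { case ((i, _, left), (_, y, right)) => (i, y, left * right) }
    multiplied.reduce((l, r) => (l._1, l._2, l._3 + r._3))
  }

  def main(args: Array[String]): Unit = {
    // Output block (0, 0) of a 1x2 block matrix times a 2x1 block matrix.
    val group = Seq(
      ((0, 0, DenseBlock(1, 1, Vector(2.0))), (0, 0, DenseBlock(1, 1, Vector(3.0)))),
      ((0, 1, DenseBlock(1, 1, Vector(4.0))), (1, 0, DenseBlock(1, 1, Vector(5.0))))
    )
    println(sumGroupOfBlocks(group))
  }
}
```

Running the same group logic locally on small inputs like this may help separate errors in the product itself from errors that only appear once records are serialized and spilled.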
I think it is also worth mentioning that the IndexOutOfBoundsException always asks for index 109 (across different matrix sizes), while the size of the array varies in a range (5-7). It looks as if the serialized messages are somehow truncated right before their delivery.

I tried several solutions; in no particular order, here is what has not worked:
- employing flink-1.2.0, flink-1.3.0
- updating the Flink Kryo library to 3.0.3
- running with parallelism 1
- explicitly registering my custom classes with Kryo
- varying the size of my blocks
- trying to increase akka.framesize

I execute this job on a three-node 2-vCPU cluster: two TM, two TS per TM. 6GB task manager heap size. 16384 numOfBuffers and 16384 networkBufferSize.

If I run the code on my laptop on 2000x2000 matrices, it works, likely because it skips remote serialization.

I really hope someone can help here. It's becoming really painful...

Thank you so much.

Cheers,
Andrea

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
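The innermost frames of the trace (MapReferenceResolver.getReadObject calling ArrayList.get) suggest that a back-reference id read from the stream is being used as an index into a list of objects read so far. The sketch below is a simplified, hypothetical model of that lookup, not Kryo's actual code: `ReadReferenceTable` and its methods are invented for illustration. It only shows why an id that does not belong to the current stream position (for instance when the reader is misaligned or the bytes are incomplete) fails with an index/size mismatch like the one reported. Whether that is the real cause here is an assumption.

```scala
import java.util.{ArrayList => JArrayList}

// Simplified, hypothetical model of a read-side reference table: each object
// read so far gets the next id, and a back-reference is resolved by list index.
final class ReadReferenceTable {
  private val readObjects = new JArrayList[AnyRef]()

  // Record a newly read object and return the id assigned to it.
  def addReadObject(obj: AnyRef): Int = {
    readObjects.add(obj)
    readObjects.size - 1
  }

  // Resolve a back-reference id taken from the byte stream.
  def getReadObject(id: Int): AnyRef = readObjects.get(id)

  // Forget all objects, as would happen between independent records.
  def reset(): Unit = readObjects.clear()
}

object ReferenceLookupSketch {
  def main(args: Array[String]): Unit = {
    val table = new ReadReferenceTable
    (1 to 5).foreach(i => table.addReadObject(s"object-$i"))

    // A valid back-reference: id 3 was assigned earlier in the same stream.
    println(table.getReadObject(3))

    // An id that was never assigned, e.g. because payload bytes were
    // interpreted as a reference id. The list only holds 5 objects.
    try table.getReadObject(109)
    catch {
      case e: IndexOutOfBoundsException => println(s"lookup failed: ${e.getMessage}")
    }
  }
}
```

Under this model, turning reference tracking off (as suggested with setReferences) would bypass the lookup entirely, which is one reason the suggestion is a useful diagnostic even if it does not explain the underlying problem.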