Hi Andrea,

I did a quick search of the issue tracker, and this seems to be a frequently
reported problem with Kryo: https://github.com/EsotericSoftware/kryo/issues/428.

I can’t say yet whether the resolution or workaround mentioned there applies
to your case; I’ll have to investigate a bit more.

Also, to clarify: from the stack trace, it seems you’re simply using whatever
serializer Kryo defaults to (i.e. FieldSerializer) and not registering your
own. Is that correct?

In the meantime, could you also try the following, then rebuild Flink and test
whether it works?
In
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L349,
change setReferences to false.

Cheers,
Gordon


On 7 June 2017 at 3:39:55 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:

Good afternoon, dear Community,

For a few days now I've been struggling to understand the cause of this
KryoException. Here is the stack trace:

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 11 more
2017-06-07 10:18:52,594 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
2017-06-07 10:18:52,841 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.


Basically, I'm computing a matrix product: I load the matrices in COO format,
and the Block class is the following (heavily inspired by
https://issues.apache.org/jira/browse/FLINK-3920):


import breeze.linalg.{Matrix => BreezeMatrix}
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.{Matrix, SparseMatrix}

class Block(val blockData: Matrix) extends MatrixLayout with Serializable {

  def data: Matrix = blockData

  def toBreeze: BreezeMatrix[Double] = blockData.asBreeze

  def numRows: Int = data.numRows

  def numCols: Int = data.numCols

  def *(other: Block): Block = {
    require(this.numCols == other.numRows)
    Block((blockData.asBreeze * other.toBreeze).fromBreeze)
  }

  def +(other: Block): Block =
    Block((blockData.asBreeze + other.toBreeze).fromBreeze)

  def unary_+(other: Block): Block = this + other

  override def equals(other: Any): Boolean = {
    other match {
      case block: Block => this.blockData.equalsMatrix(block.blockData)
      case _ => false
    }
  }

}

The block matrix is a matrix of blocks, and the group reduce function involved
is the last step of the product function:

import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

class SumGroupOfBlocks(blockMapper: BlockMapper)
  extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {

  override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
                      out: Collector[(BlockID, Block)]): Unit = {

    // multiply each (left, right) pair: row index from the left block, column index from the right
    val multipliedGroup: Seq[(Int, Int, Block)] = blocks.asScala.collect {
      case ((i, j, left), (x, y, right)) => (i, y, left * right)
    }.toSeq

    // sum all partial products of the group into a single block
    val reducedGroup = multipliedGroup.reduce((left, right) => {
      val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)
      (i, j, leftBlock + rightBlock)
    })

    out.collect((blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3))
  }
}
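To check the multiply-and-sum step of SumGroupOfBlocks in isolation, without Flink or Kryo involved, the group logic can be modeled with plain Scala collections. This is a minimal sketch under stated assumptions: `BlockProductSketch` and its methods are hypothetical names, and a block is a dense row-major `Array[Array[Double]]` instead of a flink-ml `Matrix`. For one output block (i, j) it computes the sum over k of A_ik * B_kj, taking the row index from the left block and the column index from the right one, as the `collect` above does.

```scala
object BlockProductSketch {
  // Hypothetical stand-in for a Block: a dense, row-major matrix.
  type Dense = Array[Array[Double]]

  def multiply(a: Dense, b: Dense): Dense = {
    require(a.nonEmpty && b.nonEmpty && a.head.length == b.length, "inner dimensions must match")
    Array.tabulate(a.length, b.head.length) { (r, c) =>
      b.indices.map(k => a(r)(k) * b(k)(c)).sum
    }
  }

  def add(a: Dense, b: Dense): Dense = {
    require(a.length == b.length && a.head.length == b.head.length, "shapes must match")
    Array.tabulate(a.length, a.head.length)((r, c) => a(r)(c) + b(r)(c))
  }

  // One reduce group: pairs ((i, k, A_ik), (k, j, B_kj)) that all contribute to block (i, j).
  // Row index comes from the left block, column index from the right block.
  def sumGroup(pairs: Seq[((Int, Int, Dense), (Int, Int, Dense))]): (Int, Int, Dense) = {
    val products = pairs.map { case ((i, _, left), (_, j, right)) => (i, j, multiply(left, right)) }
    products.reduce((l, r) => (l._1, l._2, add(l._3, r._3)))
  }
}
```

Since nothing here is serialized, such a model can only confirm the arithmetic; it says nothing about the Kryo round trip that fails in the trace.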

The exception described above happens when I increase the matrix size to
2000x2000 (rows x cols) or larger: my code works with 1000x1000 matrices, but
not with 2000x2000 matrices and above.

It's also worth mentioning that the IndexOutOfBoundsException always asks for
index 109 (across different matrix sizes), while the size of the list varies
between 5 and 7. It looks as if the serialized messages were somehow
truncated right before their delivery.
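The innermost frames of the trace are `MapReferenceResolver.getReadObject` calling `ArrayList.get`: with reference tracking on, Kryo writes a repeated object as an integer ID, and the reader resolves that ID by position in the list of objects it has read so far. The simplified model below (hypothetical names `ReferenceSketch`, `NewObj`, `BackRef`; values are matched by equality, whereas Kryo tracks object identity) shows how a reader that is out of step with the writer first misreads a reference and then fails with an IndexOutOfBoundsException, much like "Index: 109, Size: 5". It illustrates the mechanism only and is not Kryo's code.

```scala
import scala.collection.mutable

object ReferenceSketch {
  sealed trait Token
  final case class NewObj(value: String) extends Token // first occurrence, written in full
  final case class BackRef(id: Int) extends Token      // repeated occurrence, written as an id

  // Writer side: the first occurrence of a value gets the next id (0, 1, 2, ...);
  // repeated occurrences are written as a back-reference to that id.
  def write(values: Seq[String]): Vector[Token] = {
    val ids = mutable.Map.empty[String, Int]
    values.toVector.map { v =>
      ids.get(v) match {
        case Some(id) => BackRef(id)
        case None =>
          ids(v) = ids.size
          NewObj(v)
      }
    }
  }

  // Reader side: every object read in full is appended to a list, and a
  // back-reference is resolved by position in that list.
  def read(tokens: Seq[Token]): Vector[String] = {
    val seen = mutable.ArrayBuffer.empty[String]
    tokens.toVector.map {
      case NewObj(v) =>
        seen += v
        v
      case BackRef(id) =>
        seen(id) // throws IndexOutOfBoundsException when id >= seen.size
    }
  }
}
```

With setReferences(false), as suggested in the reply above, no back-reference IDs are written, so this lookup never happens; the trade-off is that shared objects are written once per occurrence and cyclic object graphs can no longer be serialized.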

I have tried several things, none of which worked (in no particular order):

- using Flink 1.2.0 and Flink 1.3.0
- updating Flink's Kryo library to 3.0.3
- running with parallelism 1
- explicitly registering my custom classes with Kryo
- varying the size of my blocks
- increasing akka.framesize

I run this job on a three-node 2-vCPU cluster, with two TaskManagers and two
task slots per TaskManager, a 6 GB TaskManager heap, 16384 numOfBuffers and a
networkBufferSize of 16384.

If I run the code on my laptop with 2000x2000 matrices, it works, probably
because no remote serialization takes place there.

I really hope someone can help here. It's becoming really painful...

Thank you so much.  

Cheers, Andrea  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  
