Hi Andrea, I’ve circled back to this and wanted to check on the status. Did you manage to solve this in the end, or is it still a problem for you?
If it’s still a problem, would you be able to provide a complete runnable example job that reproduces the problem (ideally via a git branch I can clone and run :))? That would help me dig a bit deeper into the issue. Thanks a lot!

Best,
Gordon

On 8 June 2017 at 6:58:46 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:

Hi guys, thank you for your interest. Yes @Flavio, I tried both the 1.2.0 and 1.3.0 versions. Following Gordon's suggestion I tried setting setReference to false, but sadly it didn't help. What I did then was to declare a custom serializer, as follows:

class BlockSerializer extends Serializer[Block] with Serializable {

  override def read(kryo: Kryo, input: Input, block: Class[Block]): Block = {
    val serializer = new SparseMatrixSerializer
    val blockData = kryo.readObject(input, classOf[SparseMatrix], serializer)
    new Block(blockData)
  }

  override def write(kryo: Kryo, output: Output, block: Block): Unit = {
    val serializer = new SparseMatrixSerializer
    kryo.register(classOf[SparseMatrix], serializer)
    kryo.writeObject(output, block.blockData, serializer)
    output.close()
  }
}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def read(kryo: Kryo, input: Input, sparse: Class[SparseMatrix]): SparseMatrix = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    val numRows = input.readInt
    val numCols = input.readInt
    val colPtrs = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val rowIndices = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val data = kryo.readObject(input, classOf[java.util.ArrayList[Double]], collectionDoubleSerializer).asScala.toArray
    input.close()

    new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs = colPtrs, rowIndices = rowIndices, data = data)
  }

  override def write(kryo: Kryo, output: Output, sparseMatrix: SparseMatrix): Unit = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)
    kryo.register(classOf[java.util.ArrayList[Int]], collectionIntSerializer)
    kryo.register(classOf[java.util.ArrayList[Double]], collectionDoubleSerializer)

    output.writeInt(sparseMatrix.numRows)
    output.writeInt(sparseMatrix.numCols)
    kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.data.toList.asJava, collectionDoubleSerializer)
    output.close()
  }
}

What I obtained is the same exception as before, but with a different accessed index and size.
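For reference, custom serializers like these are wired into a Flink batch job via the ExecutionConfig. This is only a sketch of the typical registration (the Block/SparseMatrix serializer classes are the ones above; the surrounding job setup is assumed):

```scala
// Sketch: making Flink's Kryo use the custom serializers for these types.
// registerTypeWithKryoSerializer is Flink's public ExecutionConfig API;
// Block, SparseMatrix, and the serializer classes are from the code above.
import org.apache.flink.api.scala.ExecutionEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment

env.getConfig.registerTypeWithKryoSerializer(classOf[Block], classOf[BlockSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[SparseMatrix], classOf[SparseMatrixSerializer])
```

With this in place, every time Kryo encounters a Block or SparseMatrix during record serialization it dispatches to the custom read/write methods above.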
Caused by: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.set(ArrayList.java:444)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:680)
    at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:85)
    at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:80)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)

Might this help somehow? Thank you again,
Andrea

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558p13596.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
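Since the failure surfaces in Kryo's MapReferenceResolver during deserialization, one way to narrow it down would be to round-trip two Block instances through Kryo in isolation, outside Flink. A minimal sketch under stated assumptions (Block, BlockSerializer, and the sample values blockA/blockB refer to the code quoted above and are not defined here):

```scala
// Sketch: standalone Kryo round-trip of two Block records, to check whether
// the custom serializers alone reproduce the reference-resolver error.
// Assumes Block and BlockSerializer from the code above are on the classpath.
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import java.io.ByteArrayOutputStream

val kryo = new Kryo()
kryo.register(classOf[Block], new BlockSerializer)

val bytes = new ByteArrayOutputStream()
val output = new Output(bytes)
// Write two records back to back, the way Flink streams records.
// Note: BlockSerializer.write calls output.close() per record, so the
// second write exercises exactly the stream state Flink would see.
kryo.writeObject(output, blockA)
kryo.writeObject(output, blockB)
output.close()

val input = new Input(bytes.toByteArray)
val first = kryo.readObject(input, classOf[Block])
// If reference-tracking state is corrupted, the second read is where an
// IndexOutOfBoundsException like the one in the trace would show up.
val second = kryo.readObject(input, classOf[Block])
input.close()
```

If this reproduces the IndexOutOfBoundsException without Flink involved, the problem is in the serializer pair itself rather than in Flink's runtime; if it does not, the interaction with Flink's KryoSerializer reuse would be the next thing to look at.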