The bug looks to be in the serialization via Kryo while spilling windows. Note that Kryo is here used as a fallback serializer, since the SparseVector is not transparent type to Flink.
I think there are two possible reasons: 1) Kryo, or our Kryo setup has an issue here 2) Kryo is inconsistently configured. There are multiple Kryo instances used across the serializers in the sorter. There may be a bug that they are not initialized in sync. To check this, can you build Flink with this pull request ( https://github.com/apache/flink/pull/1528) or from this branch ( https://github.com/StephanEwen/incubator-flink kryo) and see if that fixes it? Thanks, Stephan On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis < theodoros.vasilou...@gmail.com> wrote: > I haven't been able to reproduce this with other datasets. Taking a > smaller sample from the large dataset I'm using (link to data > <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>) > causes the same problem however. > > I'm wondering if the implementation of readLibSVM is what's wrong here. > I've tried the new version commited recently by Chiwan, but I still get the > same error. > > I'll see if I can spot a bug in readLibSVM. > > On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis < > theodoros.vasilou...@gmail.com> wrote: > >> It's on 0.10. >> >> I've tried explicitly registering SparseVector (which is done anyway by >> registerFlinkMLTypes >> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49> >> which is called when the SVM predict or evaluate functions are called >> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>) >> in my job but I still get the same. I will try a couple different datasets >> and try to see if it's the number of features that is causing this or >> something else. >> >> So far it works fine for a dataset with 8 features, but the large one has >> 2000 and I get the above error there. I will try large datasets with a few >> features and small datasets with many features as well. >> >> On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org> wrote: >> >>> Hi! >>> >>> Does this error occur in 0.10 or im 1.0-SNAPSHOT? >>> >>> It is probably an incorrectly configured Kryo instance (not a problem of >>> the sorter). >>> What is strange is that it occurs in the "MapReferenceResolver" - there >>> should be no reference resolution during serialization / deserialization. >>> >>> Can you try what happens when you explicitly register the type >>> SparseVector at the ExecutionEnvironment? >>> >>> Stephan >>> >>> >>> On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis < >>> theodoros.vasilou...@gmail.com> wrote: >>> >>>> Hello all, >>>> >>>> I'm trying to run a job using FlinkML and I'm confused about the source >>>> of an error. >>>> >>>> The job reads a libSVM formatted file and trains an SVM classifier on >>>> it. >>>> >>>> I've tried this with small datasets and everything works out fine. >>>> >>>> When trying to run the same job on a large dataset (~11GB uncompressed) >>>> however, I get the following error: >>>> >>>> >>>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread >>>>> 'SortMerger spilling thread' terminated due to an exception: >>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2 >>>>> Serialization trace: >>>>> indices (org.apache.flink.ml.math.SparseVector) >>>>> at >>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) >>>>> at >>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089) >>>>> at >>>>> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78) >>>>> at >>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489) >>>>> at >>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' >>>>> terminated due to an exception: java.lang.IndexOutOfBoundsException: >>>>> Index: >>>>> 14, Size: 2 >>>>> Serialization trace: >>>>> indices (org.apache.flink.ml.math.SparseVector) >>>>> at >>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800) >>>>> Caused by: com.esotericsoftware.kryo.KryoException: >>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2 >>>>> Serialization trace: >>>>> indices (org.apache.flink.ml.math.SparseVector) >>>>> at >>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) >>>>> at >>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) >>>>> at >>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222) >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236) >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246) >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73) >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73) >>>>> at >>>>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499) >>>>> at >>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344) >>>>> at >>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) >>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2 >>>>> at java.util.ArrayList.rangeCheck(ArrayList.java:653) >>>>> at java.util.ArrayList.set(ArrayList.java:444) >>>>> at >>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38) >>>>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823) >>>>> at >>>>> com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731) >>>>> at >>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) >>>>> ... 10 more >>>> >>>> >>>> >>>> Any idea what might be causing this? I'm running the job in local mode, >>>> 1 TM with 8 slots and ~32GB heap size. >>>> >>>> All the vectors created by the libSVM loader have the correct size. >>>> >>> >>> >> >