Alright, I will try to do that. I've tried running the job with a CSV file as input, using DenseVectors to represent the features, and I still get the same IndexOutOfBounds error.
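For reference, the CSV variant boils down to something like the following (a minimal sketch; the file path and the column layout, label first and features after, are placeholder assumptions):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.LabeledVector
    import org.apache.flink.ml.math.DenseVector

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Parse each CSV line into a LabeledVector backed by a DenseVector.
    // Assumed layout: label in the first field, features in the rest.
    val trainingData: DataSet[LabeledVector] = env
      .readTextFile("/path/to/data.csv") // placeholder path
      .map { line =>
        val fields = line.split(',')
        LabeledVector(fields.head.toDouble, DenseVector(fields.tail.map(_.toDouble)))
      }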
On Wed, Jan 20, 2016 at 6:05 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> You could change the version of Stephan’s branch via mvn versions:set
> -DnewVersion=MyCustomBuildVersion and then mvn versions:commit. After you
> install the Flink binaries, you can reference them in your project by
> setting the version of your Flink dependencies to MyCustomBuildVersion.
> That way you can be sure that the right dependencies are used.
>
> Alternatively, you could compile an example program with example input
> data that reproduces the problem. Then I could also take a look at it.
>
> Cheers,
> Till
>
> On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> OK, here's what I tried:
>>
>> * Build Flink (mvn clean install) from the branch you linked (kryo)
>> * Build my uber-jar; I use SBT with 1.0-SNAPSHOT as the Flink version,
>> and added the local Maven repo to the resolvers so that it picks up the
>> previously installed version (I hope)
>> * Launch a local cluster from the newly built Flink and try to run the
>> job
>>
>> I'm still getting the same error.
>>
>> Is there a way to ensure that SBT is picking up the local version of
>> Flink to build the uber-jar?
>> Does it matter in this case, or is it enough that I'm sure the launched
>> Flink instance comes from the branch you linked?
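On the SBT question: one way to make sure the uber-jar is built against the locally installed Flink is to point the resolvers at the local Maven repository and depend on the custom version from Till's mvn versions:set step. A build.sbt sketch (the module list is illustrative, and "MyCustomBuildVersion" stands for whatever version string was installed locally):

    // build.sbt
    val flinkVersion = "MyCustomBuildVersion" // must match the locally installed build

    // Resolve the mvn-installed Flink artifacts from ~/.m2/repository
    resolvers += Resolver.mavenLocal

    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-scala"   % flinkVersion,
      "org.apache.flink" %% "flink-clients" % flinkVersion,
      "org.apache.flink" %% "flink-ml"      % flinkVersion
    )

Using a version string that exists only in the local repository also answers the "is it picking up my build?" question: if resolution succeeds, the artifacts can only have come from ~/.m2.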
>> On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> The bug looks to be in the serialization via Kryo while spilling
>>> windows. Note that Kryo is used here as a fallback serializer, since
>>> SparseVector is not a type that is transparent to Flink.
>>>
>>> I think there are two possible reasons:
>>> 1) Kryo, or our Kryo setup, has an issue here
>>> 2) Kryo is inconsistently configured. There are multiple Kryo
>>> instances used across the serializers in the sorter. There may be a bug
>>> where they are not initialized in sync.
>>>
>>> To check this, can you build Flink with this pull request (
>>> https://github.com/apache/flink/pull/1528) or from this branch (
>>> https://github.com/StephanEwen/incubator-flink kryo) and see if that
>>> fixes it?
>>>
>>> Thanks,
>>> Stephan
>>>
>>> On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <
>>> theodoros.vasilou...@gmail.com> wrote:
>>>
>>>> I haven't been able to reproduce this with other datasets. However,
>>>> taking a smaller sample from the large dataset I'm using (link to data
>>>> <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>)
>>>> causes the same problem.
>>>>
>>>> I'm wondering if the implementation of readLibSVM is what's wrong
>>>> here. I've tried the new version committed recently by Chiwan, but I
>>>> still get the same error.
>>>>
>>>> I'll see if I can spot a bug in readLibSVM.
>>>>
>>>> On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <
>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>
>>>>> It's on 0.10.
>>>>>
>>>>> I've tried explicitly registering SparseVector in my job (which is
>>>>> done anyway by registerFlinkMLTypes
>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>,
>>>>> which is called when the SVM predict or evaluate functions are called
>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>),
>>>>> but I still get the same error.
>>>>>
>>>>> I will try a couple of different datasets to see whether it's the
>>>>> number of features that is causing this, or something else.
>>>>>
>>>>> So far it works fine for a dataset with 8 features, but the large one
>>>>> has 2000 features and I get the above error there. I will try large
>>>>> datasets with a few features and small datasets with many features as
>>>>> well.
>>>>>
>>>>> On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> Does this error occur in 0.10 or in 1.0-SNAPSHOT?
>>>>>>
>>>>>> It is probably an incorrectly configured Kryo instance (not a
>>>>>> problem of the sorter).
>>>>>> What is strange is that it occurs in the "MapReferenceResolver" -
>>>>>> there should be no reference resolution during serialization /
>>>>>> deserialization.
>>>>>>
>>>>>> Can you try what happens when you explicitly register the type
>>>>>> SparseVector at the ExecutionEnvironment?
>>>>>>
>>>>>> Stephan
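Explicit registration would look roughly like this (a sketch, assuming the Scala DataSet API):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.ml.math.SparseVector

    val env = ExecutionEnvironment.getExecutionEnvironment
    // Register SparseVector up front, instead of relying on the
    // fallback registration Kryo performs when it first sees the type.
    env.registerType(classOf[SparseVector])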
>>>>>>
>>>>>> On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <
>>>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I'm trying to run a job using FlinkML and I'm confused about the
>>>>>>> source of an error.
>>>>>>>
>>>>>>> The job reads a libSVM-formatted file and trains an SVM classifier
>>>>>>> on it.
>>>>>>>
>>>>>>> I've tried this with small datasets and everything works out fine.
>>>>>>>
>>>>>>> When trying to run the same job on a large dataset (~11GB
>>>>>>> uncompressed), however, I get the following error:
>>>>>>>
>>>>>>>> java.lang.RuntimeException: Error obtaining the sorted input:
>>>>>>>> Thread 'SortMerger spilling thread' terminated due to an exception:
>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>> Serialization trace:
>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>>>>>>>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>>>>>> terminated due to an exception: java.lang.IndexOutOfBoundsException:
>>>>>>>> Index: 14, Size: 2
>>>>>>>> Serialization trace:
>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>>> Caused by: com.esotericsoftware.kryo.KryoException:
>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>> Serialization trace:
>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>>>>>>>     at java.util.ArrayList.set(ArrayList.java:444)
>>>>>>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
>>>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>>>>>>>>     ... 10 more
>>>>>>>
>>>>>>> Any idea what might be causing this? I'm running the job in local
>>>>>>> mode, 1 TM with 8 slots and ~32GB heap size.
>>>>>>>
>>>>>>> All the vectors created by the libSVM loader have the correct size.
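For completeness, a job of the shape described above (read a libSVM file, train an SVM) looks roughly like this in FlinkML (a minimal sketch; the path, the object name, and the parameter values are placeholders):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.MLUtils
    import org.apache.flink.ml.classification.SVM

    object TrainSVMJob {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Read the libSVM-formatted file into a DataSet[LabeledVector].
        val trainingData = MLUtils.readLibSVM(env, "/path/to/epsilon") // placeholder path

        val svm = SVM()
          .setBlocks(8)       // placeholder parameters
          .setIterations(100)

        svm.fit(trainingData)

        // Materializing the weights triggers execution of the training job.
        svm.weightsOption.foreach(_.print())
      }
    }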