And this is the trace from running with a CSV input; this time I've verified that I'm using the correct version of Flink, following Till's instructions:
The program finished with the following exception:

> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>         at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>         at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
>         at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
>         at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>         at fosdem.SVMClassification$.main(SVMClassification.scala:128)
>         at fosdem.SVMClassification.main(SVMClassification.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: scala.collection.immutable.Map$EmptyMap$ cannot be cast to org.apache.flink.ml.math.Vector
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1085)
>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: scala.collection.immutable.Map$EmptyMap$ cannot be cast to org.apache.flink.ml.math.Vector
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to org.apache.flink.ml.math.Vector
>         at org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
>         at org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:114)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:111)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:104)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
>         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

On Thu, Jan 21, 2016 at 10:51 AM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:

> This is the stack trace from running with the patched branch:
>
>> The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>         at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>         at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>         at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>         at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>         at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
>>         at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
>>         at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>>         at fosdem.SVMClassification$.main(SVMClassification.scala:114)
>>         at fosdem.SVMClassification.main(SVMClassification.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>         at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.ArrayIndexOutOfBoundsException
>> Serialization trace:
>> indices (org.apache.flink.ml.math.SparseVector)
>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1085)
>>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.ArrayIndexOutOfBoundsException
>> Serialization trace:
>> indices (org.apache.flink.ml.math.SparseVector)
>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
>> Serialization trace:
>> indices (org.apache.flink.ml.math.SparseVector)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
>>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>         at org.apache.flink.core.memory.HeapMemorySegment.put(HeapMemorySegment.java:128)
>>         at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:195)
>>         at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>         at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>>         at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:266)
>>         at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:251)
>>         at com.esotericsoftware.kryo.io.Output.writeInts(Output.java:669)
>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:63)
>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:52)
>>         at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
>>         ... 9 more
>
> On Wed, Jan 20, 2016 at 9:45 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Can you again post the stack trace? With the patched branch, the reference mapper should not be used any more (which is where the original exception occurred).
>>
>> On Wed, Jan 20, 2016 at 7:38 PM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
>>
>>> Alright, I will try to do that.
>>>
>>> I've tried running the job with a CSV file as input, using DenseVectors to represent the features, and I still get the same IndexOutOfBounds error.
>>>
>>> On Wed, Jan 20, 2016 at 6:05 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> You could change the version of Stephan's branch via mvn versions:set -DnewVersion=MyCustomBuildVersion and then mvn versions:commit. After you install the Flink binaries, you can reference them in your project by setting the version of your Flink dependencies to MyCustomBuildVersion. That way, you are sure that the right dependencies are used.
>>>>
>>>> Alternatively, you could compile an example program with example input data which can reproduce the problem. Then I could also take a look at it.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
>>>>
>>>>> OK, here's what I tried:
>>>>>
>>>>> * Build Flink (mvn clean install) from the branch you linked (kryo)
>>>>> * Build my uber-jar; I use SBT with 1.0-SNAPSHOT as the Flink version, and added the local Maven repo to the resolvers so that it picks up the previously installed version (I hope)
>>>>> * Launch a local cluster from the newly built Flink, and try to run the job
>>>>>
>>>>> Still getting the same error.
>>>>>
>>>>> Is there a way to ensure that SBT is picking up the local version of Flink to build the uber-jar? Does it matter in this case, or is it enough that I'm sure the launched Flink instance comes from the branch you linked?
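[Since the thread keeps circling on whether the right binaries were picked up, here is a minimal build.sbt sketch of what Till describes. The version string MyCustomBuildVersion is the placeholder from his message; the artifact list is illustrative, and whether the artifacts carry a Scala-version suffix depends on the Flink version being built.]

    // build.sbt -- sketch: resolve the locally installed custom Flink build.
    // Assumes the branch was re-versioned with
    //   mvn versions:set -DnewVersion=MyCustomBuildVersion && mvn versions:commit
    // and then installed with `mvn clean install`.
    resolvers += Resolver.mavenLocal

    val flinkVersion = "MyCustomBuildVersion"

    libraryDependencies ++= Seq(
      // %% appends the project's Scala binary version to the artifact name.
      "org.apache.flink" %% "flink-scala"   % flinkVersion,
      "org.apache.flink" %% "flink-clients" % flinkVersion,
      "org.apache.flink" %% "flink-ml"      % flinkVersion
    )

With this in place, running `sbt "show update"` (or inspecting the jars bundled into the uber-jar) shows which Flink version was actually resolved, answering the "is SBT picking up the local build?" question directly.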
>>>>> On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> The bug looks to be in the serialization via Kryo while spilling windows. Note that Kryo is used here as a fallback serializer, since SparseVector is not a type transparent to Flink.
>>>>>>
>>>>>> I think there are two possible reasons:
>>>>>> 1) Kryo, or our Kryo setup, has an issue here
>>>>>> 2) Kryo is inconsistently configured. There are multiple Kryo instances used across the serializers in the sorter. There may be a bug that causes them not to be initialized in sync.
>>>>>>
>>>>>> To check this, can you build Flink with this pull request (https://github.com/apache/flink/pull/1528) or from this branch (https://github.com/StephanEwen/incubator-flink kryo) and see if that fixes it?
>>>>>>
>>>>>> Thanks,
>>>>>> Stephan
>>>>>>
>>>>>> On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
>>>>>>
>>>>>>> I haven't been able to reproduce this with other datasets. Taking a smaller sample from the large dataset I'm using (link to data <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>) causes the same problem, however.
>>>>>>>
>>>>>>> I'm wondering if the implementation of readLibSVM is what's wrong here. I've tried the new version committed recently by Chiwan, but I still get the same error.
>>>>>>>
>>>>>>> I'll see if I can spot a bug in readLibSVM.
>>>>>>>
>>>>>>> On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
>>>>>>>
>>>>>>>> It's on 0.10.
>>>>>>>>
>>>>>>>> I've tried explicitly registering SparseVector (which is done anyway by registerFlinkMLTypes <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>, which is called when the SVM predict or evaluate functions are called <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>) in my job, but I still get the same error. I will try a couple of different datasets to see whether it's the number of features that is causing this, or something else.
>>>>>>>>
>>>>>>>> So far it works fine for a dataset with 8 features, but the large one has 2000, and I get the above error there. I will try large datasets with a few features, and small datasets with many features, as well.
>>>>>>>>
>>>>>>>> On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi!
>>>>>>>>>
>>>>>>>>> Does this error occur in 0.10 or in 1.0-SNAPSHOT?
>>>>>>>>>
>>>>>>>>> It is probably an incorrectly configured Kryo instance (not a problem of the sorter). What is strange is that it occurs in the "MapReferenceResolver" - there should be no reference resolution during serialization / deserialization.
>>>>>>>>>
>>>>>>>>> Can you try what happens when you explicitly register the type SparseVector at the ExecutionEnvironment?
>>>>>>>>>
>>>>>>>>> Stephan
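[For reference, the explicit registration Stephan suggests would look roughly like this at the start of the job. This is only a sketch; the actual setup of SVMClassification is not shown in the thread.]

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.math.SparseVector

    val env = ExecutionEnvironment.getExecutionEnvironment
    // Pre-register the type with Flink's serialization stack, so that if it
    // ends up on the Kryo fallback path the registration happens eagerly
    // rather than lazily at first use.
    env.registerType(classOf[SparseVector])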
>>>>>>>>> On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I'm trying to run a job using FlinkML and I'm confused about the source of an error.
>>>>>>>>>>
>>>>>>>>>> The job reads a libSVM formatted file and trains an SVM classifier on it.
>>>>>>>>>>
>>>>>>>>>> I've tried this with small datasets and everything works out fine.
>>>>>>>>>>
>>>>>>>>>> When trying to run the same job on a large dataset (~11GB uncompressed) however, I get the following error:
>>>>>>>>>>
>>>>>>>>>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>>> Serialization trace:
>>>>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>>>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>>>>>>>>>>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>>>>>>>>>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>>>>>>>>>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>>>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>>> Serialization trace:
>>>>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>>>>>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>>> Serialization trace:
>>>>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>>>>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>>>>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
>>>>>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>>>>>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>>>>>>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>>>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>>>>>>>>>>         at java.util.ArrayList.set(ArrayList.java:444)
>>>>>>>>>>>         at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>>>>>>>>>>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>>>>>>>>>>>         at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
>>>>>>>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>>>>>>>>>>>         ... 10 more
>>>>>>>>>>
>>>>>>>>>> Any idea what might be causing this? I'm running the job in local mode, 1 TM with 8 slots and ~32GB heap size.
>>>>>>>>>>
>>>>>>>>>> All the vectors created by the libSVM loader have the correct size.
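[To ground the discussion, a minimal sketch of the kind of job described in the thread, using the FlinkML 0.10-style API. The paths and parameter values are hypothetical, and this is not the actual SVMClassification.scala from the thread.]

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.MLUtils
    import org.apache.flink.ml.classification.SVM
    import org.apache.flink.ml.common.FlinkMLTools
    import org.apache.flink.ml.math.Vector

    object SVMClassificationSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Register FlinkML's vector types with Kryo up front; predict/evaluate
        // also do this internally via registerFlinkMLTypes, as linked above.
        FlinkMLTools.registerFlinkMLTypes(env)

        // readLibSVM yields a DataSet[LabeledVector] backed by SparseVectors.
        val training = MLUtils.readLibSVM(env, "/path/to/train.libsvm")
        val test: DataSet[(Vector, Double)] =
          MLUtils.readLibSVM(env, "/path/to/test.libsvm")
            .map(x => (x.vector, x.label))

        val svm = SVM()
          .setBlocks(env.getParallelism)
          .setIterations(100)

        svm.fit(training)

        // evaluate() yields (truth, prediction) pairs; the collect() call is
        // what actually triggers execution, cf. DataSet.collect in the traces.
        val evaluationPairs = svm.evaluate(test)
        evaluationPairs.collect().take(5).foreach(println)
      }
    }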