And this is the stack trace from running with a CSV input; this time I've
verified that I'm using the correct version of Flink, following Till's
instructions:

 The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>         at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>         at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
>         at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
>         at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>         at fosdem.SVMClassification$.main(SVMClassification.scala:128)
>         at fosdem.SVMClassification.main(SVMClassification.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>         at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception:
> scala.collection.immutable.Map$EmptyMap$ cannot be cast to
> org.apache.flink.ml.math.Vector
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1085)
>         at
> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: scala.collection.immutable.Map$EmptyMap$
> cannot be cast to org.apache.flink.ml.math.Vector
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.lang.ClassCastException:
> scala.collection.immutable.Map$EmptyMap$ cannot be cast to
> org.apache.flink.ml.math.Vector
>         at
> org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
>         at
> org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:114)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:111)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:104)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
>         at
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>         at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>         at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>         at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>

On Thu, Jan 21, 2016 at 10:51 AM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> This is the stack trace from running with the patched branch:
>
>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.
>>         at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>         at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>         at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>         at
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>         at
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
>>         at
>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
>>         at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>>         at fosdem.SVMClassification$.main(SVMClassification.scala:114)
>>         at fosdem.SVMClassification.main(SVMClassification.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>         at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>         at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>         at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
>>         at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>>         at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>>         at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>>         at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>>         at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>         at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>         at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>         at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>         at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>         at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.lang.ArrayIndexOutOfBoundsException
>> Serialization trace:
>> indices (org.apache.flink.ml.math.SparseVector)
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>         at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1085)
>>         at
>> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: java.lang.ArrayIndexOutOfBoundsException
>> Serialization trace:
>> indices (org.apache.flink.ml.math.SparseVector)
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: com.esotericsoftware.kryo.KryoException:
>> java.lang.ArrayIndexOutOfBoundsException
>> Serialization trace:
>> indices (org.apache.flink.ml.math.SparseVector)
>>         at
>> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
>>         at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>>         at
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>         at
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>         at
>> org.apache.flink.core.memory.HeapMemorySegment.put(HeapMemorySegment.java:128)
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:195)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>         at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>>         at
>> com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:266)
>>         at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:251)
>>         at com.esotericsoftware.kryo.io.Output.writeInts(Output.java:669)
>>         at
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:63)
>>         at
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:52)
>>         at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>>         at
>> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
>>         ... 9 more
>>
>
> On Wed, Jan 20, 2016 at 9:45 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Can you post the stack trace again? With the patched branch, the
>> reference mapper should not be used any more (which is where the original
>> exception occurred).
>>
>> On Wed, Jan 20, 2016 at 7:38 PM, Theodore Vasiloudis <
>> theodoros.vasilou...@gmail.com> wrote:
>>
>>> Alright, I will try to do that.
>>>
>>> I've tried running the job with a CSV file as input and using
>>> DenseVectors to represent the features, but I still get the same
>>> IndexOutOfBounds error.
>>>
>>> On Wed, Jan 20, 2016 at 6:05 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> You could change the version of Stephan’s branch via mvn versions:set
>>>> -DnewVersion=MyCustomBuildVersion and then mvn versions:commit. Now
>>>> after you install the Flink binaries you can reference them in your project
>>>> by setting the version of your Flink dependencies to
>>>> MyCustomBuildVersion. That way, you are sure that the right
>>>> dependencies are used.
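>>>>
>>>> On the SBT side, that might look roughly like this (a hypothetical
>>>> sketch; the exact artifact names depend on your setup):
>>>>
>>>>   // build.sbt -- pick up the locally installed Flink build
>>>>   resolvers += Resolver.mavenLocal
>>>>
>>>>   val flinkVersion = "MyCustomBuildVersion"
>>>>
>>>>   libraryDependencies ++= Seq(
>>>>     "org.apache.flink" % "flink-scala"   % flinkVersion,
>>>>     "org.apache.flink" % "flink-clients" % flinkVersion,
>>>>     "org.apache.flink" % "flink-ml"      % flinkVersion
>>>>   )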
>>>>
>>>> Alternatively, you could compile an example program with example input
>>>> data which can reproduce the problem. Then I could also take a look at it.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <
>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>
>>>>> OK here's what I tried:
>>>>>
>>>>> * Build Flink (mvn clean install) from the branch you linked (kryo)
>>>>> * Build my uber-jar: I use SBT with 1.0-SNAPSHOT as the Flink version,
>>>>> and added the local Maven repo to the resolvers so that it picks up the
>>>>> previously installed version (I hope)
>>>>> * Launch a local cluster from the newly built Flink and try to run the job
>>>>>
>>>>> Still getting the same error.
>>>>>
>>>>> Is there a way to ensure that SBT is picking up the local version of
>>>>> Flink to build the uber-jar?
>>>>> Does it matter in this case, or is it enough that I'm sure the
>>>>> launched Flink instance comes from the branch you linked?
>>>>>
>>>>>
>>>>> On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> The bug looks to be in the serialization via Kryo while spilling.
>>>>>> Note that Kryo is used here as a fallback serializer, since
>>>>>> SparseVector is not a transparent type to Flink.
>>>>>>
>>>>>> I think there are two possible reasons:
>>>>>>   1) Kryo, or our Kryo setup, has an issue here
>>>>>>   2) Kryo is inconsistently configured. There are multiple Kryo
>>>>>> instances used across the serializers in the sorter. There may be a bug
>>>>>> where they are not initialized in sync.
>>>>>>
>>>>>>
>>>>>> To check this, can you build Flink with this pull request (
>>>>>> https://github.com/apache/flink/pull/1528) or from this branch (
>>>>>> https://github.com/StephanEwen/incubator-flink kryo) and see if that
>>>>>> fixes it?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Stephan
>>>>>>
>>>>>> On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <
>>>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>>>
>>>>>>> I haven't been able to reproduce this with other datasets. However,
>>>>>>> taking a smaller sample from the large dataset I'm using (link to data
>>>>>>> <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>)
>>>>>>> causes the same problem.
>>>>>>>
>>>>>>> I'm wondering if the implementation of readLibSVM is what's wrong
>>>>>>> here. I've tried the new version committed recently by Chiwan, but I
>>>>>>> still get the same error.
>>>>>>>
>>>>>>> I'll see if I can spot a bug in readLibSVM.
>>>>>>>
>>>>>>> On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <
>>>>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>>>>
>>>>>>>> It's on 0.10.
>>>>>>>>
>>>>>>>> I've tried explicitly registering SparseVector (which is done
>>>>>>>> anyway by registerFlinkMLTypes
>>>>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>
>>>>>>>> which is called when the SVM predict or evaluate functions are
>>>>>>>> called
>>>>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>)
>>>>>>>> in my job, but I still get the same error. I will try a couple of
>>>>>>>> different datasets to see if it's the number of features that is
>>>>>>>> causing this or something else.
>>>>>>>>
>>>>>>>> So far it works fine for a dataset with 8 features, but the large
>>>>>>>> one has 2000 and I get the above error there. I will try large datasets
>>>>>>>> with a few features and small datasets with many features as well.
>>>>>>>>
>>>>>>>> On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi!
>>>>>>>>>
>>>>>>>>> Does this error occur in 0.10 or in 1.0-SNAPSHOT?
>>>>>>>>>
>>>>>>>>> It is probably an incorrectly configured Kryo instance (not a
>>>>>>>>> problem of the sorter).
>>>>>>>>> What is strange is that it occurs in the "MapReferenceResolver" -
>>>>>>>>> there should be no reference resolution during serialization /
>>>>>>>>> deserialization.
>>>>>>>>>
>>>>>>>>> Can you try what happens when you explicitly register the type
>>>>>>>>> SparseVector at the ExecutionEnvironment?
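>>>>>>>>>
>>>>>>>>> Something along these lines (a minimal sketch, assuming the
>>>>>>>>> environment is created the usual way):
>>>>>>>>>
>>>>>>>>>   import org.apache.flink.api.scala.ExecutionEnvironment
>>>>>>>>>   import org.apache.flink.ml.math.SparseVector
>>>>>>>>>
>>>>>>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>   // register the type explicitly so the serializers know about it
>>>>>>>>>   env.registerType(classOf[SparseVector])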
>>>>>>>>>
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <
>>>>>>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I'm trying to run a job using FlinkML and I'm confused about the
>>>>>>>>>> source of an error.
>>>>>>>>>>
>>>>>>>>>> The job reads a libSVM formatted file and trains an SVM
>>>>>>>>>> classifier on it.
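>>>>>>>>>>
>>>>>>>>>> The core of it looks roughly like this (a simplified sketch; the
>>>>>>>>>> real path and parameters are omitted):
>>>>>>>>>>
>>>>>>>>>>   import org.apache.flink.api.scala._
>>>>>>>>>>   import org.apache.flink.ml.MLUtils
>>>>>>>>>>   import org.apache.flink.ml.classification.SVM
>>>>>>>>>>
>>>>>>>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>   // read a DataSet[LabeledVector] from a libSVM formatted file
>>>>>>>>>>   val trainingData = MLUtils.readLibSVM(env, "/path/to/data")
>>>>>>>>>>   // train the SVM classifier on it
>>>>>>>>>>   val svm = SVM().setIterations(10)
>>>>>>>>>>   svm.fit(trainingData)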
>>>>>>>>>>
>>>>>>>>>> I've tried this with small datasets and everything works out fine.
>>>>>>>>>>
>>>>>>>>>> When trying to run the same job on a large dataset (~11GB
>>>>>>>>>> uncompressed), however, I get the following error:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> java.lang.RuntimeException: Error obtaining the sorted input:
>>>>>>>>>>> Thread 'SortMerger spilling thread' terminated due to an exception:
>>>>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>>> Serialization trace:
>>>>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling
>>>>>>>>>>> thread' terminated due to an exception:
>>>>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>>> Serialization trace:
>>>>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>>>>>> Caused by: com.esotericsoftware.kryo.KryoException:
>>>>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>>> Serialization trace:
>>>>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>>>>         at
>>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>>>>>>         at
>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>>>>>         at
>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size:
>>>>>>>>>>> 2
>>>>>>>>>>>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>>>>>>>>>>         at java.util.ArrayList.set(ArrayList.java:444)
>>>>>>>>>>>         at
>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>>>>>>>>>>>         at
>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>>>>>>>>>>>         at
>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
>>>>>>>>>>>         at
>>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>>>>>>>>>>>         ... 10 more
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Any idea what might be causing this? I'm running the job in local
>>>>>>>>>> mode, with 1 TM, 8 slots, and ~32GB of heap.
>>>>>>>>>>
>>>>>>>>>> All the vectors created by the libSVM loader have the correct
>>>>>>>>>> size.
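>>>>>>>>>>
>>>>>>>>>> (For example, a check along these lines, where trainingData is the
>>>>>>>>>> DataSet[LabeledVector] returned by readLibSVM:)
>>>>>>>>>>
>>>>>>>>>>   // there should be exactly one distinct vector size
>>>>>>>>>>   trainingData.map(_.vector.size).distinct().collect()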
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
