Hi Andrea,

The exception says that there is not enough heap memory available to keep
a factor block in memory. Whenever the user function is called, the block
has to be materialized as an object on the heap.
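
As a rough sketch of what "not enough heap memory" means here, the snippet below estimates how much heap is left for user functions per TaskManager slot. It uses the values from the quoted messages below (taskmanager.heap.mb = 28672, 98304 network buffers of 32768 bytes, fractions 0.5, 0.6 and 0.8). It assumes, in line with the Flink 1.x description of taskmanager.memory.fraction, that the fraction is applied to the heap left after the network buffers, and it ignores JVM and framework overhead. The 16 slots per TaskManager are a hypothetical value (the nodes have 16 CPUs), and `HeapBudget` is an illustrative helper, not a Flink API.

```scala
// Rough per-slot heap budget for user functions on one TaskManager.
// Assumptions (not Flink code): network buffers are taken from the heap first,
// managed memory is `fraction` of what remains, JVM/Flink overhead is ignored.
object HeapBudget {
  private val BytesPerMb = 1024L * 1024L

  /** Heap occupied by the network buffers, in MB. */
  def networkBuffersMb(numberOfBuffers: Long, bufferSizeInBytes: Long): Long =
    numberOfBuffers * bufferSizeInBytes / BytesPerMb

  /** Heap (MB) left over after managed memory, shared by all slots. */
  def freeHeapMb(heapMb: Long, networkMb: Long, fraction: Double): Double =
    (heapMb - networkMb) * (1.0 - fraction)

  /** Free heap (MB) available to each concurrently running slot. */
  def freeHeapPerSlotMb(heapMb: Long, networkMb: Long, fraction: Double, slots: Int): Double =
    freeHeapMb(heapMb, networkMb, fraction) / slots

  def main(args: Array[String]): Unit = {
    val heapMb = 28672L                              // taskmanager.heap.mb
    val networkMb = networkBuffersMb(98304L, 32768L) // numberOfBuffers x bufferSizeInBytes
    val slots = 16                                   // hypothetical slots per TaskManager
    for (fraction <- Seq(0.5, 0.6, 0.8)) {
      val perSlot = freeHeapPerSlotMb(heapMb, networkMb, fraction, slots)
      println(f"fraction=$fraction%.1f  free heap per slot ~ $perSlot%.0f MB")
    }
  }
}
```

Under these assumptions the network buffers take 3072 MB, and the free heap per slot shrinks from about 800 MB at fraction 0.5 to about 640 MB at 0.6 and about 320 MB at 0.8, so raising the fraction leaves user functions less room, not more.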

You can try the following to solve the problem:

1. Decrease taskmanager.memory.fraction even further. This causes the
TaskManager to reserve less memory as managed memory and leaves more free
heap memory for user functions.
2. Decrease the number of slots per TaskManager
(taskmanager.numberOfTaskSlots). This decreases the number of concurrently
running user functions and thus the number of objects which have to be
kept on the heap.
3. Increase the number of ALS blocks with `als.setBlocks(numberBlocks)`. This
increases the number of blocks into which the factor matrices are split.
A larger number means that each individual block is smaller and thus
needs less memory to be kept on the heap.
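
A rough way to see the effect of `setBlocks` is to estimate the raw size of one factor block. The sketch below uses the dimensions from Andrea's message (2M users, 250K items, 50 factors) and assumes each factor vector is a dense array of 50 doubles, with entities spread evenly over the blocks. It counts only the raw Double payload, not JVM object headers, ids, the rating data, or anything else the ALS implementation keeps per block, so it is a lower bound rather than the actual heap footprint. `FactorBlockSize` is a hypothetical helper, not part of FlinkML.

```scala
// Lower-bound estimate of the raw payload of one ALS factor block.
// Assumptions: dense factor vectors of numFactors Doubles, entities spread
// evenly over the blocks; object headers, ids and rating data are ignored.
object FactorBlockSize {
  private val BytesPerDouble = 8L

  /** Entities per block, rounded up when the split is uneven. */
  def entitiesPerBlock(numEntities: Long, numBlocks: Int): Long =
    (numEntities + numBlocks - 1) / numBlocks

  /** Raw bytes of the factor vectors stored in a single block. */
  def factorBlockBytes(numEntities: Long, numFactors: Int, numBlocks: Int): Long =
    entitiesPerBlock(numEntities, numBlocks) * numFactors * BytesPerDouble

  def main(args: Array[String]): Unit = {
    val numUsers = 2000000L
    val numItems = 250000L
    val numFactors = 50
    for (numBlocks <- Seq(100, 400, 1600)) {
      val userMb = factorBlockBytes(numUsers, numFactors, numBlocks) / 1e6
      val itemMb = factorBlockBytes(numItems, numFactors, numBlocks) / 1e6
      println(f"blocks=$numBlocks%5d  user block ~ $userMb%.2f MB  item block ~ $itemMb%.2f MB")
    }
  }
}
```

With 400 blocks this gives roughly 2 MB per user block and 0.25 MB per item block of raw payload; quadrupling the number of blocks cuts each raw block to about a quarter.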

I hope this helps you to solve the problem.

Cheers,
Till

On Wed, Sep 7, 2016 at 11:57 AM, ANDREA SPINA <74...@studenti.unimore.it> wrote:

> OK, I'm still struggling with ALS. Now I'm running with a dataset of 2M
> users, 250K items and 700 ratings per user (1.4B ratings), with 50 latent
> factors, 400 blocks (numOfBlocks) and a DOP of 400.
>
> I still got the error. In the JobManager log I found the previously
> mentioned exception:
>
> 09/06/2016 19:30:18     CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))(62/400) switched to FAILED
> java.lang.Exception: The data preparation for task 'CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>         at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:145)
>         at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>         at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>         ... 5 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
>
> And this is the corresponding TaskManager log:
>
> java.lang.OutOfMemoryError: Java heap space
> 2016-09-06 19:30:17,958 ERROR org.apache.flink.runtime.taskmanager.TaskManager - Could not update input data location for task CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572)). Trying to fail task.
> java.lang.IllegalStateException: There has been an error in the channel.
>         at org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:78)
>         at org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
>         at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:117)
>         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.updateInputChannel(SingleInputGate.java:284)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(TaskManager.scala:1076)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Each node has 32 GB of memory, and I'm working with
> taskmanager.heap.mb = 28672
>
> I tried different memory fractions:
> taskmanager.memory.fraction = (0.5, 0.6, 0.8)
>
> Hope you have enough info now.
> Thank you for your help.
>
> Andrea
>
> 2016-09-02 11:30 GMT+02:00 ANDREA SPINA <74...@studenti.unimore.it>:
>
>> Hi Stefan,
>> Thank you so much for the answer. OK, I'll do it ASAP.
>> For the sake of argument, could the issue be related to the low number of
>> blocks? I noticed that the Flink implementation sets the number of blocks
>> to the input count by default (which is actually a lot). So with a small
>> number of large blocks, maybe they don't fit somewhere...
>> Thank you again.
>>
>> Andrea
>>
>> 2016-09-02 10:51 GMT+02:00 Stefan Richter <s.rich...@data-artisans.com>:
>>
>>> Hi,
>>>
>>> Unfortunately, the log does not contain the information required for
>>> this case. It seems that a sender to the SortMerger failed. The best way
>>> to track down this problem is to look at the exceptions reported in the
>>> web front-end for the failing job. Could you check whether any exceptions
>>> are reported there and send them to us?
>>>
>>> Best,
>>> Stefan
>>>
>>> On 01.09.2016 at 11:56, ANDREA SPINA <74...@studenti.unimore.it> wrote:
>>>
>>> Sure. Here
>>> <https://drive.google.com/open?id=0B6TTuPO7UoeFRXY3RW1KQnNrd3c> you can
>>> find the complete log files.
>>> I still cannot get past the issue. Thank you for your help.
>>>
>>> 2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> I don't know whether my recurring error is related to this one, but it is
>>>> very similar and happens randomly. I still have to figure out its root
>>>> cause:
>>>>
>>>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>> ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: -2
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>> at java.util.ArrayList.elementData(ArrayList.java:418)
>>>> at java.util.ArrayList.get(ArrayList.java:431)
>>>> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:219)
>>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:245)
>>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:255)
>>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>>> at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>>
>>>> On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Could you provide the log output for your job (ideally with debug
>>>>> logging enabled)?
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> On 31.08.2016 at 14:40, ANDREA SPINA <74...@studenti.unimore.it> wrote:
>>>>>
>>>>> Hi everyone.
>>>>> I'm running the FlinkML ALS matrix factorization and I bumped into the
>>>>> following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>>> at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>>> at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
>>>>> at org.apache.flink.ml.common.FlinkMLTools$.persist(FlinkMLTools.scala:94)
>>>>> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)
>>>>> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:433)
>>>>> at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55)
>>>>> at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122)
>>>>> at dima.tu.berlin.benchmark.flink.als.RUN$.main(RUN.scala:78)
>>>>> at dima.tu.berlin.benchmark.flink.als.RUN.main(RUN.scala)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.RuntimeException: Initializing the input processing failed: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:325)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>> at org.apache.flink.runtime.operators.BatchTask.initLocalStrategies(BatchTask.java:819)
>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:321)
>>>>> ... 2 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>> Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>>>> at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextLookAhead(LocalInputChannel.java:270)
>>>>> at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.onNotification(LocalInputChannel.java:238)
>>>>> at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.release(PipelinedSubpartition.java:158)
>>>>> at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:320)
>>>>> at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95)
>>>>> at org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:370)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:657)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> I'm running *flink-1.0.3*. I really can't figure out the reason
>>>>> behind it.
>>>>>
>>>>> My code simply calls the library as follows:
>>>>>
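>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>> import org.apache.flink.ml.recommendation.ALS
>>>>>
>>>>> val als = ALS()
>>>>>   .setIterations(numIterations)
>>>>>   .setNumFactors(rank)
>>>>>   .setBlocks(degreeOfParallelism)
>>>>>   .setSeed(42)
>>>>>   .setTemporaryPath(tempPath)
>>>>>
>>>>> als.fit(ratings, parameters)
>>>>>
>>>>> // fit() stores the learned (user, item) factor DataSets in factorsOption
>>>>> val (users, items) = als.factorsOption match {
>>>>>   case Some(factors) => factors
>>>>>   case None => throw new RuntimeException("ALS did not produce any factors")
>>>>> }
>>>>>
>>>>> // Hypothetical sub-directories, so the two sinks do not write to the same path
>>>>> users.writeAsText(outputPath + "/users", WriteMode.OVERWRITE)
>>>>> items.writeAsText(outputPath + "/items", WriteMode.OVERWRITE)
>>>>>
>>>>> env.execute("ALS matrix factorization")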
>>>>>
>>>>> where
>>>>> - ratings, the input dataset, contains (uid, iid, rate) rows for about
>>>>> 8e6 users and 1e6 items, with 700 ratings per user on average
>>>>> - numIterations 10
>>>>> - rank 50
>>>>> - degreeOfParallelism 240
>>>>>
>>>>>
>>>>> *The error seems to be related to the final .persist() call* at
>>>>> org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507).
>>>>>
>>>>> I'm running on a 15-node cluster (16 CPUs per node) with the
>>>>> following relevant properties:
>>>>>
>>>>> jobmanager.heap.mb = 2048
>>>>> taskmanager.memory.fraction = 0.5
>>>>> taskmanager.heap.mb = 28672
>>>>> taskmanager.network.bufferSizeInBytes = 32768
>>>>> taskmanager.network.numberOfBuffers = 98304
>>>>> akka.ask.timeout = 300s
>>>>>
>>>>> Any help will be appreciated. Thank you.
>>>>>
>>>>> --
>>>>> *Andrea Spina*
>>>>> Student card no.: *74598*
>>>>> Matriculation no.: *89369*
>>>>> *Computer Engineering* *[LM]* (D.M. 270)
>>>>>
>>>
>>
>
