Ok, I'm still struggling with ALS. Now I'm running with a dataset of 2M
users, 250K items, 700 rates per users (1,4B ratings). 50 latent factors,
400 numOfBlocks, 400 DOP.

 Somehow I got the error, from the JM log I catch the previous mentioned
exception:

09/06/2016 19:30:18     CoGroup (CoGroup at
org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))(62/400)
switched to FAILED
java.lang.Exception: The data preparation for task 'CoGroup (CoGroup at
org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))' ,
caused an error: Error obtaining the sorted input: Thread 'SortMerger
Reading Thread' terminated due to an exception: Fatal error at remote task
manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
        at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        at
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:145)
        at
org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
        at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal
error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121
'.
        at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
        at
org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
        at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
        ... 5 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Fatal error at remote task manager '
cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
        at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Fatal error at remote task manager '
cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.

Then, the respective TM log

java.lang.OutOfMemoryError: Java heap space
2016-09-06 19:30:17,958 ERROR
org.apache.flink.runtime.taskmanager.TaskManager              - Could not
update input data location for task CoGrou
p (CoGroup at
org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572)).
Trying to fail  task.
java.lang.IllegalStateException: There has been an error in the channel.
        at
org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
        at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:78)
        at
org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
        at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:117)
        at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.updateInputChannel(SingleInputGate.java:284)
        at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(TaskManager.scala:1076)
        at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
        at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
        at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

So each node has 32G memory, I'm working with
taskmanager.heap.mb = 28672

And I tried with different memory fractions
taskmanager.memory.fraction = (0.5, 0.6, 0.8)

Hope you have enough info now.
Thank you for your help.

Andrea

2016-09-02 11:30 GMT+02:00 ANDREA SPINA <74...@studenti.unimore.it>:

> Hi Stefan,
> Thank you so much for the answer. Ok, I'll do it asap.
> For the sake of argument, could the issue be related to the low number of
> blocks? I noticed the Flink implementation, as default, set the number of
> blocks to the input count (which is actually a lot). So with a low
> cardinality and big sized blocks, maybe they don't fit somewhere...
> Thank you again.
>
> Andrea
>
> 2016-09-02 10:51 GMT+02:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>> Hi,
>>
>> unfortunately, the log does not contain the required information for this
>> case. It seems like a sender to the SortMerger failed. The best way to find
>> this problem is to take a look to the exceptions that are reported in the
>> web front-end for the failing job. Could you check if you find any reported
>> exceptions there and provide them to us?
>>
>> Best,
>> Stefan
>>
>> Am 01.09.2016 um 11:56 schrieb ANDREA SPINA <74...@studenti.unimore.it>:
>>
>> Sure. Here
>> <https://drive.google.com/open?id=0B6TTuPO7UoeFRXY3RW1KQnNrd3c> you can
>> find the complete logs file.
>> Still can not run through the issue. Thank you for your help.
>>
>> 2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> I don't know whether my usual error is related to this one but is very
>>> similar and it happens randomly...I still have to figure out the root cause
>>> of the error:
>>>
>>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>>> (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map
>>> at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted
>>> input: Thread 'SortMerger spilling thread' terminated due to an exception:
>>> -2
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>> k.java:345)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger spilling thread' terminated due to an exception: -2
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> .getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>> ask.java:1079)
>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>> (GroupReduceDriver.java:94)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>> ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>> terminated due to an exception: -2
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:800)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>> at java.util.ArrayList.elementData(ArrayList.java:418)
>>> at java.util.ArrayList.get(ArrayList.java:431)
>>> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadO
>>> bject(MapReferenceResolver.java:42)
>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(Map
>>> Serializer.java:135)
>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(Map
>>> Serializer.java:21)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>>> zer.deserialize(KryoSerializer.java:219)
>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>>> zer.deserialize(KryoSerializer.java:245)
>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>>> zer.copy(KryoSerializer.java:255)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.c
>>> opy(PojoSerializer.java:556)
>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerB
>>> ase.copy(TupleSerializerBase.java:75)
>>> at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.
>>> writeToOutput(NormalizedKeySorter.java:499)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $SpillingThread.go(UnilateralSortMerger.java:1344)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>
>>>
>>> On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> could you provide the log outputs for your job (ideally with debug
>>>> logging enabled)?
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> Am 31.08.2016 um 14:40 schrieb ANDREA SPINA <74...@studenti.unimore.it
>>>> >:
>>>>
>>>> Hi everyone.
>>>> I'm running the FlinkML ALS matrix factorization and I bumped into the
>>>> following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>> program execution failed: Job execution failed.
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>> at org.apache.flink.client.program.ContextEnvironment.execute(C
>>>> ontextEnvironment.java:60)
>>>> at org.apache.flink.api.scala.ExecutionEnvironment.execute(Exec
>>>> utionEnvironment.scala:652)
>>>> at org.apache.flink.ml.common.FlinkMLTools$.persist(FlinkMLTool
>>>> s.scala:94)
>>>> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)
>>>> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:433)
>>>> at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55)
>>>> at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122)
>>>> at dima.tu.berlin.benchmark.flink.als.RUN$.main(RUN.scala:78)
>>>> at dima.tu.berlin.benchmark.flink.als.RUN.main(RUN.scala)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>> ssorImpl.java:62)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>> thodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at org.apache.flink.client.program.PackagedProgram.callMainMeth
>>>> od(PackagedProgram.java:505)
>>>> at org.apache.flink.client.program.PackagedProgram.invokeIntera
>>>> ctiveModeForExecution(PackagedProgram.java:403)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(C
>>>> liFrontend.java:866)
>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFront
>>>> end.java:1192)
>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
>>>> leMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
>>>> leMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
>>>> leMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.lifte
>>>> dTree1$1(Future.scala:24)
>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(F
>>>> uture.scala:24)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
>>>> exec(AbstractDispatcher.scala:401)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExec
>>>> All(ForkJoinPool.java:1253)
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(For
>>>> kJoinPool.java:1346)
>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>>>> l.java:1979)
>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>>>> orkerThread.java:107)
>>>> Caused by: java.lang.RuntimeException: Initializing the input
>>>> processing failed: Error obtaining the sorted input: Thread 'SortMerger
>>>> Reading Thread' terminated due to an exception: null
>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>> k.java:325)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>> null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> .getIterator(UnilateralSortMerger.java:619)
>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>> ask.java:1079)
>>>> at org.apache.flink.runtime.operators.BatchTask.initLocalStrate
>>>> gies(BatchTask.java:819)
>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>> k.java:321)
>>>> ... 2 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> terminated due to an exception: null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailed
>>>> Exception
>>>> at org.apache.flink.runtime.io.network.partition.consumer.Local
>>>> InputChannel.getNextLookAhead(LocalInputChannel.java:270)
>>>> at org.apache.flink.runtime.io.network.partition.consumer.Local
>>>> InputChannel.onNotification(LocalInputChannel.java:238)
>>>> at org.apache.flink.runtime.io.network.partition.PipelinedSubpa
>>>> rtition.release(PipelinedSubpartition.java:158)
>>>> at org.apache.flink.runtime.io.network.partition.ResultPartitio
>>>> n.release(ResultPartition.java:320)
>>>> at org.apache.flink.runtime.io.network.partition.ResultPartitio
>>>> nManager.releasePartitionsProducedBy(ResultPartitionManager.java:95)
>>>> at org.apache.flink.runtime.io.network.NetworkEnvironment.unreg
>>>> isterTask(NetworkEnvironment.java:370)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:657)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I'm running with* flink-1.0.3*. I really can't figure out the reason
>>>> behind that.
>>>>
>>>> My code simply calls the library as follows:
>>>>
>>>> val als = ALS()
>>>>   .setIterations(numIterations)
>>>>   .setNumFactors(rank)
>>>>   .setBlocks(degreeOfParallelism)
>>>>   .setSeed(42)
>>>>   .setTemporaryPath(tempPath)
>>>>
>>>> als.fit(ratings, parameters)
>>>>
>>>> val (users, items) = als.factorsOption match {
>>>>   case Some(_) => als.factorsOption.get
>>>>   case _ => throw new RuntimeException
>>>> }
>>>>
>>>> users.writeAsText(outputPath, WriteMode.OVERWRITE)
>>>> items.writeAsText(outputPath, WriteMode.OVERWRITE)
>>>>
>>>> env.execute("ALS matrix factorization")
>>>>
>>>> where
>>>> - ratings as the input dataset contains (uid, iid, rate) rows about 8e6
>>>> users, 1e6 items and 700 rating per user average.
>>>> - numIterations 10
>>>> - rank 50
>>>> - degreeOfParallelism 240
>>>>
>>>>
>>>> *The error seems to be related to the final .persists() call.*at
>>>> org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)
>>>>
>>>> I'm running with a 15 nodes cluster - 16cpus per node - with the
>>>> following valuable properties:
>>>>
>>>> jobmanager.heap.mb = 2048
>>>> taskmanager.memory.fraction = 0.5
>>>> taskmanager.heap.mb = 28672
>>>> taskmanager.network.bufferSizeInBytes = 32768
>>>> taskmanager.network.numberOfBuffers = 98304
>>>> akka.ask.timeout = 300s
>>>>
>>>> Any help will be appreciated. Thank you.
>>>>
>>>> --
>>>> *Andrea Spina*
>>>> N.Tessera: *74598*
>>>> MAT: *89369*
>>>> *Ingegneria Informatica* *[LM] *(D.M. 270)
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>> *Andrea Spina*
>> N.Tessera: *74598*
>> MAT: *89369*
>> *Ingegneria Informatica* *[LM] *(D.M. 270)
>>
>>
>>
>
>
> --
> *Andrea Spina*
> N.Tessera: *74598*
> MAT: *89369*
> *Ingegneria Informatica* *[LM] *(D.M. 270)
>



-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)

Reply via email to