Hi Yi,

The problem here, as Stephan already suggested, is that you have a very
large job. Each complex operation (join, coGroup, etc) needs a
share of memory.
In Flink, for the test cases at least, they restrict the TaskManagers'
memory to just 80MB in order to run multiple tests in parallel on Travis.
If you chain lots of operators, you could easily exceed that threshold.

The only way this test case would work is if you would split it somehow.
Problem is that for Affinity Propagation, one (myself included) would like
to test the whole algorithm at once. So maybe a quick fix would be to
increase the amount of memory for the TMs.

An almost-identical discussion could be found here:
https://www.mail-archive.com/dev@flink.apache.org/msg01631.html

Andra

On Thu, May 14, 2015 at 12:35 AM, Stephan Ewen <se...@apache.org> wrote:

> You are probably starting the system with very little memory, or you have
> an immensely large job.
>
> Have a look here, I think this discussion on the user mailing list a few
> days ago is about the same issue:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-exception-td1206.html
>
> On Thu, May 14, 2015 at 12:22 AM, Yi ZHOU <zhouyi0...@hotmail.com> wrote:
>
> > Hello ,
> >
> > Thank @Stephan for the explanations. Though I with these information, I
> > still have no clue to trace the error.
> >
> > Now, the  exception stack  in the *cluster mode* always looks like this
> > (even I set env.setParallelism(1)):
> >
> > org.apache.flink.runtime.client.JobExecutionException: Job execution
> > failed.
> >     at
> >
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> >     at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> >     at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> >     at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> >     at
> >
> org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$receiveTestingMessages$1.applyOrElse(TestingJobManager.scala:160)
> >     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> >     at
> >
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> >     at
> >
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> >     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> >     at
> >
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> >     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> >     at
> >
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:95)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> >     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> >     at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: java.lang.Exception: The data preparation for task 'Join (Join
> > at groupReduceOnNeighbors(Graph.java:1212))
> > (d2338ea96e86b505867b3cf3bffec007)' , caused an error: Too few memory
> > segments provided. Hash Join needs at least 33 memory segments.
> >     at
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:469)
> >     at
> >
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
> >     at
> >
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:223)
> >     at java.lang.Thread.run(Thread.java:701)
> > Caused by: java.lang.IllegalArgumentException: Too few memory segments
> > provided. Hash Join needs at least 33 memory segments.
> >     at
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.<init>(MutableHashTable.java:373)
> >     at
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.<init>(MutableHashTable.java:359)
> >     at
> >
> org.apache.flink.runtime.operators.hash.HashMatchIteratorBase.getHashJoin(HashMatchIteratorBase.java:48)
> >     at
> >
> org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.<init>(NonReusingBuildSecondHashMatchIterator.java:77)
> >     at
> >
> org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:151)
> >     at
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:464)
> >     ... 3 more
> >
> >
> > It looks that  the memory is in need when we do "Join at
> > groupReduceOnNeighbors(Graph.java:1212)", however, none of the lines is
> > directed related with my code.  I don't know where i should pay attention
> > to adapt the cluster mode.
> > I write the data transformations as told in the doc and examples(Data
> > transformation and Gelly). Any one know the cause of this kind of error?
> >
> > Here is a link to my test code.
> >
> >
> https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/AffinityPropogationITCase.java
> >
> >
> >
> https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropogation.java
> >
> > Thanks
> >
> >
> > ZHOU Yi
> >
> > On 13/05/2015 01:04, Stephan Ewen wrote:
> >
> >> Hi!
> >>
> >> The *collection execution* runs the program simply as functions over
> Java
> >> collections. It is single threaded, always local, and does not use any
> >> Flink memory management, serialization, or so. It is designed to be very
> >> lightweight and is tailored towards very small problems.
> >>
> >> The *cluster mode* is the regular Flink mode. It spawns a Flink cluster
> >>
> >> with one worker and multiple slots. It runs programs parallel, uses
> >> managed
> >> memory, and should behave pretty much like the regular Flink
> installation
> >> (with one worker and little memory).
> >>
> >> To debug your test, I would first see whether it is parallelism
> sensitive.
> >> The cluster mode uses parallelism 4 by default, the collection execution
> >> is
> >> single threaded (parallelism 1). You can force the parallelism to be
> >> always
> >> one by setting it on the execution environment.
> >>
> >> Stephan
> >>
> >>
> >>
> >>
> >> On Wed, May 13, 2015 at 12:44 AM, Yi ZHOU <zhouyi0...@hotmail.com>
> wrote:
> >>
> >>  Hello,
> >>>
> >>> Thanks Andra for the gaussian sequence generation. It is a little
> tricky,
> >>> i just leave this part for future work.
> >>>
> >>> I meet another problem in AffinityPropogation algorithm. I write a few
> >>> test code for it.
> >>>
> >>>
> >>>
> https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/AffinityPropogationITCase.java
> >>> <
> >>>
> >>>
> https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/AffinityPropogationITCase.java
> >>> >It
> >>> passes the COLLECTION while failed when execution mode = CLUSTER.
> >>> I not very clear about the differences and the reason.
> >>>
> >>> Does anyone give me a clue?
> >>>
> >>> Thanks,
> >>> Best Regards.
> >>>
> >>> ZHOU Yi
> >>>
> >>> On 08/05/2015 23:17, Andra Lungu wrote:
> >>>
> >>>  Hi Yi,
> >>>>
> >>>> To my knowledge, there is no simple way to generate this kind of
> >>>> DataSet(i.e. there is no env.generateGaussianSequence()).
> >>>> However, if you look in flink-perf, Till used something like this
> there:
> >>>>
> >>>>
> >>>>
> https://github.com/project-flink/flink-perf/blob/master/flink-jobs/src/main/scala/com/github/projectflink/als/ALSDataGeneration.scala
> >>>> Maybe he can give you some tips.
> >>>>
> >>>> You can also call random.nextGaussian() in Java.
> >>>>
> >>>>
> >>>>
> http://docs.oracle.com/javase/7/docs/api/java/util/Random.html#nextGaussian%28%29
> >>>>
> >>>> Not sure if this helps, but there is a paper on generating this kind
> of
> >>>> distribution.
> >>>> http://ifisc.uib-csic.es/raul/publications/P/P44_tc93.pdf
> >>>>
> >>>> Best of luck,
> >>>> Andra
> >>>>
> >>>>
> >>>> On Fri, May 8, 2015 at 9:45 PM, Yi ZHOU <zhouyi0...@hotmail.com>
> wrote:
> >>>>
> >>>>   Hello, all
> >>>>
> >>>>> when I tested AP algorithm, I had a little question :
> >>>>>    how to generate a DataSet in gaussian distribution? Is there a
> >>>>> implemented funtion?
> >>>>>
> >>>>> Does any one has a solution? Thank you,
> >>>>>
> >>>>> ZHOU Yi
> >>>>>
> >>>>>
> >>>>>
> >
>

Reply via email to