We actually have work in progress to reduce the memory fragmentation, which should solve this issue. I hope it will be ready for the 0.9 release.
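As a rough illustration of the arithmetic behind the "Too few memory segments" failure discussed below: managed memory gets split across parallel slots and across the memory-consuming operators in the plan, so a large chained job in a small test TaskManager can leave each hash join below its minimum. All constants here (managed-memory fraction, page size, operator count) are illustrative assumptions, not the exact values Flink uses:

```java
// Back-of-the-envelope estimate of managed memory segments available per
// operator in a test TaskManager. All constants are illustrative
// assumptions; check your version's flink-conf.yaml defaults.
public class MemorySegmentEstimate {

    static long segmentsPerOperator(long tmMemoryMb, double managedFraction,
                                    int pageSizeBytes, int parallelism,
                                    int memoryConsumingOperators) {
        long managedBytes = (long) (tmMemoryMb * 1024L * 1024L * managedFraction);
        long totalPages = managedBytes / pageSizeBytes;
        // Managed memory is divided across parallel slots and across the
        // memory-consuming operators (joins, sorts, ...) in the plan.
        return totalPages / ((long) parallelism * memoryConsumingOperators);
    }

    public static void main(String[] args) {
        // Assumed: 80 MB TaskManager, 70% managed, 32 KB pages,
        // parallelism 4, 14 memory-consuming operators in one large job.
        long segments = segmentsPerOperator(80, 0.7, 32768, 4, 14);
        // With these assumptions each operator gets 32 pages, just below
        // the 33-segment minimum from the exception in this thread.
        System.out.println(segments + " segments per operator");
    }
}
```

Under these assumed numbers, either raising the TaskManager memory or splitting the job pushes each operator back above the minimum.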
On Thu, May 14, 2015 at 8:46 AM, Andra Lungu <lungu.an...@gmail.com> wrote:
> Hi Yi,
>
> The problem here, as Stephan already suggested, is that you have a very
> large job. Each complex operation (join, coGroup, etc.) needs a
> share of memory.
> In Flink, for the test cases at least, the TaskManagers' memory is
> restricted to just 80MB in order to run multiple tests in parallel on Travis.
> If you chain lots of operators, you can easily exceed that threshold.
>
> The only way this test case would work is if you split it somehow.
> Problem is that for Affinity Propagation, one (myself included) would like
> to test the whole algorithm at once. So maybe a quick fix would be to
> increase the amount of memory for the TMs.
>
> An almost identical discussion can be found here:
> https://www.mail-archive.com/dev@flink.apache.org/msg01631.html
>
> Andra
>
> On Thu, May 14, 2015 at 12:35 AM, Stephan Ewen <se...@apache.org> wrote:
>
> > You are probably starting the system with very little memory, or you have
> > an immensely large job.
> >
> > Have a look here; I think this discussion on the user mailing list a few
> > days ago is about the same issue:
> >
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-exception-td1206.html
> >
> > On Thu, May 14, 2015 at 12:22 AM, Yi ZHOU <zhouyi0...@hotmail.com> wrote:
> >
> > > Hello,
> > >
> > > Thanks @Stephan for the explanations. Even with this information, I
> > > still have no clue how to trace the error.
> > >
> > > Now, the exception stack in the *cluster mode* always looks like this
> > > (even if I set env.setParallelism(1)):
> > >
> > > org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> > >     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> > >     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> > >     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> > >     at org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$receiveTestingMessages$1.applyOrElse(TestingJobManager.scala:160)
> > >     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> > >     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> > >     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> > >     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> > >     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> > >     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> > >     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:95)
> > >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> > >     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> > >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> > >     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> > >     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > Caused by: java.lang.Exception: The data preparation for task 'Join (Join
> > > at groupReduceOnNeighbors(Graph.java:1212)) (d2338ea96e86b505867b3cf3bffec007)',
> > > caused an error: Too few memory segments provided. Hash Join needs at
> > > least 33 memory segments.
> > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:469)
> > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
> > >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:223)
> > >     at java.lang.Thread.run(Thread.java:701)
> > > Caused by: java.lang.IllegalArgumentException: Too few memory segments
> > > provided. Hash Join needs at least 33 memory segments.
> > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.<init>(MutableHashTable.java:373)
> > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.<init>(MutableHashTable.java:359)
> > >     at org.apache.flink.runtime.operators.hash.HashMatchIteratorBase.getHashJoin(HashMatchIteratorBase.java:48)
> > >     at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.<init>(NonReusingBuildSecondHashMatchIterator.java:77)
> > >     at org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:151)
> > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:464)
> > >     ... 3 more
> > >
> > > It looks like the memory is needed when we do "Join at
> > > groupReduceOnNeighbors(Graph.java:1212)"; however, none of these lines
> > > is directly related to my code. I don't know what I should pay attention
> > > to in order to adapt to the cluster mode.
> > > I wrote the data transformations as described in the docs and examples
> > > (Data transformation and Gelly). Does anyone know the cause of this
> > > kind of error?
> > >
> > > Here is a link to my test code.
> > > https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/AffinityPropogationITCase.java
> > >
> > > https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropogation.java
> > >
> > > Thanks
> > >
> > > ZHOU Yi
> > >
> > > On 13/05/2015 01:04, Stephan Ewen wrote:
> > >
> > >> Hi!
> > >>
> > >> The *collection execution* runs the program simply as functions over Java
> > >> collections. It is single-threaded, always local, and does not use any
> > >> Flink memory management, serialization, or so. It is designed to be very
> > >> lightweight and is tailored towards very small problems.
> > >>
> > >> The *cluster mode* is the regular Flink mode. It spawns a Flink cluster
> > >> with one worker and multiple slots. It runs programs in parallel, uses
> > >> managed memory, and should behave pretty much like a regular Flink
> > >> installation (with one worker and little memory).
> > >>
> > >> To debug your test, I would first see whether it is parallelism-sensitive.
> > >> The cluster mode uses parallelism 4 by default; the collection execution
> > >> is single-threaded (parallelism 1). You can force the parallelism to
> > >> always be one by setting it on the execution environment.
> > >>
> > >> Stephan
> > >>
> > >> On Wed, May 13, 2015 at 12:44 AM, Yi ZHOU <zhouyi0...@hotmail.com> wrote:
> > >>
> > >>> Hello,
> > >>>
> > >>> Thanks, Andra, for the Gaussian sequence generation. It is a little
> > >>> tricky; I will just leave this part for future work.
> > >>>
> > >>> I have met another problem in the Affinity Propagation algorithm. I
> > >>> wrote some test code for it.
> > >>> https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/AffinityPropogationITCase.java
> > >>>
> > >>> It passes in COLLECTION mode but fails when the execution mode is
> > >>> CLUSTER. I am not very clear about the differences and the reason.
> > >>>
> > >>> Can anyone give me a clue?
> > >>>
> > >>> Thanks,
> > >>> Best regards.
> > >>>
> > >>> ZHOU Yi
> > >>>
> > >>> On 08/05/2015 23:17, Andra Lungu wrote:
> > >>>
> > >>>> Hi Yi,
> > >>>>
> > >>>> To my knowledge, there is no simple way to generate this kind of
> > >>>> DataSet (i.e., there is no env.generateGaussianSequence()).
> > >>>> However, if you look in flink-perf, Till used something like this there:
> > >>>>
> > >>>> https://github.com/project-flink/flink-perf/blob/master/flink-jobs/src/main/scala/com/github/projectflink/als/ALSDataGeneration.scala
> > >>>>
> > >>>> Maybe he can give you some tips.
> > >>>>
> > >>>> You can also call random.nextGaussian() in Java:
> > >>>>
> > >>>> http://docs.oracle.com/javase/7/docs/api/java/util/Random.html#nextGaussian%28%29
> > >>>>
> > >>>> Not sure if this helps, but there is a paper on generating this kind
> > >>>> of distribution:
> > >>>> http://ifisc.uib-csic.es/raul/publications/P/P44_tc93.pdf
> > >>>>
> > >>>> Best of luck,
> > >>>> Andra
> > >>>>
> > >>>> On Fri, May 8, 2015 at 9:45 PM, Yi ZHOU <zhouyi0...@hotmail.com> wrote:
> > >>>>
> > >>>>> Hello, all
> > >>>>>
> > >>>>> When I tested the AP algorithm, I had a little question:
> > >>>>> how can I generate a DataSet with a Gaussian distribution? Is there
> > >>>>> an implemented function?
> > >>>>>
> > >>>>> Does anyone have a solution? Thank you,
> > >>>>>
> > >>>>> ZHOU Yi
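The random.nextGaussian() route Andra mentions could look like the following standalone sketch; the seed is arbitrary, the shift/scale parameters are hypothetical, and the wrapping into a Flink DataSet (e.g. via env.fromCollection(values)) is left out:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Standalone sketch: generate a sequence of values drawn from a Gaussian
// (normal) distribution. The result could then be handed to Flink with
// env.fromCollection(values). The seed is fixed only for reproducibility.
public class GaussianSequence {

    static List<Double> gaussianSequence(long seed, int count,
                                         double mean, double stdDev) {
        Random random = new Random(seed);
        List<Double> values = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // nextGaussian() yields mean 0, std dev 1; shift and scale it.
            values.add(mean + stdDev * random.nextGaussian());
        }
        return values;
    }

    public static void main(String[] args) {
        List<Double> values = gaussianSequence(42L, 100_000, 0.0, 1.0);
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        // The sample mean should land close to 0 for this many samples.
        System.out.println("sample mean: " + (sum / values.size()));
    }
}
```

For distributed generation one would typically generate per-partition with independent seeds rather than building one collection on the client, but for test-sized data fromCollection should suffice.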