You are probably starting the system with very little memory, or you have an immensely large job.
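[Editor's note: if it really is a memory-budget problem, these are the configuration knobs that usually matter. The key names are taken from the 0.9-era Flink configuration reference and the values are illustrative assumptions for a small test setup, not recommendations for this specific job — check them against the docs for your version.]

```yaml
# flink-conf.yaml — illustrative values, assuming 0.9-era key names
taskmanager.heap.mb: 1024            # heap size of each TaskManager JVM
taskmanager.memory.fraction: 0.7     # share of the heap kept as managed memory
taskmanager.numberOfTaskSlots: 2     # fewer slots => more memory segments per slot
parallelism.default: 1               # mirror env.setParallelism(1) cluster-wide
```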
Have a look here — I think this discussion on the user mailing list a few days ago is about the same issue:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-exception-td1206.html

On Thu, May 14, 2015 at 12:22 AM, Yi ZHOU <zhouyi0...@hotmail.com> wrote:

> Hello,
>
> Thanks @Stephan for the explanations. Though with this information, I
> still have no clue to trace the error.
>
> Now, the exception stack in the *cluster mode* always looks like this
> (even if I set env.setParallelism(1)):
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$receiveTestingMessages$1.applyOrElse(TestingJobManager.scala:160)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The data preparation for task 'Join (Join
> at groupReduceOnNeighbors(Graph.java:1212)) (d2338ea96e86b505867b3cf3bffec007)',
> caused an error: Too few memory segments provided. Hash Join needs at
> least 33 memory segments.
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:469)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:223)
>     at java.lang.Thread.run(Thread.java:701)
> Caused by: java.lang.IllegalArgumentException: Too few memory segments
> provided. Hash Join needs at least 33 memory segments.
>     at org.apache.flink.runtime.operators.hash.MutableHashTable.<init>(MutableHashTable.java:373)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable.<init>(MutableHashTable.java:359)
>     at org.apache.flink.runtime.operators.hash.HashMatchIteratorBase.getHashJoin(HashMatchIteratorBase.java:48)
>     at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.<init>(NonReusingBuildSecondHashMatchIterator.java:77)
>     at org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:151)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:464)
>     ... 3 more
>
> It looks like memory runs short when we do the "Join at
> groupReduceOnNeighbors(Graph.java:1212)"; however, none of these lines is
> directly related to my code. I don't know what I should pay attention to
> in order to adapt to the cluster mode.
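[Editor's note: the "needs at least 33 memory segments" message can be made concrete with simple arithmetic. This is only a sketch of how Flink divides its managed memory into fixed-size pages (segments) shared by slots and memory-consuming operators; all the numbers — segment size, managed-memory size, consumer count — are assumptions for illustration, not values read from the failing test.]

```java
// Rough sketch of why a hash join can run out of memory segments.
// Flink slices its managed memory into fixed-size pages (segments)
// and divides them among task slots and the memory-consuming
// operators (joins, sorts, ...) of the job. All numbers below are
// illustrative assumptions.
public class MemorySegmentBudget {

    static long segmentsPerOperator(long managedMemoryBytes, int segmentSize,
                                    int slots, int memoryConsumersPerSlot) {
        long totalSegments = managedMemoryBytes / segmentSize;
        return totalSegments / slots / memoryConsumersPerSlot;
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024;       // assumed 32 KB pages
        long managed = 16L * 1024 * 1024;  // tiny test setup: 16 MB managed memory
        int slots = 4;                     // default test parallelism mentioned in the thread
        int consumers = 4;                 // operators sharing the budget (assumed)

        long perOperator = segmentsPerOperator(managed, segmentSize, slots, consumers);
        System.out.println("Segments per operator: " + perOperator);
        // With this budget each operator gets 32 segments, one short of
        // the 33 the hash join in the stack trace asks for.
    }
}
```

Under these assumed numbers, shrinking the parallelism or enlarging the managed memory both raise the per-operator segment count, which matches the "very little memory or immensely large job" diagnosis above.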
> I wrote the data transformations as told in the docs and examples (Data
> Transformations and Gelly). Does anyone know the cause of this kind of
> error?
>
> Here is a link to my test code:
>
> https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/AffinityPropogationITCase.java
>
> https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropogation.java
>
> Thanks,
>
> ZHOU Yi
>
> On 13/05/2015 01:04, Stephan Ewen wrote:
>
>> Hi!
>>
>> The *collection execution* runs the program simply as functions over Java
>> collections. It is single-threaded, always local, and does not use any
>> Flink memory management, serialization, or so. It is designed to be very
>> lightweight and is tailored towards very small problems.
>>
>> The *cluster mode* is the regular Flink mode. It spawns a Flink cluster
>> with one worker and multiple slots. It runs programs in parallel, uses
>> managed memory, and should behave pretty much like a regular Flink
>> installation (with one worker and little memory).
>>
>> To debug your test, I would first see whether it is parallelism-sensitive.
>> The cluster mode uses parallelism 4 by default; the collection execution
>> is single-threaded (parallelism 1). You can force the parallelism to
>> always be one by setting it on the execution environment.
>>
>> Stephan
>>
>> On Wed, May 13, 2015 at 12:44 AM, Yi ZHOU <zhouyi0...@hotmail.com> wrote:
>>
>>> Hello,
>>>
>>> Thanks Andra for the Gaussian sequence generation. It is a little
>>> tricky, so I will just leave this part for future work.
>>>
>>> I met another problem in the AffinityPropogation algorithm. I wrote some
>>> test code for it.
>>> https://github.com/joey001/flink/blob/ap_add/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/AffinityPropogationITCase.java
>>>
>>> It passes in COLLECTION mode but fails when the execution mode is
>>> CLUSTER. I am not very clear about the differences or the reason.
>>>
>>> Can anyone give me a clue?
>>>
>>> Thanks,
>>> Best regards.
>>>
>>> ZHOU Yi
>>>
>>> On 08/05/2015 23:17, Andra Lungu wrote:
>>>
>>>> Hi Yi,
>>>>
>>>> To my knowledge, there is no simple way to generate this kind of
>>>> DataSet (i.e. there is no env.generateGaussianSequence()).
>>>> However, if you look in flink-perf, Till used something like this there:
>>>>
>>>> https://github.com/project-flink/flink-perf/blob/master/flink-jobs/src/main/scala/com/github/projectflink/als/ALSDataGeneration.scala
>>>>
>>>> Maybe he can give you some tips.
>>>>
>>>> You can also call random.nextGaussian() in Java:
>>>>
>>>> http://docs.oracle.com/javase/7/docs/api/java/util/Random.html#nextGaussian%28%29
>>>>
>>>> Not sure if this helps, but there is also a paper on generating this
>>>> kind of distribution:
>>>> http://ifisc.uib-csic.es/raul/publications/P/P44_tc93.pdf
>>>>
>>>> Best of luck,
>>>> Andra
>>>>
>>>> On Fri, May 8, 2015 at 9:45 PM, Yi ZHOU <zhouyi0...@hotmail.com> wrote:
>>>>
>>>>> Hello, all
>>>>>
>>>>> When I tested the AP algorithm, I had a little question: how do I
>>>>> generate a DataSet with a Gaussian distribution? Is there an
>>>>> implemented function?
>>>>>
>>>>> Does anyone have a solution? Thank you,
>>>>>
>>>>> ZHOU Yi
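[Editor's note: tying Andra's two pointers together, here is a minimal, self-contained sketch of generating Gaussian-distributed values with java.util.Random.nextGaussian(). The mean, standard deviation, and seed are made up for illustration; in a Flink program the resulting list could then be handed to something like env.fromCollection(values).]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Sketch of Andra's suggestion: draw N(0, 1) samples with
// java.util.Random.nextGaussian(), then shift and scale them to the
// desired mean and standard deviation. The parameters below are
// illustrative assumptions.
public class GaussianData {

    static List<Double> gaussianSample(int n, double mean, double stdDev, long seed) {
        Random random = new Random(seed);   // fixed seed for reproducible tests
        List<Double> values = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // nextGaussian() draws from N(0, 1); shift and scale it
            values.add(mean + stdDev * random.nextGaussian());
        }
        return values;
    }

    public static void main(String[] args) {
        List<Double> sample = gaussianSample(10_000, 5.0, 2.0, 42L);
        double avg = sample.stream().mapToDouble(Double::doubleValue).average().orElse(0);
        System.out.printf("n=%d, empirical mean=%.2f%n", sample.size(), avg);
    }
}
```

With a fixed seed, a test can assert on the sample size and on the empirical mean being close to the requested one, which sidesteps the nondeterminism that makes distribution-generating code awkward to test.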