What confuses me: I'm running Flink on YARN with the following command:

./yarn-session.sh -n 4 -jm 2096 -tm 5000

so I expect each TaskManager to have almost 5 GB of RAM available, but in the TaskManager panel I found that each TaskManager has the following configuration:

Flink Managed Memory: 2460 mb
CPU cores: 4
Physical Memory: 15046 mb

and stats:

*Memory.heap.used* Current: 248M, Avg: 246M
*Memory.flink.used* Current: 2G, Avg: 2G
In the UI, on the configuration panel, I found:

taskmanager.heap.mb 512
jobmanager.heap.mb 256

Thank you,
Konstantin Kudryavtsev

On Thu, Oct 8, 2015 at 12:29 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstan...@gmail.com> wrote:

> 10/08/2015 16:25:48 CHAIN DataSource (at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31) (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
> java.lang.OutOfMemoryError: Java heap space
>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> 10/08/2015 16:25:48 Job execution switched to status FAILING.
> 10/08/2015 16:25:48 DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1) switched to CANCELED
> 10/08/2015 16:25:48 Job execution switched to status FAILED.
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.Client.run(Client.java:413)
>     at org.apache.flink.client.program.Client.run(Client.java:356)
>     at org.apache.flink.client.program.Client.run(Client.java:349)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>     at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
>     at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
>     at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>     at org.apache.flink.client.program.Client.run(Client.java:315)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.OutOfMemoryError: Java heap space
>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> Thank you,
> Konstantin Kudryavtsev
>
> On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Can you paste the exception stack trace?
>>
>> On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstan...@gmail.com> wrote:
>>
>>> It's a DataSet program that performs simple filtering, a cross join and aggregation.
>>>
>>> I'm using the Hadoop S3 FileSystem (not EMR), since Flink's S3 connector doesn't work at all.
>>>
>>> Currently I have 3 TaskManagers with 5k MB each, but I tried different configurations and all of them lead to the same exception.
>>>
>>> *Sent from my ZenFone
>>> On Oct 8, 2015 12:05 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>>
>>>> Can you give us a bit more background? What exactly is your program doing?
>>>>
>>>> - Are you running a DataSet program, or a DataStream program?
>>>> - Is it one simple source that reads from S3, or are there multiple sources?
>>>> - What operations do you apply on the CSV file?
>>>> - Are you using Flink's S3 connector, or the Hadoop S3 file system?
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>> On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstan...@gmail.com> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I'm running Flink on EMR with 2 m3.xlarge instances (16 GB RAM each) and trying to process 3.8 GB of CSV data from S3.
>>>>> I'm surprised that Flink failed with OutOfMemory: Java heap space.
>>>>>
>>>>> I tried to find the reason:
>>>>> 1) identified the TaskManager process with the command ps aux | grep TaskManager
>>>>> 2) then built a heap histogram:
>>>>> $ jmap -histo:live 19648 | head -n23
>>>>>  num     #instances         #bytes  class name
>>>>> ----------------------------------------------
>>>>>    1:        131018     3763501304  [B
>>>>>    2:         61022        7820352  <methodKlass>
>>>>>    3:         61022        7688456  <constMethodKlass>
>>>>>    4:          4971        5454408  <constantPoolKlass>
>>>>>    5:          4966        4582232  <instanceKlassKlass>
>>>>>    6:          4169        3003104  <constantPoolCacheKlass>
>>>>>    7:         15696        1447168  [C
>>>>>    8:          1291         638824  [Ljava.lang.Object;
>>>>>    9:          5318         506000  java.lang.Class
>>>>>
>>>>> Do you have any ideas what the reason could be and how it can be fixed?
>>>>> Does Flink use off-heap memory?
>>>>>
>>>>> Thank you,
>>>>> Konstantin Kudryavtsev
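
For readers skimming the histogram in the original message, the following is a minimal Python sketch that totals the rows quoted there and reports how much of the listed live heap is held by byte arrays (`[B`). The row data is copied from the thread; the names `HISTO`, `ROW` and `parse_histo` are hypothetical helpers introduced only for this illustration. The sketch only does arithmetic on the quoted numbers and does not by itself explain why the byte arrays are there.

```python
import re

# Rows copied from the `jmap -histo:live` output quoted in the thread.
HISTO = """\
1: 131018 3763501304 [B
2: 61022 7820352 <methodKlass>
3: 61022 7688456 <constMethodKlass>
4: 4971 5454408 <constantPoolKlass>
5: 4966 4582232 <instanceKlassKlass>
6: 4169 3003104 <constantPoolCacheKlass>
7: 15696 1447168 [C
8: 1291 638824 [Ljava.lang.Object;
9: 5318 506000 java.lang.Class
"""

# One histogram row: rank, instance count, byte count, class name.
ROW = re.compile(r"^\s*\d+:\s+(\d+)\s+(\d+)\s+(\S+)\s*$")


def parse_histo(text):
    """Return a list of (class_name, instances, bytes) tuples."""
    rows = []
    for line in text.splitlines():
        match = ROW.match(line)
        if match:
            rows.append((match.group(3), int(match.group(1)), int(match.group(2))))
    return rows


rows = parse_histo(HISTO)
total = sum(size for _, _, size in rows)
byte_arrays = next(size for name, _, size in rows if name == "[B")

byte_array_mib = byte_arrays / (1024 * 1024)
byte_array_share = byte_arrays / total

print(f"byte[]: {byte_array_mib:.0f} MiB, {byte_array_share:.1%} of the listed rows")
```

On these numbers, byte arrays account for roughly 3.5 GiB and nearly all of the listed rows. Whether that is memory Flink preallocates for its own use or data read by the job is not something the thread confirms, so treat any interpretation beyond the arithmetic as an assumption.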