10/08/2015 16:25:48 CHAIN DataSource (at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31) (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
java.lang.OutOfMemoryError: Java heap space
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

10/08/2015 16:25:48 Job execution switched to status FAILING.
10/08/2015 16:25:48 DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1) switched to CANCELED
10/08/2015 16:25:48 Job execution switched to status FAILED.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
    at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
    at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

Thank you,
Konstantin Kudryavtsev

On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:

> Can you paste the exception stack trace?
>
> On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <
> kudryavtsev.konstan...@gmail.com> wrote:
>
>> It's a DataSet program that performs simple filtering, a cross join, and
>> aggregation.
>>
>> I'm using the Hadoop S3 FileSystem (not EMR) because Flink's S3 connector
>> doesn't work at all.
>>
>> Currently I have 3 TaskManagers with 5k MB each, but I tried different
>> configurations and they all lead to the same exception.
>>
>> *Sent from my ZenFone
>> On Oct 8, 2015 12:05 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>
>>> Can you give us a bit more background? What exactly is your program
>>> doing?
>>>
>>> - Are you running a DataSet program, or a DataStream program?
>>> - Is it one simple source that reads from S3, or are there multiple
>>> sources?
>>> - What operations do you apply on the CSV file?
>>> - Are you using Flink's S3 connector, or the Hadoop S3 file system?
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <
>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I'm running Flink on EMR with 2 m3.xlarge (each 16 GB RAM) and trying
>>>> to process 3.8 GB of CSV data from S3. I'm surprised that Flink
>>>> failed with OutOfMemoryError: Java heap space.
>>>>
>>>> I tried to find the reason:
>>>> 1) identified the TaskManager process with the command
>>>>    ps aux | grep TaskManager
>>>> 2) then built a heap histogram:
>>>> $ jmap -histo:live 19648 | head -n23
>>>>  num     #instances         #bytes  class name
>>>> ----------------------------------------------
>>>>    1:        131018     3763501304  [B
>>>>    2:         61022        7820352  <methodKlass>
>>>>    3:         61022        7688456  <constMethodKlass>
>>>>    4:          4971        5454408  <constantPoolKlass>
>>>>    5:          4966        4582232  <instanceKlassKlass>
>>>>    6:          4169        3003104  <constantPoolCacheKlass>
>>>>    7:         15696        1447168  [C
>>>>    8:          1291         638824  [Ljava.lang.Object;
>>>>    9:          5318         506000  java.lang.Class
>>>>
>>>>
>>>> Do you have any idea what the reason could be and how it can be fixed?
>>>> Does Flink use off-heap memory?
>>>>
>>>>
>>>> Thank you,
>>>> Konstantin Kudryavtsev
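
The following is not part of the original thread. It is a minimal sketch, in Python, of one way to summarize `jmap -histo` output like the histogram quoted above, so that the dominant class (here `[B`, the JVM's name for `byte[]`) stands out in MiB rather than raw bytes. The names `parse_histo` and `summarize` are hypothetical helpers introduced for illustration, and the sample rows are copied from the first three rows of that histogram.

```python
import re
from typing import List, NamedTuple


class HistoRow(NamedTuple):
    rank: int
    instances: int
    size_bytes: int
    class_name: str


# Matches data rows such as "   1:   131018   3763501304  [B".
ROW = re.compile(r"^\s*(\d+):\s+(\d+)\s+(\d+)\s+(.+?)\s*$")


def parse_histo(text: str) -> List[HistoRow]:
    """Parse the data rows of `jmap -histo` output; other lines are skipped."""
    rows = []
    for line in text.splitlines():
        # Tolerate mail-quote prefixes such as ">>>> " in front of a row.
        match = ROW.match(line.lstrip("> "))
        if match:
            rows.append(HistoRow(int(match[1]), int(match[2]),
                                 int(match[3]), match[4]))
    return rows


def summarize(rows: List[HistoRow]) -> None:
    """Print size in MiB, share of the listed rows, and average instance size."""
    # Note: the total covers only the rows passed in, not the whole heap.
    total = sum(r.size_bytes for r in rows)
    for r in sorted(rows, key=lambda r: r.size_bytes, reverse=True):
        share = 100.0 * r.size_bytes / total if total else 0.0
        avg = r.size_bytes // r.instances if r.instances else 0
        print(f"{r.class_name:<28} {r.size_bytes / 2**20:10.1f} MiB "
              f"{share:5.1f}%  avg {avg} B/instance")


if __name__ == "__main__":
    # Sample rows taken from the histogram in the thread.
    SAMPLE = """\
>>>>    1:        131018     3763501304  [B
>>>>    2:         61022        7820352  <methodKlass>
>>>>    3:         61022        7688456  <constMethodKlass>
"""
    summarize(parse_histo(SAMPLE))
```

The summary only restates what the histogram already shows: almost all of the listed bytes sit in byte arrays. It does not by itself say which component allocated them.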