There is probably a different CSV input format implementation which drops invalid (overly long) lines.
Is simply dropping malformed input actually desired behavior?

On Thu, Oct 8, 2015 at 7:12 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstan...@gmail.com> wrote:

> Hm, you were right.
>
> I checked all files, one by one, and found an issue with a line in one of
> them. It was really unexpected for me, since I ran a Spark job on the same
> dataset and the "wrong" rows were filtered out without issues.
>
> Thanks for the help!
>
> Thank you,
> Konstantin Kudryavtsev
>
> On Thu, Oct 8, 2015 at 12:35 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Ah, that makes sense!
>>
>> The problem is not in the core runtime, it is in the delimited input
>> format. It probably looks for the line split character and never finds it,
>> so it starts buffering a super large line (gigabytes), which leads to
>> the OOM exception.
>>
>> Can you check whether the line split character and the encoding are
>> properly defined?
>>
>> It would actually be good to define a max line length (a sane default plus
>> a configurable value) that reports when lines seem to exceed a maximum
>> length (which is usually a misconfiguration of the split character).
>>
>> Greetings,
>> Stephan
>>
>> On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <
>> kudryavtsev.konstan...@gmail.com> wrote:
>>
>>> 10/08/2015 16:25:48 CHAIN DataSource (at
>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)
>>> (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at
>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap
>>> (FlatMap at count(DataSet.scala:523)) (1/1) switched to FAILED
>>> java.lang.OutOfMemoryError: Java heap space
>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> 10/08/2015 16:25:48 Job execution switched to status FAILING.
>>> 10/08/2015 16:25:48 DataSink
>>> (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf) (1/1)
>>> switched to CANCELED
>>> 10/08/2015 16:25:48 Job execution switched to status FAILED.
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.
>>>     at org.apache.flink.client.program.Client.run(Client.java:413)
>>>     at org.apache.flink.client.program.Client.run(Client.java:356)
>>>     at org.apache.flink.client.program.Client.run(Client.java:349)
>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>>     at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
>>>     at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
>>>     at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>     at org.apache.flink.client.program.Client.run(Client.java:315)
>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>     at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
>>>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> Thank you,
>>> Konstantin Kudryavtsev
>>>
>>> On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Can you paste the exception stack trace?
>>>>
>>>> On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <
>>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>>
>>>>> It's a DataSet program that performs simple filtering, cross join, and
>>>>> aggregation.
>>>>>
>>>>> I'm using the Hadoop S3 FileSystem (not EMR's), since Flink's S3
>>>>> connector doesn't work at all.
>>>>>
>>>>> Currently I have 3 task managers with 5,000 MB each, but I tried
>>>>> different configurations and all lead to the same exception.
>>>>>
>>>>> *Sent from my ZenFone*
>>>>> On Oct 8, 2015 12:05 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>>>>
>>>>>> Can you give us a bit more background? What exactly is your program
>>>>>> doing?
>>>>>>
>>>>>> - Are you running a DataSet program, or a DataStream program?
>>>>>> - Is it one simple source that reads from S3, or are there multiple
>>>>>> sources?
>>>>>> - What operations do you apply on the CSV file?
>>>>>> - Are you using Flink's S3 connector, or the Hadoop S3 file system?
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>> On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <
>>>>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>>
>>>>>>> I'm running Flink on EMR with 2 m3.xlarge instances (each 16 GB RAM)
>>>>>>> and trying to process 3.8 GB of CSV data from S3. I'm surprised that
>>>>>>> Flink failed with OutOfMemoryError: Java heap space.
>>>>>>>
>>>>>>> I tried to find the reason:
>>>>>>> 1) identify the TaskManager with the command: ps aux | grep TaskManager
>>>>>>> 2) then build a heap histogram:
>>>>>>> $ jmap -histo:live 19648 | head -n23
>>>>>>>  num     #instances         #bytes  class name
>>>>>>> ----------------------------------------------
>>>>>>>    1:        131018     3763501304  [B
>>>>>>>    2:         61022        7820352  <methodKlass>
>>>>>>>    3:         61022        7688456  <constMethodKlass>
>>>>>>>    4:          4971        5454408  <constantPoolKlass>
>>>>>>>    5:          4966        4582232  <instanceKlassKlass>
>>>>>>>    6:          4169        3003104  <constantPoolCacheKlass>
>>>>>>>    7:         15696        1447168  [C
>>>>>>>    8:          1291         638824  [Ljava.lang.Object;
>>>>>>>    9:          5318         506000  java.lang.Class
>>>>>>>
>>>>>>> Do you have any ideas what the reason can be and how it can be
>>>>>>> fixed?
>>>>>>> Does Flink use off-heap memory?
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Konstantin Kudryavtsev
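[Editorial note] Stephan's diagnosis above, that the input format never finds the line split character and so buffers a gigabytes-long "line" until the heap is exhausted, together with his suggestion of a configurable maximum line length, can be illustrated with a small sketch. This is not Flink's actual DelimitedInputFormat; all names here are made up for illustration:

```scala
// Hedged sketch of a delimiter-based record reader with a max-length guard:
// instead of buffering without bound when the delimiter never appears,
// it fails fast with a hint that the delimiter is likely misconfigured.
object BoundedRecordReader {
  def readRecord(in: java.io.Reader, delimiter: Char, maxLen: Int): Option[String] = {
    val sb = new StringBuilder
    var c = in.read()
    while (c != -1) {
      if (c == delimiter) return Some(sb.toString)
      if (sb.length >= maxLen)
        throw new IllegalStateException(
          s"Record exceeds $maxLen chars -- check the line delimiter and encoding")
      sb.append(c.toChar)
      c = in.read()
    }
    if (sb.nonEmpty) Some(sb.toString) else None // None = end of input
  }
}
```

With the wrong delimiter configured, the guard turns a slow OutOfMemoryError into an immediate, actionable error, which is exactly the "sane default plus configurable value" behavior proposed in the thread.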
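[Editorial note] On reading the jmap histogram quoted above: `[B` denotes byte arrays, and roughly 3.76 GB across 131,018 instances is consistent with a few enormous line buffers rather than general heap pressure. A small hedged helper (the object and method names are mine, not from any tool) that picks out the class holding the most heap bytes from `jmap -histo` output:

```scala
object HistoTop {
  // Parse the data rows of `jmap -histo` output, which look like
  // "   1:        131018     3763501304  [B"
  // (rank, instance count, total bytes, class name), skipping the header
  // and separator lines, and return the class accounting for the most bytes.
  def topByBytes(lines: Seq[String]): (String, Long) =
    lines.flatMap { line =>
      line.trim.split("\\s+") match {
        case Array(_, _, bytes, name) if bytes.nonEmpty && bytes.forall(_.isDigit) =>
          Some(name -> bytes.toLong)
        case _ => None
      }
    }.maxBy(_._2)
}
```

Applied to the histogram in the thread, this points straight at `[B`, i.e. the byte-array buffers filled by the delimited input format.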
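[Editorial note] On the opening question of whether silently dropping malformed input is desirable: the behavior the user saw elsewhere amounts to filtering out rows that do not parse instead of failing the job. A hedged sketch of that policy (not any framework's actual implementation), which keeps a count of dropped rows so the data quality problem stays visible:

```scala
object MalformedFilter {
  // Split rows into those with the expected field count and a count of the
  // rest, rather than failing the whole job on the first bad row.
  def partitionRows(rows: Seq[String], numFields: Int, sep: Char = ','): (Seq[String], Int) = {
    val (good, bad) = rows.partition(_.split(sep).length == numFields)
    (good, bad.size)
  }
}
```

Whether to drop, count, or fail on such rows is a policy choice; the thread's point is that failing with an opaque OutOfMemoryError is the worst of the options.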