Yes, it is. I'm checking the number of columns per line to filter out malformed rows.
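A minimal sketch of such a column-count filter (illustrative only: the path, the "," field delimiter, and the expected column count are assumptions, not the actual job's values):

    // Sketch: read raw lines and keep only rows with the expected column count.
    import org.apache.flink.api.scala._

    object FilterMalformedSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val expectedColumns = 10 // hypothetical value

        val validRows = env
          .readTextFile("s3://my-bucket/data.csv") // hypothetical path
          // split with limit -1 so trailing empty fields are still counted
          .filter(line => line.split(",", -1).length == expectedColumns)

        println(validRows.count())
      }
    }

    // Alternatively, Flink's CSV input format can be asked to drop malformed
    // rows itself, e.g. readCsvFile(..., lenient = true), if that flag is
    // available in the version in use.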
Sent from my ZenFone

On Oct 8, 2015 1:19 PM, "Stephan Ewen" <se...@apache.org> wrote:

> There is probably a different CSV input format implementation which drops
> invalid lines (too long lines).
>
> Is that actually desired behavior, simply dropping malformed input?
>
> On Thu, Oct 8, 2015 at 7:12 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstan...@gmail.com> wrote:
>
>> Hm, you were right.
>>
>> I checked all the files, one by one, and found an issue with a line in
>> one of them... It was really unexpected for me, since I had run a Spark
>> job on the same dataset and the "wrong" rows were filtered out without
>> issues.
>>
>> Thanks for the help!
>>
>> Thank you,
>> Konstantin Kudryavtsev
>>
>> On Thu, Oct 8, 2015 at 12:35 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Ah, that makes sense!
>>>
>>> The problem is not in the core runtime, it is in the delimited input
>>> format. It probably looks for the line split character and never finds
>>> it, so it starts buffering a super large line (gigabytes), which leads
>>> to the OOM exception.
>>>
>>> Can you check whether the line split character and the encoding are
>>> properly defined? For example, along the lines of the sketch below.
>>>
>>> It would actually be good to define a max line length (a sane default
>>> plus a configurable value) that reports when lines seem to exceed a
>>> maximum length (which is usually a misconfiguration of the split
>>> character).
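>>>
>>> A sketch of making both explicit (untested, and the exact setter names
>>> may vary across Flink versions; the path is hypothetical):
>>>
>>>   // Configure the delimited input explicitly so the format does not
>>>   // buffer forever while scanning for a delimiter that never occurs
>>>   // in differently encoded input.
>>>   import org.apache.flink.api.java.io.TextInputFormat
>>>   import org.apache.flink.core.fs.Path
>>>
>>>   val format = new TextInputFormat(new Path("s3://my-bucket/data.csv"))
>>>   format.setDelimiter("\n")      // the line split character
>>>   format.setCharsetName("UTF-8") // the encoding of the input bytes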
>>>
>>> Greetings,
>>> Stephan
>>>
>>> On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstan...@gmail.com> wrote:
>>>
>>>> 10/08/2015 16:25:48 CHAIN DataSource (at
>>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)
>>>> (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at
>>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap
>>>> (FlatMap at count(DataSet.scala:523)) (1/1) switched to FAILED
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>   at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>>>   at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>>>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> 10/08/2015 16:25:48 Job execution switched to status FAILING.
>>>> 10/08/2015 16:25:48 DataSink
>>>> (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf) (1/1)
>>>> switched to CANCELED
>>>> 10/08/2015 16:25:48 Job execution switched to status FAILED.
>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>> execution failed: Job execution failed.
>>>>   at org.apache.flink.client.program.Client.run(Client.java:413)
>>>>   at org.apache.flink.client.program.Client.run(Client.java:356)
>>>>   at org.apache.flink.client.program.Client.run(Client.java:349)
>>>>   at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>>>   at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>>>   at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
>>>>   at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
>>>>   at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>   at org.apache.flink.client.program.Client.run(Client.java:315)
>>>>   at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>>>>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>>>>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>>>>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>   at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
>>>>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>>>>   at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>>   at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>   at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>   at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>   at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>>>   at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>>>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Thank you,
>>>> Konstantin Kudryavtsev
>>>>
>>>> On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Can you paste the exception stack trace?
>>>>>
>>>>> On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>
>>>>>> It's a DataSet program that performs simple filtering, a cross join,
>>>>>> and an aggregation.
>>>>>>
>>>>>> I'm using the Hadoop S3 file system (not EMR's), since Flink's S3
>>>>>> connector doesn't work at all.
>>>>>>
>>>>>> Currently I have 3 TaskManagers with 5,000 MB each, but I have tried
>>>>>> different configurations and they all lead to the same exception.
>>>>>>
>>>>>> Sent from my ZenFone
>>>>>>
>>>>>> On Oct 8, 2015 12:05 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>>>>>
>>>>>>> Can you give us a bit more background? What exactly is your program
>>>>>>> doing?
>>>>>>>
>>>>>>> - Are you running a DataSet program or a DataStream program?
>>>>>>> - Is it one simple source that reads from S3, or are there multiple
>>>>>>>   sources?
>>>>>>> - What operations do you apply to the CSV file?
>>>>>>> - Are you using Flink's S3 connector or the Hadoop S3 file system?
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi guys,
>>>>>>>>
>>>>>>>> I'm running Flink on EMR with 2 m3.xlarge instances (16 GB RAM each)
>>>>>>>> and trying to process 3.8 GB of CSV data from S3. I'm surprised
>>>>>>>> that Flink failed with OutOfMemoryError: Java heap space.
>>>>>>>>
>>>>>>>> I tried to find the reason:
>>>>>>>> 1) identified the TaskManager with: ps aux | grep TaskManager
>>>>>>>> 2) then built a heap histogram:
>>>>>>>>
>>>>>>>> $ jmap -histo:live 19648 | head -n23
>>>>>>>>  num     #instances         #bytes  class name
>>>>>>>> ----------------------------------------------
>>>>>>>>    1:        131018     3763501304  [B
>>>>>>>>    2:         61022        7820352  <methodKlass>
>>>>>>>>    3:         61022        7688456  <constMethodKlass>
>>>>>>>>    4:          4971        5454408  <constantPoolKlass>
>>>>>>>>    5:          4966        4582232  <instanceKlassKlass>
>>>>>>>>    6:          4169        3003104  <constantPoolCacheKlass>
>>>>>>>>    7:         15696        1447168  [C
>>>>>>>>    8:          1291         638824  [Ljava.lang.Object;
>>>>>>>>    9:          5318         506000  java.lang.Class
>>>>>>>>
>>>>>>>> Almost the entire heap is taken up by byte arrays ([B).
>>>>>>>>
>>>>>>>> Do you have any ideas what the reason could be and how it can be
>>>>>>>> fixed? Does Flink use off-heap memory?
>>>>>>>>
>>>>>>>> Thank you,
>>>>>>>> Konstantin Kudryavtsev