Ah, that makes sense!

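The problem is not in the core runtime but in the delimited input
format: the stack trace ends in DelimitedInputFormat.readLine. It probably
looks for the line split character and never finds it, so it keeps
buffering a single huge line (gigabytes), which leads to the OOM exception.
That would also match the roughly 3.7 GB of byte arrays ([B) in your heap
histogram.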

Can you check whether the line split character and the encoding are
properly defined?
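
One way to check this without Flink is to count candidate delimiters in the
first part of the file. The following is only a rough sketch: DelimiterProbe
is a made-up helper (not a Flink class), and it assumes you have copied a
sample of the S3 object to a local path that you pass as the first argument.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical diagnostic helper, not part of Flink.
class DelimiterProbe {

    // Counts non-overlapping occurrences of `delimiter` in data[0, length).
    static int count(byte[] data, int length, byte[] delimiter) {
        if (delimiter.length == 0) {
            return 0;
        }
        int hits = 0;
        int i = 0;
        while (i + delimiter.length <= length) {
            boolean match = true;
            for (int j = 0; j < delimiter.length; j++) {
                if (data[i + j] != delimiter[j]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                hits++;
                i += delimiter.length;
            } else {
                i++;
            }
        }
        return hits;
    }

    // Usage (assumed): java DelimiterProbe <local-sample-file>
    public static void main(String[] args) throws IOException {
        byte[] sample = new byte[1 << 20]; // inspect at most the first 1 MiB
        int filled = 0;
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            int n;
            while (filled < sample.length
                    && (n = in.read(sample, filled, sample.length - filled)) != -1) {
                filled += n;
            }
        }
        String[] names = {"\\n", "\\r\\n", "\\r"};
        String[] candidates = {"\n", "\r\n", "\r"};
        for (int k = 0; k < candidates.length; k++) {
            // The same delimiter is a different byte sequence in each encoding.
            byte[] utf8 = candidates[k].getBytes(StandardCharsets.UTF_8);
            byte[] utf16 = candidates[k].getBytes(StandardCharsets.UTF_16LE);
            System.out.println(names[k]
                    + ": UTF-8 hits=" + count(sample, filled, utf8)
                    + ", UTF-16LE hits=" + count(sample, filled, utf16));
        }
    }
}
```

If the delimiter your job is configured with shows zero hits in the sample,
the reader would have to buffer the whole split as one record. Note that the
count for \r also includes the \r of every \r\n pair.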

It would actually be good to define a max line length (sane default +
configurable value) and report an error when a line exceeds it
(which usually indicates a misconfigured split character).
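
To illustrate the idea, here is a minimal sketch of such a limit. It is not
how DelimitedInputFormat is implemented: BoundedLineReader, its constructor
arguments and the error message are all invented for this example, and it
only handles a single-byte delimiter.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Flink: reads delimiter-separated records
// and fails fast when one record grows beyond a configured limit.
class BoundedLineReader {
    private final InputStream in;
    private final byte delimiter;
    private final int maxLineLength;

    BoundedLineReader(InputStream in, byte delimiter, int maxLineLength) {
        if (maxLineLength <= 0) {
            throw new IllegalArgumentException("maxLineLength must be positive");
        }
        this.in = in;
        this.delimiter = delimiter;
        this.maxLineLength = maxLineLength;
    }

    // Returns the next record without its delimiter, or null at end of input.
    byte[] readLine() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if ((byte) b == delimiter) {
                return buf.toByteArray();
            }
            if (buf.size() >= maxLineLength) {
                // Stop buffering instead of running into an OutOfMemoryError.
                throw new IOException("Record exceeds " + maxLineLength
                        + " bytes without a delimiter; check the configured"
                        + " line delimiter and encoding.");
            }
            buf.write(b);
        }
        // A trailing record without a final delimiter is still returned.
        return buf.size() > 0 ? buf.toByteArray() : null;
    }

    public static void main(String[] args) {
        // Records end in '\r', but the reader is told to split on '\n'.
        byte[] data = "a,b\rc,d\re,f\r".getBytes(StandardCharsets.UTF_8);
        BoundedLineReader reader =
                new BoundedLineReader(new ByteArrayInputStream(data), (byte) '\n', 8);
        try {
            reader.readLine();
        } catch (IOException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With a check like this, a wrong delimiter shows up as a clear error after a
bounded amount of buffering rather than as an OOM. For real files the stream
should be wrapped in a BufferedInputStream, since this sketch reads one byte
at a time.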

Greetings,
Stephan


On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> 10/08/2015 16:25:48     CHAIN DataSource (at
> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)
> (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at
> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap
> (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
> java.lang.OutOfMemoryError: Java heap space
>         at
> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>         at
> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>         at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
>
> 10/08/2015 16:25:48     Job execution switched to status FAILING.
> 10/08/2015 16:25:48     DataSink
> (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1)
> switched to CANCELED
> 10/08/2015 16:25:48     Job execution switched to status FAILED.
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
>         at org.apache.flink.client.program.Client.run(Client.java:413)
>         at org.apache.flink.client.program.Client.run(Client.java:356)
>         at org.apache.flink.client.program.Client.run(Client.java:349)
>         at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>         at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>         at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>         at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
>         at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
>         at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>         at org.apache.flink.client.program.Client.run(Client.java:315)
>         at
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>         at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>         at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.OutOfMemoryError: Java heap space
>         at
> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>         at
> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>         at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> Thank you,
> Konstantin Kudryavtsev
>
> On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Can you paste the exception stack trace?
>>
>> On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <
>> kudryavtsev.konstan...@gmail.com> wrote:
>>
>>> It's a DataSet program that performs simple filtering, a cross join and
>>> aggregation.
>>>
>>> I'm using the Hadoop S3 FileSystem (not EMR), because Flink's S3 connector
>>> doesn't work at all.
>>>
>>> Currently I have 3 TaskManagers with 5k MB each, but I tried different
>>> configurations and they all lead to the same exception.
>>>
>>> On Oct 8, 2015 12:05 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>>
>>>> Can you give us a bit more background?  What exactly is your program
>>>> doing?
>>>>
>>>>   - Are you running a DataSet program, or a DataStream program?
>>>>   - Is it one simple source that reads from S3, or are there multiple
>>>> sources?
>>>>   - What operations do you apply on the CSV file?
>>>>   - Are you using Flink's S3 connector, or the Hadoop S3 file system?
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <
>>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I'm running Flink on EMR with 2 m3.xlarge instances (16 GB RAM each) and
>>>>> trying to process 3.8 GB of CSV data from S3. I'm surprised that Flink
>>>>> failed with OutOfMemoryError: Java heap space
>>>>>
>>>>> I tried to find the reason:
>>>>> 1) identified the TaskManager process with ps aux | grep TaskManager
>>>>> 2) then built a heap histogram:
>>>>> $ jmap -histo:live 19648 | head -n23
>>>>>  num     #instances         #bytes  class name
>>>>> ----------------------------------------------
>>>>>    1:        131018     3763501304  [B
>>>>>    2:         61022        7820352  <methodKlass>
>>>>>    3:         61022        7688456  <constMethodKlass>
>>>>>    4:          4971        5454408  <constantPoolKlass>
>>>>>    5:          4966        4582232  <instanceKlassKlass>
>>>>>    6:          4169        3003104  <constantPoolCacheKlass>
>>>>>    7:         15696        1447168  [C
>>>>>    8:          1291         638824  [Ljava.lang.Object;
>>>>>    9:          5318         506000  java.lang.Class
>>>>>
>>>>>
>>>>> Do you have any idea what the reason could be and how it can be fixed?
>>>>> Does Flink use off-heap memory?
>>>>>
>>>>>
>>>>> Thank you,
>>>>> Konstantin Kudryavtsev
>>>>>
>>>>
>>>>
>>
>
