Spark probably uses a different CSV input format implementation, one that
drops invalid lines (for example, lines that are too long).

Is that actually the desired behavior, simply dropping malformed input?
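
To make the two policies concrete (silently dropping an oversized line versus reporting it), here is a rough sketch of a delimiter-based reader with a maximum record length, along the lines of the max line length idea quoted below. It is plain Python for illustration only, not Flink's delimited input format; the names read_records, LineTooLongError, max_len and skip_too_long are all made up for this example.

```python
import io
from typing import BinaryIO, Iterator


class LineTooLongError(ValueError):
    """A record grew past max_len before a delimiter was seen."""


def _too_long(offset: int, max_len: int) -> LineTooLongError:
    return LineTooLongError(
        f"record starting at byte {offset} exceeds {max_len} bytes; "
        "check the delimiter and encoding settings")


def read_records(stream: BinaryIO, delimiter: bytes = b"\n",
                 max_len: int = 1 << 20, skip_too_long: bool = False,
                 chunk_size: int = 64 * 1024) -> Iterator[bytes]:
    """Yield delimiter-separated records while buffering a bounded amount.

    Hypothetical sketch: the buffer never holds much more than
    max_len + chunk_size bytes, so a missing delimiter cannot exhaust memory.
    """
    if not delimiter:
        raise ValueError("delimiter must not be empty")
    buf = bytearray()
    consumed = 0               # bytes already removed from the front of buf
    discarding = False         # True while skipping an oversized record
    keep = len(delimiter) - 1  # tail kept so a delimiter split across chunks is found

    for chunk in iter(lambda: stream.read(chunk_size), b""):
        buf += chunk
        while True:
            idx = buf.find(delimiter)
            if idx < 0:
                # No delimiter yet: either wait for more data or give up.
                if not discarding and len(buf) > max_len:
                    if not skip_too_long:
                        raise _too_long(consumed, max_len)
                    discarding = True
                if discarding:
                    drop = max(0, len(buf) - keep)
                    del buf[:drop]
                    consumed += drop
                break
            if discarding:
                discarding = False  # reached the end of the oversized record
            elif idx > max_len:
                if not skip_too_long:
                    raise _too_long(consumed, max_len)
            else:
                yield bytes(buf[:idx])
            del buf[:idx + len(delimiter)]
            consumed += idx + len(delimiter)

    # Trailing record without a final delimiter.
    if buf and not discarding:
        if len(buf) > max_len:
            if not skip_too_long:
                raise _too_long(consumed, max_len)
        else:
            yield bytes(buf)
```

With skip_too_long=False the reader fails fast and names the byte offset of the offending record, which would have pointed straight at the bad line; with skip_too_long=True it behaves like the "drop invalid lines" variant and the bad row disappears without a trace.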

On Thu, Oct 8, 2015 at 7:12 PM, KOSTIANTYN Kudriavtsev <> wrote:

> Hm, you were right.
> I checked all the files one by one and found a problem with a line in one of
> them... That was really unexpected for me, since I ran a Spark job on the same
> dataset and the "wrong" rows were filtered out without issues.
> Thanks for the help!
> Thank you,
> Konstantin Kudryavtsev
> On Thu, Oct 8, 2015 at 12:35 PM, Stephan Ewen <> wrote:
>> Ah, that makes sense!
>> The problem is not in the core runtime, it is in the delimited input
>> format. It probably looks for the line split character and never finds it,
>> so that it starts buffering a super large line (gigabytes) which leads to
>> the OOM exception.
>> Can you check whether the line split character and the encoding are
>> properly defined?
>> It would actually be good to define a max line length (sane default +
>> configurable value) and report when lines exceed that maximum length
>> (which usually indicates a misconfiguration of the split character).
>> Greetings,
>> Stephan
>> On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <> wrote:
>>> 10/08/2015 16:25:48     CHAIN DataSource (at
>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)
>>> ( -> Filter (Filter at
>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap
>>> (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
>>> java.lang.OutOfMemoryError: Java heap space
>>>         at
>>>         at
>>>         at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(
>>>         at
>>>         at
>>> 10/08/2015 16:25:48     Job execution switched to status FAILING.
>>> 10/08/2015 16:25:48     DataSink
>>> (
>>> switched to CANCELED
>>> 10/08/2015 16:25:48     Job execution switched to status FAILED.
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.
>>>         at
>>>         at
>>>         at
>>>         at
>>> org.apache.flink.client.program.ContextEnvironment.execute(
>>>         at
>>>         at
>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>>         at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
>>>         at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
>>>         at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>         at java.lang.reflect.Method.invoke(
>>>         at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(
>>>         at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(
>>>         at
>>>         at
>>> org.apache.flink.client.CliFrontend.executeProgram(
>>>         at
>>>         at
>>> org.apache.flink.client.CliFrontend.parseParameters(
>>>         at org.apache.flink.client.CliFrontend.main(
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>>         at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>         at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>         at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>         at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>         at
>>> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
>>>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>>>         at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>         at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>         at
>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>         at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>         at$class.aroundReceive(Actor.scala:465)
>>>         at
>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>         at
>>>         at
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>         at
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>>         at
>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>         at
>>>         at
>>>         at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(
>>>         at
>>>         at
>>> Thank you,
>>> Konstantin Kudryavtsev
>>> On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <> wrote:
>>>> Can you paste the exception stack trace?
>>>> On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <> wrote:
>>>>> It's a DataSet program that performs simple filtering, a cross join, and
>>>>> aggregation.
>>>>> I'm using the Hadoop S3 FileSystem (not EMR), since Flink's S3
>>>>> connector doesn't work at all.
>>>>> Currently I have 3 TaskManagers with 5k MB each, but I tried different
>>>>> configurations and they all lead to the same exception.
>>>>> On Oct 8, 2015 12:05 PM, "Stephan Ewen" <> wrote:
>>>>>> Can you give us a bit more background?  What exactly is your program
>>>>>> doing?
>>>>>>   - Are you running a DataSet program, or a DataStream program?
>>>>>>   - Is it one simple source that reads from S3, or are there multiple
>>>>>> sources?
>>>>>>   - What operations do you apply on the CSV file?
>>>>>>   - Are you using Flink's S3 connector, or the Hadoop S3 file system?
>>>>>> Greetings,
>>>>>> Stephan
>>>>>> On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <> wrote:
>>>>>>> Hi guys,
>>>>>>> I'm running Flink on EMR with 2 m3.xlarge instances (each 16 GB RAM) and
>>>>>>> trying to process 3.8 GB of CSV data from S3. I'm surprised that
>>>>>>> Flink failed with OutOfMemoryError: Java heap space.
>>>>>>> I tried to find the reason:
>>>>>>> 1) identified the TaskManager process with the command ps aux | grep TaskManager
>>>>>>> 2) then built a heap histogram:
>>>>>>> $ jmap -histo:live 19648 | head -n23
>>>>>>>  num     #instances         #bytes  class name
>>>>>>> ----------------------------------------------
>>>>>>>    1:        131018     3763501304  [B
>>>>>>>    2:         61022        7820352  <methodKlass>
>>>>>>>    3:         61022        7688456  <constMethodKlass>
>>>>>>>    4:          4971        5454408  <constantPoolKlass>
>>>>>>>    5:          4966        4582232  <instanceKlassKlass>
>>>>>>>    6:          4169        3003104  <constantPoolCacheKlass>
>>>>>>>    7:         15696        1447168  [C
>>>>>>>    8:          1291         638824  [Ljava.lang.Object;
>>>>>>>    9:          5318         506000  java.lang.Class
>>>>>>> Do you have any idea what the reason could be and how it can be
>>>>>>> fixed?
>>>>>>> Does Flink use off-heap memory?
>>>>>>> Thank you,
>>>>>>> Konstantin Kudryavtsev
