I think that's actually very use case specific.

You're code will never see the malformed record because it is dropped by
the input format.
Other applications might rely on complete input and would prefer an
exception to be notified about invalid input.
Flink's CsvInputFormat has a parameter "lenient" which makes this property
configurable.

I agree with Stephan that we should add a record-size parameter to the
DelimitedOutputFormat (which is the basis for the CsvInputFormat).

Cheers, Fabain

2015-10-08 19:33 GMT+02:00 KOSTIANTYN Kudriavtsev <
kudryavtsev.konstan...@gmail.com>:

> Yes it's,
> I'm checking number of columns per line to filter out mailformed
>
> *Sent from my ZenFone
> On Oct 8, 2015 1:19 PM, "Stephan Ewen" <se...@apache.org> wrote:
>
>> There is probably a different CSV input format implementation which drops
>> invalid lines (too long lines).
>>
>> Is that actually desired behavior, simply dropping malformatted input?
>>
>> On Thu, Oct 8, 2015 at 7:12 PM, KOSTIANTYN Kudriavtsev <
>> kudryavtsev.konstan...@gmail.com> wrote:
>>
>>> Hm, you was write
>>>
>>> I checked all files, one by one and found an issue with a line in one of
>>> them... It's really unexpected for me as far as I run spark job on the same
>>> dataset and "wrong" rows were filtered out without issues.
>>>
>>> Thanks for help!
>>>
>>> Thank you,
>>> Konstantin Kudryavtsev
>>>
>>> On Thu, Oct 8, 2015 at 12:35 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Ah, that makes sense!
>>>>
>>>> The problem is not in the core runtime, it is in the delimited input
>>>> format. It probably looks for the line split character and never finds it,
>>>> so that it starts buffering a super large line (gigabytes) which leads to
>>>> the OOM exception.
>>>>
>>>> Can you check whether the line split character and the encoding are
>>>> properly defined?
>>>>
>>>> Would actually be good to define a max line length (sane default +
>>>> configurable value) that reports when lines seem to extend a maximum length
>>>> (which is usually a misconfiguration of the split character)
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <
>>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>>
>>>>> 10/08/2015 16:25:48     CHAIN DataSource (at
>>>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)
>>>>> (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at
>>>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap
>>>>> (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>         at
>>>>> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>>>>         at
>>>>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>>>>         at
>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> 10/08/2015 16:25:48     Job execution switched to status FAILING.
>>>>> 10/08/2015 16:25:48     DataSink
>>>>> (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1)
>>>>> switched to CANCELED
>>>>> 10/08/2015 16:25:48     Job execution switched to status FAILED.
>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>>> program execution failed: Job execution failed.
>>>>>         at org.apache.flink.client.program.Client.run(Client.java:413)
>>>>>         at org.apache.flink.client.program.Client.run(Client.java:356)
>>>>>         at org.apache.flink.client.program.Client.run(Client.java:349)
>>>>>         at
>>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>>>         at
>>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>>>>         at
>>>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>>>>         at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
>>>>>         at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
>>>>>         at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>         at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>         at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>>         at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>>         at org.apache.flink.client.program.Client.run(Client.java:315)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>>> execution failed.
>>>>>         at
>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>>>         at
>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>>         at
>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>>         at
>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>>         at
>>>>> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
>>>>>         at
>>>>> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>>>>>         at
>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>>>         at
>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>>         at
>>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>>         at
>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>         at
>>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>         at
>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>         at
>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>         at
>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>         at
>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>>         at
>>>>> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>>>>         at
>>>>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>>>>         at
>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>>
>>>>> Thank you,
>>>>> Konstantin Kudryavtsev
>>>>>
>>>>> On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Can you paste the exception stack trace?
>>>>>>
>>>>>> On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <
>>>>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>
>>>>>>> It's DataSet program that performs simple filtering, crossjoin and
>>>>>>> aggregation.
>>>>>>>
>>>>>>> I'm using Hadoop S3 FileSystem (not Emr) as far as Flink's s3
>>>>>>> connector doesn't work at all.
>>>>>>>
>>>>>>> Currently I have 3 taskmanagers each 5k MB, but I tried different
>>>>>>> configurations and all leads to the same exception
>>>>>>>
>>>>>>> *Sent from my ZenFone
>>>>>>> On Oct 8, 2015 12:05 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>>>>>>
>>>>>>>> Can you give us a bit more background?  What exactly is your
>>>>>>>> program doing?
>>>>>>>>
>>>>>>>>   - Are you running a DataSet program, or a DataStream program?
>>>>>>>>   - Is it one simple source that reads from S3, or are there
>>>>>>>> multiple sources?
>>>>>>>>   - What operations do you apply on the CSV file?
>>>>>>>>   - Are you using Flink's S3 connector, or the Hadoop S3 file
>>>>>>>> system?
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <
>>>>>>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi guys,
>>>>>>>>>
>>>>>>>>> I'm running FLink on EMR with 2 m3.xlarge (each 16 GB RAM) and
>>>>>>>>> trying to process 3.8 GB CSV data from S3. I'm surprised the fact that
>>>>>>>>> Flink failed with OutOfMemory: Java Heap space
>>>>>>>>>
>>>>>>>>> I tried to find the reason:
>>>>>>>>> 1) to identify TaskManager with a command ps aux | grep
>>>>>>>>> TaskManager
>>>>>>>>> 2) then build Heap histo:
>>>>>>>>> $ jmap -histo:live 19648 | head -n23
>>>>>>>>>  num     #instances         #bytes  class name
>>>>>>>>> ----------------------------------------------
>>>>>>>>>    1:        131018     3763501304  [B
>>>>>>>>>    2:         61022        7820352  <methodKlass>
>>>>>>>>>    3:         61022        7688456  <constMethodKlass>
>>>>>>>>>    4:          4971        5454408  <constantPoolKlass>
>>>>>>>>>    5:          4966        4582232  <instanceKlassKlass>
>>>>>>>>>    6:          4169        3003104  <constantPoolCacheKlass>
>>>>>>>>>    7:         15696        1447168  [C
>>>>>>>>>    8:          1291         638824  [Ljava.lang.Object;
>>>>>>>>>    9:          5318         506000  java.lang.Class
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Do you have any ideas what can be the reason and how it can be
>>>>>>>>> fixed?
>>>>>>>>> Is Flink uses out-of-heap memory?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Konstantin Kudryavtsev
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Reply via email to