Hi,

Flink handles large data volumes quite well; large records are a bit
trickier to tune for.
You could try to reduce the number of parallel tasks per machine (#slots
per TM, #TMs per machine) and/or increase the amount of available JVM
memory (possibly in exchange for managed memory, as Zhijiang suggested).
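
For illustration only, a minimal flink-conf.yaml sketch of those knobs could
look like this (the values are placeholders rather than recommendations, and
the key names assume the Flink 1.5+ configuration):

    taskmanager.numberOfTaskSlots: 1   # fewer parallel tasks per TM
    taskmanager.heap.size: 14g         # more JVM memory per TM
    taskmanager.memory.fraction: 0.5   # trade managed memory for free heap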

Best, Fabian

On Wed, Nov 28, 2018 at 07:44, Akshay Mendole <akshaymend...@gmail.com> wrote:

> Hi Zhijiang,
>                   Thanks for the explanation and the suggested workaround.
> While this can work for the example stated above, we have more complex use
> cases where we would have to re-tune these parameters each time. FYI, we
> ran into the same problem when we did a simple groupBy on the skewed dataset.
> Thanks,
> Akshay
>
>
> On Fri, Nov 23, 2018 at 8:29 AM zhijiang <wangzhijiang...@aliyun.com>
> wrote:
>
>> Hi Akshay,
>>
>> Sorry, I have not thought of a proper way to handle a single large record
>> across distributed task managers in Flink. But I can give some hints for
>> adjusting the related memory settings to work around the OOM issue.
>> A large fraction of the task manager memory is managed by Flink for
>> efficiency, and this memory lives for the lifetime of the JVM and is not
>> recycled by GC. You can check the parameter "taskmanager.memory.fraction"
>> for this; its default value is 0.7 if you have not changed it, which means
>> 7GB * 0.7 is used by the framework.
>>
>> I am not sure which Flink version you are using. If I remember correctly,
>> before release-1.5 the network buffers also used heap memory by default,
>> so you would also have to subtract that part from the total task manager
>> memory.
>>
>> Setting aside the network buffers used by the framework, that leaves only
>> 7GB * 0.3 of temporary memory for everything else. The temporary buffer in
>> the serializer doubles its current size whenever the record does not fit,
>> which means one serializer may need about 2GB of overhead memory for your
>> 1GB record. You have 2 slots per task manager running two tasks, so the
>> total overhead may approach 4GB. So you can decrease
>> "taskmanager.memory.fraction" to a lower value, increase the total task
>> manager memory to cover this overhead, or configure one slot per task
>> manager.
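>>
>> For example, a flink-conf.yaml sketch under the assumptions in this thread
>> (7GB task managers, ~1GB records; the values are only illustrations):
>>
>>     # With fraction 0.3, roughly 7GB * 0.7 of heap stays free, leaving room
>>     # for one or two ~2GB serializer buffers.
>>     taskmanager.memory.fraction: 0.3
>>     # Alternatively, run a single slot per task manager so that only one
>>     # large serialization buffer is built at a time.
>>     taskmanager.numberOfTaskSlots: 1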
>>
>> Best,
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> From: Akshay Mendole <akshaymend...@gmail.com>
>> Sent: November 23, 2018 (Friday) 02:54
>> To: trohrmann <trohrm...@apache.org>
>> Cc: zhijiang <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>;
>> Shreesha Madogaran <msshree...@gmail.com>
>> Subject: Re: OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>>     Thanks for your reply. I tried running a simple "group by" on just
>> one dataset in which a few keys occur repeatedly (on the order of millions
>> of times) and did not include any joins. I wanted to see whether this issue
>> is specific to joins, but as I expected, I ran into the same issue. I am
>> giving 7GB to each task manager with 2 slots per task manager. From what I
>> understand so far, cases where individual records somewhere in the pipeline
>> become this large should be handled in a distributed manner instead of by a
>> simple data structure in a single JVM. I am guessing there is no way to do
>> this in Flink today.
>> Could you please confirm this?
>> Thanks,
>> Akshay
>>
>>
>> On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>> Hi Akshay,
>>
>> Flink currently does not support automatically distributing hot keys
>> across different JVMs. What you can do is adapt the parallelism/number
>> of partitions manually if you find that one partition contains a lot of
>> hot keys. This might mitigate the problem by spreading the hot keys
>> across different partitions.
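>>
>> As a sketch only (the dataset contents, key extraction, and parallelism
>> value are made up for illustration, not taken from your job), adapting the
>> parallelism of the join with the DataSet API could look like this:
>>
>>     import org.apache.flink.api.common.functions.JoinFunction;
>>     import org.apache.flink.api.java.DataSet;
>>     import org.apache.flink.api.java.ExecutionEnvironment;
>>     import org.apache.flink.api.java.tuple.Tuple2;
>>
>>     public class ManualJoinParallelism {
>>         public static void main(String[] args) throws Exception {
>>             ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>             // Tiny stand-ins for the R1 / R2 datasets.
>>             DataSet<Tuple2<String, Integer>> r1 = env.fromElements(
>>                     Tuple2.of("hotKey", 1), Tuple2.of("hotKey", 2), Tuple2.of("rare", 3));
>>             DataSet<Tuple2<String, Integer>> r2 = env.fromElements(
>>                     Tuple2.of("hotKey", 10), Tuple2.of("rare", 30));
>>
>>             DataSet<String> joined = r1.join(r2)
>>                     .where(0)
>>                     .equalTo(0)
>>                     .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
>>                         @Override
>>                         public String join(Tuple2<String, Integer> left, Tuple2<String, Integer> right) {
>>                             return left.f0 + ":" + (left.f1 + right.f1);
>>                         }
>>                     })
>>                     // More partitions spread the work over more subtasks, but all
>>                     // values of one hot key still end up in the same subtask.
>>                     .setParallelism(100);
>>
>>             joined.print();
>>         }
>>     }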
>>
>> Apart from that, the problem seems to be, as Zhijiang indicated, that your
>> join result is quite large: a single record is about 1 GB. Try to decrease
>> it or give more memory to your TMs.
>>
>> Cheers,
>> Till
>>
>> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <akshaymend...@gmail.com>
>> wrote:
>> Hi Zhijiang,
>>                  Thanks for the quick reply. My concern is more about how
>> Flink performs joins of two *skewed* datasets. Pig
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and Spark
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seem to support joins of
>> skewed datasets. The record size you mention in your reply arises after the
>> join operation takes place, and in my use case it is definitely going to be
>> too large to fit in a task manager task slot's JVM heap. We want to know if
>> there is a way in Flink to handle such skewed keys by distributing their
>> values across different JVMs. Let me know if you need more clarity on the
>> issue.
>> Thanks,
>> Akshay
>>
>> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wangzhijiang...@aliyun.com>
>> wrote:
>> Hi Akshay,
>>
>> You have run into a known issue where serializing large records causes an
>> OOM.
>>
>> Previously, every subpartition created a separate serializer, and each
>> serializer maintained an internal byte array for storing intermediate
>> serialization results. The key point is that these internal byte arrays are
>> not managed by the framework, and they grow dynamically with the record
>> size. If your job has many subpartitions with large records, this can
>> easily cause an OOM.
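>>
>> To make that growth concrete, here is a heavily simplified sketch of the
>> doubling policy (this is not Flink's actual DataOutputSerializer, only an
>> illustration of why a ~1GB record can transiently need ~2GB of heap):
>>
>>     import java.util.Arrays;
>>
>>     // Minimal buffer that doubles its backing array until the record fits.
>>     public class GrowingBuffer {
>>         private byte[] buffer = new byte[4 * 1024];
>>         private int position = 0;
>>
>>         public void write(byte[] record) {
>>             while (buffer.length - position < record.length) {
>>                 // The last resize for a record slightly over 1GB allocates
>>                 // a ~2GB array while the old array is still reachable.
>>                 buffer = Arrays.copyOf(buffer, buffer.length * 2);
>>             }
>>             System.arraycopy(record, 0, buffer, position, record.length);
>>             position += record.length;
>>         }
>>     }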
>>
>> I have already improved this to some extent by sharing a single serializer
>> across all subpartitions [1], which means there is at most one such byte
>> array of overhead. That improvement is included in release-1.7.
>> For now, the best options are to reduce your record size if possible or to
>> increase the heap size of the task manager container.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From: Akshay Mendole <akshaymend...@gmail.com>
>> Sent: November 22, 2018 (Thursday) 13:43
>> To: user <user@flink.apache.org>
>> Subject: OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>>     We are converting one of our Pig pipelines to Flink using Apache Beam.
>> The Pig pipeline reads two different datasets (R1 & R2) from HDFS, enriches
>> them, joins them, and writes the result back to HDFS. The dataset R1 is
>> skewed, in the sense that it has a few keys with a lot of records. When we
>> converted the Pig pipeline to Apache Beam and ran it with Flink on a
>> production YARN cluster, we got the following error:
>>
>> 2018-11-21 16:52:25,307 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>> java.lang.RuntimeException: Emitting the record caused an I/O exception:
>> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
>> JVM heap space
>>         at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>>         at
>> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>         at
>> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>>         at
>> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>>         at
>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Failed to serialize element. Serialized
>> size (> 1136656562 bytes) exceeds JVM heap space
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>         at
>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>         at
>> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>         at
>> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>         at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>         at
>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>         at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>         at
>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>         at
>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>         at
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>         at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>         ... 9 more
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>         at
>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>         at
>> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>         at
>> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>         at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>         at
>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>         at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>         at
>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>         at
>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>         at
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>         at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>
>>
>> From the exception view in the Flink job manager dashboard, we can see that
>> this happens at a join operation.
>> *When I say the R1 dataset is skewed: some keys occur as many as 8,000,000
>> times, while most keys occur just once.*
>> *Dataset R2 has keys occurring at most once.*
>> Also, if we exclude the keys with a high number of occurrences, the
>> pipeline runs absolutely fine, which confirms the failure is caused by
>> these few keys only.
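>>
>> For reference, the join in the pipeline boils down to a Beam CoGroupByKey
>> (visible as "CoGBK/GBK" in the stack trace above). The following is only a
>> stripped-down sketch of that shape; the paths, the tab-separated key
>> extraction, and the class names are placeholders, not our actual code:
>>
>>     import org.apache.beam.sdk.Pipeline;
>>     import org.apache.beam.sdk.io.TextIO;
>>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>     import org.apache.beam.sdk.transforms.MapElements;
>>     import org.apache.beam.sdk.transforms.join.CoGbkResult;
>>     import org.apache.beam.sdk.transforms.join.CoGroupByKey;
>>     import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
>>     import org.apache.beam.sdk.values.KV;
>>     import org.apache.beam.sdk.values.PCollection;
>>     import org.apache.beam.sdk.values.TupleTag;
>>     import org.apache.beam.sdk.values.TypeDescriptors;
>>
>>     public class SkewedJoinSketch {
>>         public static void main(String[] args) {
>>             Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>
>>             // Read both inputs and key them; the enrichment steps are omitted.
>>             PCollection<KV<String, String>> r1 = p
>>                 .apply("ReadR1", TextIO.read().from("hdfs:///placeholder/R1/*"))
>>                 .apply("KeyR1", MapElements
>>                     .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>>                     .via((String line) -> KV.of(line.split("\t")[0], line)));
>>             PCollection<KV<String, String>> r2 = p
>>                 .apply("ReadR2", TextIO.read().from("hdfs:///placeholder/R2/*"))
>>                 .apply("KeyR2", MapElements
>>                     .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>>                     .via((String line) -> KV.of(line.split("\t")[0], line)));
>>
>>             TupleTag<String> r1Tag = new TupleTag<>();
>>             TupleTag<String> r2Tag = new TupleTag<>();
>>
>>             // CoGroupByKey gathers all values of one key into a single
>>             // CoGbkResult, which is why a hot key with millions of values
>>             // becomes one huge record when the Flink runner serializes it.
>>             PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
>>                 .of(r1Tag, r1)
>>                 .and(r2Tag, r2)
>>                 .apply(CoGroupByKey.create());
>>
>>             // (In the real pipeline the joined results are then processed
>>             // and written back to HDFS.)
>>             p.run().waitUntilFinish();
>>         }
>>     }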
>>
>> Hadoop version : 2.7.1
>> Beam version : 2.8.0
>> Flink Runner version : 2.8.0
>>
>> Let me know what additional information I should fetch and post here so
>> that you can help me resolve this.
>>
>> Thanks,
>> Akshay
>>
>>
>>
>>
