Hi Akshay,

You have run into a known issue where serializing large records can cause an OOM.

Previously, every subpartition created a separate serializer, and each 
serializer maintained an internal byte array for storing intermediate 
serialization results. The key point is that these internal byte arrays are 
not managed by the framework, and they grow dynamically with the record size. 
If your job has many subpartitions with large records, it can easily cause an 
OOM.
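
To make this concrete, each per-subpartition buffer behaves roughly like the 
sketch below (a simplified model only, not Flink's actual DataOutputSerializer 
implementation; names and sizes are illustrative):

    import java.util.Arrays;

    // Simplified model of a grow-on-demand serialization buffer. Before the
    // improvement in [1], each subpartition held one such buffer on the
    // task manager heap, outside of Flink's managed memory.
    public class GrowingBuffer {
        private byte[] buffer = new byte[128];
        private int position = 0;

        public void write(byte[] serializedRecord) {
            int required = position + serializedRecord.length;
            if (required > buffer.length) {
                // The buffer only grows; after one large record it keeps
                // (at least) that record's size for the lifetime of the task.
                buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, required));
            }
            System.arraycopy(serializedRecord, 0, buffer, position, serializedRecord.length);
            position += serializedRecord.length;
        }

        public int capacity() {
            return buffer.length;
        }

        public static void main(String[] args) {
            // One buffer per subpartition, multiplied by a record of ~1 GB as in
            // your stack trace, quickly adds up to many GB of unmanaged heap.
            GrowingBuffer perSubpartition = new GrowingBuffer();
            perSubpartition.write(new byte[16 * 1024 * 1024]); // 16 MB stand-in record
            System.out.println("buffer capacity after one record: " + perSubpartition.capacity());
        }
    }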

I have already improved this to some extent by sharing a single serializer 
across all subpartitions [1], which means there is at most one byte-array 
overhead per task. This improvement is included in release 1.7.
For now, the best options are to reduce your record size if possible, or to 
increase the heap size of the task manager container.
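
For example, assuming a YARN deployment and the configuration keys used since 
Flink 1.5 (adjust the value to your record sizes and parallelism):

    # flink-conf.yaml
    taskmanager.heap.size: 8192m

or pass a larger task manager memory when submitting to YARN, e.g. 
flink run -m yarn-cluster -ytm 8192 ...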

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
------------------------------------------------------------------
From: Akshay Mendole <akshaymend...@gmail.com>
Sent: Thursday, November 22, 2018, 13:43
To: user <user@flink.apache.org>
Subject: OutOfMemoryError while doing join operation in flink

Hi,
    We are converting one of our Pig pipelines to Flink using Apache Beam. The 
Pig pipeline reads two different datasets (R1 & R2) from HDFS, enriches them, 
joins them, and writes the result back to HDFS. Dataset R1 is skewed: it has a 
few keys with a very large number of records. When we converted the Pig 
pipeline to Apache Beam and ran it using Flink on a production YARN cluster, 
we got the following error:

2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask  - Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
        at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
        at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
        at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        ... 9 more
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
        at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)


From the exception view in the Flink job manager dashboard, we can see that 
this is happening at a join operation.
When I say dataset R1 is skewed, I mean that some keys occur as many as 
8,000,000 times, while most keys occur just once.
In dataset R2, each key occurs at most once.
Also, if we exclude the keys with a high number of occurrences, the pipeline 
runs absolutely fine, which confirms that the problem is caused by these few 
keys only.
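
For reference, the join is essentially the CoGroupByKey pattern below (a 
heavily simplified sketch, not our actual code; dataset contents, names, and 
tags are illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    public class JoinSketch {
        public static void main(String[] args) {
            Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

            // In the real job, R1 and R2 are read and enriched from HDFS.
            PCollection<KV<String, String>> r1 = p
                .apply("R1", Create.of(KV.of("hotKey", "r1-record"))
                    .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
            PCollection<KV<String, String>> r2 = p
                .apply("R2", Create.of(KV.of("hotKey", "r2-record"))
                    .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

            TupleTag<String> r1Tag = new TupleTag<>();
            TupleTag<String> r2Tag = new TupleTag<>();

            // CoGroupByKey (the CoGBK/GBK in the stack trace) gathers all values
            // of a key from both inputs into one CoGbkResult, so a key with
            // millions of R1 records becomes a single, very large element.
            PCollection<KV<String, CoGbkResult>> joined =
                KeyedPCollectionTuple.of(r1Tag, r1)
                    .and(r2Tag, r2)
                    .apply(CoGroupByKey.create());

            // Downstream enrichment and the HDFS sink are omitted here.
            p.run().waitUntilFinish();
        }
    }

If I read the IterableLikeCoder frames correctly, it is that single grouped 
element for a hot key that fails to serialize, but please correct me if my 
understanding is wrong.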

Hadoop version : 2.7.1
Beam version : 2.8.0
Flink Runner version : 2.8.0

Let me know what additional information I should gather and post here to help 
you resolve this.

Thanks,
Akshay

