Hi,

We are converting one of our Pig pipelines to Flink using Apache Beam. The Pig pipeline reads two data sets (R1 and R2) from HDFS, enriches them, joins them, and writes the result back to HDFS. The data set R1 is skewed, in the sense that it has a few keys with a very large number of records. When we converted the pipeline to Apache Beam and ran it with the Flink runner on a production YARN cluster, we got the following error:
2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
    at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
    at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
    at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
    at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
    at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
    at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
    at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    ... 9 more
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
    at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
    at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
    at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)

From the exception view in the Flink job manager dashboard, we could see that this is happening at the join operation.
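For context, the join is expressed as a plain CoGroupByKey in Beam. Below is a simplified sketch of what the pipeline roughly looks like; the paths, key extraction, and output formatting are placeholders, and the enrichment steps are omitted. (Our real records are POJOs encoded with SerializableCoder, which is why SerializableCoder shows up in the trace; the sketch uses plain strings to keep it short.)

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;

public class JoinSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read and key both data sets. The real pipeline parses and enriches the
    // records here; the sketch just keys each line by its first field.
    PCollection<KV<String, String>> r1 = p
        .apply("ReadR1", TextIO.read().from("hdfs:///data/r1/part-*"))
        .apply("KeyR1", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((String line) -> KV.of(line.split("\t", 2)[0], line)));

    PCollection<KV<String, String>> r2 = p
        .apply("ReadR2", TextIO.read().from("hdfs:///data/r2/part-*"))
        .apply("KeyR2", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((String line) -> KV.of(line.split("\t", 2)[0], line)));

    final TupleTag<String> r1Tag = new TupleTag<>();
    final TupleTag<String> r2Tag = new TupleTag<>();

    // The join: all values for one key from both inputs end up in a single
    // KV<String, CoGbkResult> element.
    PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
        .of(r1Tag, r1)
        .and(r2Tag, r2)
        .apply("CoGBK", CoGroupByKey.create());

    // Placeholder output: emit per-key counts. The real pipeline emits the
    // joined, enriched records here.
    joined
        .apply("Format", MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, CoGbkResult> kv) -> {
              long r1Count = 0;
              long r2Count = 0;
              for (String ignored : kv.getValue().getAll(r1Tag)) { r1Count++; }
              for (String ignored : kv.getValue().getAll(r2Tag)) { r2Count++; }
              return kv.getKey() + "\t" + r1Count + "\t" + r2Count;
            }))
        .apply("Write", TextIO.write().to("hdfs:///data/joined/out"));

    p.run().waitUntilFinish();
  }
}

As far as I can tell from the trace, the failing GroupReduce (GroupReduce at CoGBK/GBK) task is this CoGBK step: the runner gathers all values for a key into one KV<String, CoGbkResult> element (note the IterableLikeCoder and KvCoder frames), so a key with millions of R1 records becomes a single element of more than 1 GB, which is what the serializer fails on.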
*When I say the R1 data set is skewed, I mean that some keys occur as many as 8,000,000 times, while most keys occur just once.* *In data set R2, every key occurs at most once.* Also, if we exclude the keys with a very high number of occurrences, the pipeline runs absolutely fine, which confirms that the failure is caused by these few keys alone (the exclusion filter we used for that test is sketched in the P.S. below).

Hadoop version: 2.7.1
Beam version: 2.8.0
Flink Runner version: 2.8.0

Let me know what additional information I should gather and post here so you can help me resolve this.

Thanks,
Akshay
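P.S. For completeness, this is roughly the filter we applied to R1 for the exclusion test mentioned above; the key values here are placeholders for the hot keys we identified:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class HotKeyFilter {
  // Placeholder hot-key list; in the real test these were the keys with
  // ~8,000,000 occurrences in R1.
  private static final Set<String> HOT_KEYS =
      new HashSet<>(Arrays.asList("hot_key_1", "hot_key_2"));

  // Drops every R1 record whose key is in the hot-key list. Applying this
  // right before the join is the only difference between the failing run
  // and the run that completes fine.
  public static PCollection<KV<String, String>> dropHotKeys(
      PCollection<KV<String, String>> r1) {
    return r1.apply("DropHotKeys",
        Filter.by((KV<String, String> kv) -> !HOT_KEYS.contains(kv.getKey())));
  }
}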