Hi Akshay,

Flink currently does not support automatically distributing hot keys across different JVMs. What you can do is manually adapt the parallelism (i.e., the number of partitions) if you find that one partition contains a lot of hot keys. This might mitigate the problem by spreading the hot keys across different partitions.
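The following minimal Java sketch illustrates why changing the number of partitions can help when several hot keys collide in one partition. It assumes a plain `hashCode()`-modulo partitioner, which simplifies whatever hashing the runtime actually applies. The hot keys `"a"`, `"e"` and `"i"` are hypothetical. The sketch counts how many hot keys land in each partition for a few partition counts.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HotKeyPartitioning {

    // Simplified hash partitioner: an assumption for this sketch, not the
    // exact scheme used by Flink at runtime.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts how many of the given hot keys end up in each partition.
    static Map<Integer, Integer> hotKeysPerPartition(List<String> hotKeys, int numPartitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String key : hotKeys) {
            counts.merge(partitionOf(key, numPartitions), 1, Integer::sum);
        }
        return counts;
    }

    // Number of hot keys in the most loaded partition.
    static int maxHotKeysInOnePartition(List<String> hotKeys, int numPartitions) {
        return hotKeysPerPartition(hotKeys, numPartitions).values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0);
    }

    public static void main(String[] args) {
        // Hypothetical hot keys chosen so that they collide for some partition counts.
        List<String> hotKeys = List.of("a", "e", "i");
        for (int n : new int[] {4, 5, 8}) {
            System.out.println(n + " partitions -> " + hotKeysPerPartition(hotKeys, n)
                    + ", max hot keys in one partition: " + maxHotKeysInOnePartition(hotKeys, n));
        }
    }
}
```

With these keys, 4 partitions put all three hot keys into one partition, 5 partitions separate them, and 8 partitions pair two of them again. The effect of a given partition count therefore depends on the actual keys. A single hot key always stays in exactly one partition, so this approach cannot split one key's records.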
Apart from that, as Zhijiang indicated, the problem seems to be that your join result is quite large: a single record is over 1 GB. Try to reduce the record size or give your TaskManagers (TMs) more memory.

Cheers,
Till

On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <akshaymend...@gmail.com> wrote:

> Hi Zhijiang,
> Thanks for the quick reply. My concern is more about how Flink performs joins of two *skewed* datasets. Pig <https://wiki.apache.org/pig/PigSkewedJoinSpec> and Spark <https://wiki.apache.org/pig/PigSkewedJoinSpec> seem to support joins of skewed datasets. The record size you mention in your reply is the size after the join operation takes place, which in my use case will definitely be too large to fit into a JVM TaskManager task slot. We want to know if there is a way in Flink to handle such skewed keys by distributing their values across different JVMs. Let me know if you need more clarity on the issue.
> Thanks,
> Akshay
>
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wangzhijiang...@aliyun.com> wrote:
>
>> Hi Akshay,
>>
>> You have run into a known issue where serializing large records causes an OOM.
>>
>> Previously, every subpartition created its own serializer, and each serializer maintained an internal byte array for storing intermediate serialization results. The key point is that these internal byte arrays are not managed by the framework, and they grow dynamically with the record size. If your job has many subpartitions and large records, this is likely to cause an OOM.
>>
>> I have already mitigated this issue to some extent by sharing a single serializer across all subpartitions [1], which means there is at most one byte-array overhead. This fix is included in release 1.7.
>> For now, the best options are to reduce your record size if possible, or to increase the heap size of the TaskManager container.
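To make the serializer overhead described above concrete, here is a minimal Java model. `GrowableBuffer` is a hypothetical stand-in for a serializer's internal byte array, not Flink's `DataOutputSerializer`. It assumes the buffer grows by doubling and keeps its capacity between records, and it only tracks sizes instead of allocating memory. The sketch compares the capacity retained when each subpartition owns a buffer with the capacity retained when one buffer is shared, which is the idea behind the FLINK-9913 change.

```java
import java.util.ArrayList;
import java.util.List;

public class SerializerBufferOverhead {

    // Hypothetical model of a serializer's internal byte array. It only tracks
    // capacity (no real allocation), grows by doubling, and never shrinks.
    static final class GrowableBuffer {
        long capacity = 64;

        void write(long recordBytes) {
            while (capacity < recordBytes) {
                capacity *= 2;
            }
        }
    }

    // One buffer per subpartition: each keeps the capacity reached by the
    // largest record routed to it. Records are routed round-robin for illustration.
    static long retainedPerSubpartition(long[] recordSizes, int numSubpartitions) {
        List<GrowableBuffer> buffers = new ArrayList<>();
        for (int i = 0; i < numSubpartitions; i++) {
            buffers.add(new GrowableBuffer());
        }
        for (int i = 0; i < recordSizes.length; i++) {
            buffers.get(i % numSubpartitions).write(recordSizes[i]);
        }
        long total = 0;
        for (GrowableBuffer buffer : buffers) {
            total += buffer.capacity;
        }
        return total;
    }

    // One buffer shared by all subpartitions: retained capacity is bounded by the largest record.
    static long retainedShared(long[] recordSizes) {
        GrowableBuffer shared = new GrowableBuffer();
        for (long size : recordSizes) {
            shared.write(size);
        }
        return shared.capacity;
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        // Hypothetical workload: four 256 MB records, one per subpartition.
        long[] records = {256 * mb, 256 * mb, 256 * mb, 256 * mb};
        System.out.println("per-subpartition buffers: " + retainedPerSubpartition(records, 4) / mb + " MB");
        System.out.println("shared buffer:            " + retainedShared(records) / mb + " MB");
    }
}
```

In this hypothetical workload, the per-subpartition model retains 1024 MB and the shared model retains 256 MB. Sharing removes the multiplication by the number of subpartitions, but one record larger than the available heap still cannot be serialized, which is why the record size itself matters here.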
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>
>> Best,
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> From: Akshay Mendole <akshaymend...@gmail.com>
>> Sent: Thursday, November 22, 2018 13:43
>> To: user <user@flink.apache.org>
>> Subject: OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>> We are converting one of our Pig pipelines to Flink using Apache Beam. The Pig pipeline reads two different data sets (R1 & R2) from HDFS, enriches them, joins them, and writes the result back to HDFS. The data set R1 is skewed, in the sense that a few keys have a lot of records. When we converted the Pig pipeline to Apache Beam and ran it using Flink on a production YARN cluster, we got the following error:
>>
>> 2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>> java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
>>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>     at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>>     at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>>     at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>>     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
>>     at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>>     at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>     at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>     at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>     at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>     ... 9 more
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>     at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>>     at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>     at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>     at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>     at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>
>> From the exception view in the Flink JobManager dashboard, we could see that this happens at a join operation.
>> *When I say the R1 dataset is skewed, I mean that some keys occur as many as 8,000,000 times, while most keys occur just once.*
>> *In dataset R2, each key occurs at most once.*
>> Also, if we exclude the keys with a high number of occurrences, the pipeline runs absolutely fine, which shows that the problem is caused by these few keys only.
>>
>> Hadoop version: 2.7.1
>> Beam version: 2.8.0
>> Flink Runner version: 2.8.0
>>
>> Let me know what other information I should fetch and post here so that you can help me resolve this.
>>
>> Thanks,
>> Akshay
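Skewed joins like the Pig and Spark ones mentioned in this thread are commonly handled by key salting. On the skewed side (R1), the records of a known hot key are spread over several sub-keys. On the other side (R2), the matching records are replicated once per sub-key, so no single group has to hold all values of the hot key. The following plain-Java sketch shows only the idea; it is not a Flink or Beam API. The `Row`/`SaltedKey` types, the `SALT_BUCKETS` value, and the assumption that hot keys are known in advance (for example, from a sampling pass) are all hypothetical. It needs Java 16+ for records. Because each key occurs at most once in R2, replicating that side stays cheap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SaltedSkewedJoin {

    // Hypothetical number of sub-keys that each hot key is split into.
    static final int SALT_BUCKETS = 4;

    record Row(String key, String value) {}

    // Grouping key after salting; keys that are not hot always get salt 0.
    record SaltedKey(String key, int salt) {}

    // Skewed side (R1): spread the records of each hot key round-robin over SALT_BUCKETS sub-keys.
    static Map<SaltedKey, List<Row>> saltLeft(List<Row> left, Set<String> hotKeys) {
        Map<SaltedKey, List<Row>> groups = new HashMap<>();
        Map<String, Integer> seen = new HashMap<>();
        for (Row row : left) {
            int salt = 0;
            if (hotKeys.contains(row.key())) {
                salt = seen.merge(row.key(), 1, Integer::sum) % SALT_BUCKETS;
            }
            groups.computeIfAbsent(new SaltedKey(row.key(), salt), k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    // Other side (R2): replicate each hot-key record once per salt so every sub-key finds its match.
    static Map<SaltedKey, List<Row>> saltRight(List<Row> right, Set<String> hotKeys) {
        Map<SaltedKey, List<Row>> groups = new HashMap<>();
        for (Row row : right) {
            int copies = hotKeys.contains(row.key()) ? SALT_BUCKETS : 1;
            for (int salt = 0; salt < copies; salt++) {
                groups.computeIfAbsent(new SaltedKey(row.key(), salt), k -> new ArrayList<>()).add(row);
            }
        }
        return groups;
    }

    // Inner join per salted key. In a distributed job, each salted key could go to a different task.
    static List<String> join(List<Row> left, List<Row> right, Set<String> hotKeys) {
        Map<SaltedKey, List<Row>> leftGroups = saltLeft(left, hotKeys);
        Map<SaltedKey, List<Row>> rightGroups = saltRight(right, hotKeys);
        List<String> joined = new ArrayList<>();
        for (Map.Entry<SaltedKey, List<Row>> entry : leftGroups.entrySet()) {
            for (Row l : entry.getValue()) {
                for (Row r : rightGroups.getOrDefault(entry.getKey(), List.of())) {
                    joined.add(l.key() + ": " + l.value() + " + " + r.value());
                }
            }
        }
        return joined;
    }

    // Size of the largest group, i.e. the most values any single task must hold for one key.
    static int maxGroupSize(Map<SaltedKey, List<Row>> groups) {
        return groups.values().stream().mapToInt(List::size).max().orElse(0);
    }

    public static void main(String[] args) {
        // Hypothetical data: one hot key with 8 records in R1, plus one normal key.
        List<Row> r1 = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            r1.add(new Row("hot", "v" + i));
        }
        r1.add(new Row("cold", "c0"));
        List<Row> r2 = List.of(new Row("hot", "R"), new Row("cold", "S"));
        Set<String> hotKeys = Set.of("hot");

        System.out.println("largest R1 group: " + maxGroupSize(saltLeft(r1, hotKeys)));
        System.out.println("joined rows: " + join(r1, r2, hotKeys).size());
    }
}
```

Salting leaves the join result unchanged (9 rows here, with or without hot keys). It does cut the largest R1 group from 8 records to 2. In the Beam pipeline, the salting steps would have to run before the `CoGroupByKey` (the `CoGBK` step in the trace above), and the salt would be dropped afterwards. That wiring is not shown here.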