Hi,

Thanks for your reply. To check whether this issue is specific to joins, I tried running a simple "group by" on a single dataset in which a few keys occur repeatedly (on the order of millions of times), with no joins involved. As I expected, I ran into the same issue. I am giving 7 GB to each task manager, with 2 slots per task manager.

From what I have understood so far, when individual records somewhere in the pipeline become this large, they would need to be handled in a distributed manner rather than by a simple data structure in a single JVM. I am guessing there is no way to do this in Flink today. Could you please confirm this?

Thanks,
Akshay
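As an aside for readers of this thread: the manual mitigation Till suggests below (repartitioning hot keys) is commonly implemented as a two-stage "salted" aggregation when the per-key result is combinable. Below is a minimal sketch against the plain Flink DataSet API (not Beam); the salt width, class name, and sum logic are illustrative assumptions, not from this thread:

    import java.util.concurrent.ThreadLocalRandom;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class SaltedGroupBy {

        private static final int SALT_BUCKETS = 100; // illustrative fan-out per hot key

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the real input; a hot key repeats millions of times.
            DataSet<Tuple2<String, Long>> records = env.fromElements(
                    Tuple2.of("hotKey", 1L), Tuple2.of("hotKey", 1L), Tuple2.of("rareKey", 1L));

            DataSet<Tuple2<String, Long>> counts = records
                    // Stage 1: salt the key so one hot key spreads over up to
                    // SALT_BUCKETS partitions and is pre-aggregated there.
                    .map(t -> Tuple3.of(t.f0, ThreadLocalRandom.current().nextInt(SALT_BUCKETS), t.f1))
                    .returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG))
                    .groupBy(0, 1)
                    .sum(2)
                    // Stage 2: drop the salt and merge at most SALT_BUCKETS
                    // partial sums per key.
                    .map(t -> Tuple2.of(t.f0, t.f2))
                    .returns(Types.TUPLE(Types.STRING, Types.LONG))
                    .groupBy(0)
                    .sum(1);

            counts.print();
        }
    }

Note the caveat: this only helps when the grouped result is itself small (counts, sums, top-k). When all values of a hot key must be materialized into a single record, as with the join result discussed in this thread, salting does not shrink that record.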
On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Akshay,
>
> Flink currently does not support automatically distributing hot keys
> across different JVMs. What you can do is adapt the parallelism/number
> of partitions manually if you find that one partition contains a lot
> of hot keys. This might mitigate the problem by spreading the hot keys
> over different partitions.
>
> Apart from that, the problem seems to be, as Zhijiang indicated, that
> your join result is quite large. A single record is over 1 GB. Try to
> decrease it or give more memory to your TMs.
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <akshaymend...@gmail.com>
> wrote:
>
>> Hi Zhijiang,
>> Thanks for the quick reply. My concern is more about how Flink
>> performs joins of two *skewed* datasets. Pig
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and Spark
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seem to support joins
>> of skewed datasets. The record size you mention in your reply arises
>> after the join operation takes place, and in my use case it is
>> definitely going to be too large to fit in a task manager's JVM task
>> slot. We want to know whether there is a way in Flink to handle such
>> skewed keys by distributing their values across different JVMs. Let me
>> know if you need more clarity on the issue.
>> Thanks,
>> Akshay
>>
>> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wangzhijiang...@aliyun.com>
>> wrote:
>>
>>> Hi Akshay,
>>>
>>> You have hit a known issue: serializing very large records can cause
>>> an OOM.
>>>
>>> Previously, every subpartition created a separate serializer, and
>>> each serializer maintained an internal byte array for storing
>>> intermediate serialization results. The key point is that these
>>> internal byte arrays are not managed by the framework, and they grow
>>> dynamically with the record size. If your job has many subpartitions
>>> and large records, this can easily cause an OOM.
>>>
>>> I have already improved this to some extent by sharing a single
>>> serializer across all subpartitions [1], which means there is at most
>>> one such byte-array overhead. This improvement is included in release
>>> 1.7. For now, the best options are to reduce your record size if
>>> possible or to increase the heap size of the task manager container.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>>
>>> Best,
>>> Zhijiang
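To make these suggestions concrete, here is a minimal sketch of the relevant knobs, assuming a Flink 1.6/1.7-style flink-conf.yaml; the values are illustrative and need to be sized against the actual YARN container limits:

    # flink-conf.yaml (illustrative values)
    taskmanager.heap.size: 14g        # more heap per TM, so a > 1 GB record can be serialized
    taskmanager.numberOfTaskSlots: 2  # fewer slots per TM leaves more heap per slot
    parallelism.default: 200          # more partitions can spread moderately hot keys

When submitting directly to YARN, the same can be passed on the command line (flags as in the 1.6/1.7 CLI), e.g. flink run -m yarn-cluster -ytm 14336 -ys 2 -p 200 <job jar>. Note that none of this helps once a single record no longer fits in one JVM's heap.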
>>> ------------------------------------------------------------------
>>> From: Akshay Mendole <akshaymend...@gmail.com>
>>> Sent: Thursday, November 22, 2018, 13:43
>>> To: user <user@flink.apache.org>
>>> Subject: OutOfMemoryError while doing join operation in flink
>>>
>>> Hi,
>>> We are converting one of our Pig pipelines to Flink using Apache
>>> Beam. The Pig pipeline reads two different datasets (R1 & R2) from
>>> HDFS, enriches them, joins them, and dumps the result back to HDFS.
>>> The dataset R1 is skewed, in the sense that it has a few keys with a
>>> very large number of records. When we converted the Pig pipeline to
>>> Apache Beam and ran it using Flink on a production YARN cluster, we
>>> got the following error:
>>>
>>> 2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>>> java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
>>>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>>>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>     at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>>>     at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>>>     at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
>>>     at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>>>     at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>>     at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>>     at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>>     at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>     ... 9 more
>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>     at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>>>     at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>>     at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>>     at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>>     at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>>     at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>>     at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>
>>> From the exception view in the Flink job manager dashboard, we could
>>> see that this is happening at a join operation.
>>> *When I say the R1 dataset is skewed, I mean that some keys occur as
>>> many as 8,000,000 times, while most keys occur just once.*
>>> *Dataset R2 has records whose keys occur at most once.*
>>> Also, if we exclude the keys with such high numbers of occurrences,
>>> the pipeline runs absolutely fine, which proves the failure is caused
>>> by these few keys only.
>>>
>>> Hadoop version: 2.7.1
>>> Beam version: 2.8.0
>>> Flink Runner version: 2.8.0
>>>
>>> Let me know what more information I should fetch and post here in
>>> order for you to help me resolve this.
>>>
>>> Thanks,
>>> Akshay
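As a closing illustration: the hot-key exclusion experiment described above can be expressed as a simple filter before the join. A minimal sketch against the Beam Java SDK's Filter transform (2.8.0-era API); the key set, element types, and class/method names are illustrative assumptions:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.beam.sdk.transforms.Filter;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class ExcludeHotKeys {

        // Hypothetical set of known hot keys; in practice it could be produced
        // by a preliminary counting pass over R1.
        private static final Set<String> HOT_KEYS =
                new HashSet<>(Arrays.asList("hotKey1", "hotKey2"));

        // Drops records whose key is known to be hot before the CoGBK/join,
        // mirroring the experiment that made the pipeline run fine.
        public static PCollection<KV<String, String>> excludeHotKeys(
                PCollection<KV<String, String>> r1) {
            return r1.apply("ExcludeHotKeys",
                    Filter.by(kv -> !HOT_KEYS.contains(kv.getKey())));
        }
    }

The excluded hot keys would then need a separate path, for example the salted pre-aggregation sketched at the top of this thread when the per-key result is combinable; as Till notes, Flink itself (as of 1.7) has no built-in skewed-join support.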