The spilling only happens when the branched data sets are joined. If you keep them separate and eventually emit them (e.g., write each branch directly to disk), no intermediate data will be spilled.
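Something like this (just a rough sketch with placeholder types and output paths, not your actual job) branches the DataSet without ever merging it again, so no blocking input has to be built:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BranchWithoutJoin {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Placeholder for the Parquet source from the pseudocode below.
    DataSet<String> x = env.fromElements("a", "b", "c");

    // Two branches over the same DataSet: the records are duplicated into
    // two local channels, but nothing is spilled because the branches are
    // never joined/coGrouped again.
    DataSet<String> x1 = x.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public void flatMap(String value, Collector<String> out) {
        out.collect("x1-" + value);
      }
    });
    DataSet<String> x2 = x.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public void flatMap(String value, Collector<String> out) {
        out.collect("x2-" + value);
      }
    });

    // Emit each branch to its own sink instead of merging them.
    x1.writeAsText("/tmp/out-x1");
    x2.writeAsText("/tmp/out-x2");

    env.execute("branch-without-join");
  }
}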
2018-05-04 18:05 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Does this duplication happen when I write directly to disk after the
> flatMaps?
>
> On Fri, May 4, 2018 at 6:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> That will happen if you join (or coGroup) the branched DataSets, i.e.,
>> you have a branching and merging pattern in your program.
>>
>> The problem in that case is that one of the inputs is pipelined (e.g.,
>> the probe side of a hash join) and the other one is blocking.
>> In order to execute such a plan, we must spill the pipelined data set to
>> disk to ensure that the other input can be fully consumed (to build the
>> hash table).
>>
>> There's not really a solution to this.
>> You could change the join strategy to sort-merge join, but this will
>> sort both inputs and also result in spilling both to disk.
>>
>> 2018-05-04 17:25 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi Fabian,
>>> thanks for the detailed reply.
>>> The problem I see is that the source dataset is huge and, since it
>>> doesn't fit in memory, it is spilled twice to disk (I checked the
>>> increasing disk usage during the job and it corresponded exactly to the
>>> size estimated by the Flink UI, that is, twice its initial size).
>>> There's probably no problem as long as the data stays in memory, but in
>>> my case this memory explosion is very problematic :(
>>>
>>> On Fri, May 4, 2018 at 5:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> No, there's no way around it.
>>>> DataSets that are processed by more than one operator cannot be
>>>> processed by chained operators.
>>>> The records need to be copied to avoid concurrent modifications.
>>>> However, the data should not be shipped over the network if all
>>>> operators have the same parallelism.
>>>> Instead, records are serialized and handed over via local byte[]
>>>> in-memory channels.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-05-04 14:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Flink 1.3.1 (I'm waiting for 1.5 before upgrading..)
>>>>>
>>>>> On Fri, May 4, 2018 at 2:50 PM, Amit Jain <aj201...@gmail.com> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> Which version of Flink are you using?
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Amit
>>>>>>
>>>>>> On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>> > Hi all,
>>>>>> > I have a Flink batch job that reads a Parquet dataset and then
>>>>>> > applies two flatMaps to it (see pseudocode below).
>>>>>> > The problem is that this dataset is quite big and Flink duplicates
>>>>>> > it before sending the data to these two operators (I guessed this
>>>>>> > from the doubled amount of sent bytes).
>>>>>> > Is there a way to avoid this behaviour?
>>>>>> >
>>>>>> > -------------------------------------------------------
>>>>>> > Here's the pseudocode of my job:
>>>>>> >
>>>>>> > DataSet X = readParquetDir();
>>>>>> > X1 = X.flatMap(...);
>>>>>> > X2 = X.flatMap(...);
>>>>>> >
>>>>>> > Best,
>>>>>> > Flavio
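P.S. In case it helps as a reference for the sort-merge-join alternative Fabian mentioned above, this is roughly how that strategy can be requested via a JoinHint in the DataSet API (only a sketch with made-up tuple types, not your actual job, and remember it will sort and possibly spill both inputs):

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SortMergeJoinHint {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, String>> left =
        env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));
    DataSet<Tuple2<Long, String>> right =
        env.fromElements(Tuple2.of(1L, "x"), Tuple2.of(3L, "y"));

    // Ask the optimizer for a sort-merge join instead of a hash join.
    // Both inputs are sorted (and may be spilled) before the merge.
    DataSet<Tuple2<Tuple2<Long, String>, Tuple2<Long, String>>> joined =
        left.join(right, JoinHint.REPARTITION_SORT_MERGE)
            .where(0)
            .equalTo(0);

    joined.print();  // print() triggers execution of this small sketch
  }
}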