Does this duplication happen when I write directly to disk after the flatMaps?
On Fri, May 4, 2018 at 6:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> That will happen if you join (or coGroup) the branched DataSets, i.e., you
> have a branching-and-merging pattern in your stream.
>
> The problem in that case is that one of the inputs is pipelined (e.g., the
> probe side of a hash join) and the other one is blocking.
> In order to execute such a plan, we must spill the pipelined data set to
> disk to ensure that the other input can be fully consumed (to build the
> hash table).
>
> There's not really a solution to this.
> You could change the join strategy to a sort-merge join, but this will sort
> both inputs and also result in spilling both to disk.
>
> 2018-05-04 17:25 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Hi Fabian,
>> thanks for the detailed reply.
>> The problem I see is that the source dataset is huge and, since it
>> doesn't fit in memory, it gets spilled twice to disk (I checked the
>> increasing disk usage during the job and it corresponded exactly to the
>> size estimated by the Flink UI, that is, twice its initial size).
>> There is probably no problem as long as the data stays in memory, but
>> in my case this memory explosion is very problematic :(
>>
>> On Fri, May 4, 2018 at 5:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> No, there's no way around it.
>>> DataSets that are processed by more than one operator cannot be
>>> processed by chained operators.
>>> The records need to be copied to avoid concurrent modifications.
>>> However, the data should not be shipped over the network if all
>>> operators have the same parallelism.
>>> Instead, records are serialized and handed over via local byte[]
>>> in-memory channels.
>>>
>>> Best, Fabian
>>>
>>> 2018-05-04 14:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> Flink 1.3.1 (I'm waiting for 1.5 before upgrading..)
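Fabian's point about one join input being pipelined and the other blocking can be sketched in plain Java. This is an illustrative model only, not Flink's actual implementation: the class `HashJoinSketch`, the method `hashJoin`, and the `int[]{key, value}` row encoding are all hypothetical names chosen for the sketch.

```java
import java.util.*;

// Illustrative model of hash-join execution: the build side is consumed
// completely (blocking) before any probe record can produce output
// (pipelined). When both sides branch off the same source, the probe
// records arriving during the build phase must be buffered or spilled.
public class HashJoinSketch {

    // Rows are modeled as {key, value} pairs; output is "key:buildVal,probeVal".
    public static List<String> hashJoin(List<int[]> build, List<int[]> probe) {
        // Blocking phase: read the ENTIRE build side into a hash table first.
        Map<Integer, List<Integer>> table = new HashMap<>();
        for (int[] row : build) {
            table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        // Pipelined phase: stream the probe side, emitting matches immediately.
        List<String> out = new ArrayList<>();
        for (int[] row : probe) {
            for (int v : table.getOrDefault(row[0], Collections.emptyList())) {
                out.add(row[0] + ":" + v + "," + row[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> build = Arrays.asList(new int[]{1, 100}, new int[]{2, 200});
        List<int[]> probe = Arrays.asList(new int[]{1, 10}, new int[]{3, 30});
        System.out.println(hashJoin(build, probe)); // prints [1:100,10]
    }
}
```

The key property the sketch shows is the ordering constraint: nothing can be emitted until the build loop has finished, which is why Flink must fully materialize (and, for large data, spill) whichever input plays the blocking role.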
>>>>
>>>> On Fri, May 4, 2018 at 2:50 PM, Amit Jain <aj201...@gmail.com> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> Which version of Flink are you using?
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Amit
>>>>>
>>>>> On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>> > Hi all,
>>>>> > I have a Flink batch job that reads a Parquet dataset and then
>>>>> > applies 2 flatMaps to it (see pseudocode below).
>>>>> > The problem is that this dataset is quite big and Flink duplicates
>>>>> > it before sending the data to these 2 operators (I guessed this
>>>>> > from the doubled amount of sent bytes).
>>>>> > Is there a way to avoid this behaviour?
>>>>> >
>>>>> > -------------------------------------------------------
>>>>> > Here's the pseudocode of my job:
>>>>> >
>>>>> > DataSet X = readParquetDir();
>>>>> > X1 = X.flatMap(...);
>>>>> > X2 = X.flatMap(...);
>>>>> >
>>>>> > Best,
>>>>> > Flavio
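The branching in the pseudocode above (one DataSet `X` feeding both `X1` and `X2`) relates to Fabian's note that records are serialized and handed to each consumer via local byte[] in-memory channels, so each branch gets its own copy and concurrent modification is avoided. A minimal plain-Java sketch of that copy-by-serialization idea, assuming nothing about Flink's internals (`BranchCopyDemo` and `Record` are hypothetical names, and Java object serialization stands in for Flink's own serializers):

```java
import java.io.*;

// Sketch: serialize a record once, then let each downstream "branch"
// deserialize its own private copy from the shared byte[] channel.
// Mutations in one branch cannot leak into the other.
public class BranchCopyDemo {

    public static class Record implements Serializable {
        public int value;
        public Record(int value) { this.value = value; }
    }

    public static byte[] serialize(Record r) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(r);
        }
        return bos.toByteArray();
    }

    public static Record deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Record) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Record source = new Record(10);
        byte[] channel = serialize(source);      // the shared in-memory channel

        Record copyForX1 = deserialize(channel); // branch X1 gets its own copy
        Record copyForX2 = deserialize(channel); // branch X2 gets its own copy

        copyForX1.value += 1;                    // "flatMap 1" mutates its record
        copyForX2.value *= 2;                    // "flatMap 2" mutates its record

        // Both branches started from the original value 10, independently.
        System.out.println(copyForX1.value + " " + copyForX2.value); // prints 11 20
    }
}
```

This per-branch copy is also why the Flink UI reports roughly twice the bytes sent: every record crosses a serialized channel once per consumer, even when everything stays on one TaskManager.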