The spilling will only happen when joining the branched data sets.
If you keep them separate and eventually emit them, no intermediate data
will be spilled.

2018-05-04 18:05 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Does this duplication happen when I write directly to disk after the
> flatMaps?
>
> On Fri, May 4, 2018 at 6:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> That will happen if you join (or coGroup) the branched DataSets, i.e.,
>> you have branching and merging pattern in your stream.
>>
>> The problem in that case is that one of the inputs is pipelined (e.g.,
>> the probe side of a hash join) and the other one is blocking.
>> In order to execute such a plan, we must spill the pipelined data set to
>> disk to ensure that the other input can be fully consumed (to build the
>> hash table).
>>
>> There's not really a solution to this.
>> You could change the join strategy to sort-merge-join but this will sort
>> both inputs and also result in spilling both to disk.
>>
>> 2018-05-04 17:25 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi Fabian,
>>> thanks for the detailed reply.
>>> The problem I see is that the source dataset is huge and, since it
>>> doesn't fit in memory, it's spilled twice to disk (I checked the increasing
>>> disk usage during the job and it was corresponding exactly to the size
>>> estimated by the Flink UI, that is twice it's initial size).
>>> Probably there are no problem until you keep data in memory but in my
>>> case it's very problematic this memory explosion :(
>>>
>>> On Fri, May 4, 2018 at 5:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> No, there's no way around it.
>>>> DataSets that are processed by more than one operator cannot be
>>>> processed by chained operators.
>>>> The records need to be copied to avoid concurrent modifications.
>>>> However, the data should not be shipped over the network if all operators
>>>> have the same parallelism.
>>>> Instead records are serialized and handed over via local byte[]
>>>> in-memory channels.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2018-05-04 14:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Flink 1.3.1 (I'm waiting 1.5 before upgrading..)
>>>>>
>>>>> On Fri, May 4, 2018 at 2:50 PM, Amit Jain <aj201...@gmail.com> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> Which version of Flink are you using?
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Amit
>>>>>>
>>>>>> On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>> > Hi all,
>>>>>> > I've a Flink batch job that reads a parquet dataset and then
>>>>>> applies 2
>>>>>> > flatMap to it (see pseudocode below).
>>>>>> > The problem is that this dataset is quite big and Flink duplicates
>>>>>> it before
>>>>>> > sending the data to these 2 operators (I've guessed this from the
>>>>>> doubling
>>>>>> > amount of sent bytes) .
>>>>>> > Is there a way to avoid this behaviour?
>>>>>> >
>>>>>> > -------------------------------------------------------
>>>>>> > Here's the pseudo code of my job:
>>>>>> >
>>>>>> > DataSet X = readParquetDir();
>>>>>> > X1 = X.flatMap(...);
>>>>>> > X2 = X.flatMap(...);
>>>>>> >
>>>>>> > Best,
>>>>>> > Flavio
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>

Reply via email to