Does this duplication happen when I write directly to disk after the
flatMaps?

On Fri, May 4, 2018 at 6:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> That will happen if you join (or coGroup) the branched DataSets, i.e., you
> have a branching and merging pattern in your stream.
>
> The problem in that case is that one of the inputs is pipelined (e.g., the
> probe side of a hash join) and the other one is blocking.
> In order to execute such a plan, we must spill the pipelined data set to
> disk to ensure that the other input can be fully consumed (to build the
> hash table).
>
> There's not really a solution to this.
> You could change the join strategy to a sort-merge join, but this will sort
> both inputs and also result in spilling both to disk.
>
> 2018-05-04 17:25 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Hi Fabian,
>> thanks for the detailed reply.
>> The problem I see is that the source dataset is huge and, since it
>> doesn't fit in memory, it's spilled to disk twice (I watched the disk
>> usage grow during the job and it corresponded exactly to the size
>> estimated by the Flink UI, that is, twice its initial size).
>> There is probably no problem as long as the data fits in memory, but in
>> my case this blow-up in size is very problematic :(
>>
>> On Fri, May 4, 2018 at 5:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> No, there's no way around it.
>>> DataSets that are processed by more than one operator cannot be
>>> processed by chained operators.
>>> The records need to be copied to avoid concurrent modifications.
>>> However, the data should not be shipped over the network if all operators
>>> have the same parallelism.
>>> Instead, records are serialized and handed over via local byte[]
>>> in-memory channels.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2018-05-04 14:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> Flink 1.3.1 (I'm waiting for 1.5 before upgrading...)
>>>>
>>>> On Fri, May 4, 2018 at 2:50 PM, Amit Jain <aj201...@gmail.com> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> Which version of Flink are you using?
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Amit
>>>>>
>>>>> On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>> > Hi all,
>>>>> > I've a Flink batch job that reads a Parquet dataset and then applies
>>>>> > 2 flatMaps to it (see pseudocode below).
>>>>> > The problem is that this dataset is quite big and Flink duplicates
>>>>> > it before sending the data to these 2 operators (I've guessed this
>>>>> > from the doubled amount of sent bytes).
>>>>> > Is there a way to avoid this behaviour?
>>>>> >
>>>>> > -------------------------------------------------------
>>>>> > Here's the pseudo code of my job:
>>>>> >
>>>>> > DataSet X = readParquetDir();
>>>>> > X1 = X.flatMap(...);
>>>>> > X2 = X.flatMap(...);
>>>>> >
>>>>> > Best,
>>>>> > Flavio
>>>>>
>>>>
>>>>
>>>
>>
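
Editor's sketch, not part of the original thread: the copy-by-serialization behaviour Fabian describes (each consumer of a branched DataSet receives its own copy of every record, handed over as a byte[] buffer) can be illustrated in plain Java. This is only an illustration under stated assumptions: FanOutCopyDemo, Row, serialize and deserialize are hypothetical names, and none of this is Flink's actual runtime or serializer code. It shows why a record deserialized separately for each consumer cannot be affected by the other consumer's modifications, at the cost of materializing the data once per consumer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustration only: plain Java, not Flink's runtime or serializer classes.
final class FanOutCopyDemo {

    // Hypothetical mutable record type standing in for one row of the dataset.
    static final class Row {
        long id;
        String value;

        Row(long id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Serialize a row into a byte[] buffer (the payload handed to a consumer).
    static byte[] serialize(Row row) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(row.id);
            out.writeUTF(row.value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuild a fresh, independent Row object from the buffer.
    static Row deserialize(byte[] buffer) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer))) {
            return new Row(in.readLong(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Row source = new Row(1L, "original");
        byte[] buffer = serialize(source);

        // Each of the two consumers (the two flatMaps in the thread) gets its own copy.
        Row forFirstConsumer = deserialize(buffer);
        Row forSecondConsumer = deserialize(buffer);

        // A modification made by the first consumer is invisible to the second.
        forFirstConsumer.value = "changed by first consumer";
        System.out.println(forSecondConsumer.value); // prints "original"
    }
}
```

Sharing one mutable Row object between both consumers would avoid the copy, but then the first consumer's change would be visible to the second, which is the concurrent-modification problem mentioned above.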
