In principle, a data set that branches needs to be materialized only if both branches are pipelined until they are merged (e.g., in a hybrid-hash join). Without materialization, the data flow in that case might deadlock due to pipelining.
If you group both data sets before they are joined, the pipeline is broken by the blocking sort. Therefore, the branching data set should not be materialized.

2015-10-22 12:18 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

> Thanks for your responses!
>
> The derived datasets would indeed be grouped after the filter operations.
> Why would this cause them to be materialized to disk? And if I understand
> correctly the data source will not chain to more than one filter,
> causing (de)serialization to transfer the records from the data source to
> the 2 or more filters on the same worker.
>
> I would guess that in the end this approach would still be faster than
> reading the entire input multiple times (we are talking 100GB+ on max 32
> workers) but I would have to run some experiments to confirm that.
>
> 2015-10-22 12:06 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> It might even be materialized (to disk) if both derived data sets are
>> joined.
>>
>> 2015-10-22 12:01 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>
>>> I fear that the filter operations are not chained because there are at
>>> least two of them which have the same DataSet as input. However, it's true
>>> that the intermediate results are not materialized.
>>>
>>> It is also correct that the filter operators are deployed colocated to
>>> the data sources. Thus, there is no network traffic. However, the data will
>>> still be serialized/deserialized between the not-chained operators (also if
>>> they reside on the same machine).
>>>
>>> On Thu, Oct 22, 2015 at 11:49 AM, Gábor Gévay <gga...@gmail.com> wrote:
>>>
>>>> Hello!
>>>>
>>>> > I have thought about a workaround where the InputFormat would return
>>>> > Tuple2s and the first field is the name of the dataset to which a record
>>>> > belongs. This would however require me to filter the read data once for
>>>> > each dataset or to do a groupReduce which is some overhead I'm
>>>> > looking to prevent.
>>>>
>>>> I think that those two filters might not have that much overhead,
>>>> because of several optimizations Flink does under the hood:
>>>> - The dataset of Tuple2s won't be materialized, but instead will be
>>>>   streamed directly to the two filter operators.
>>>> - The input format and the two filters will probably end up on the
>>>>   same machine, because of chaining, so there won't be
>>>>   serialization/deserialization between them.
>>>>
>>>> Best,
>>>> Gabor
>>>>
>>>> 2015-10-22 11:38 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>>> > Good morning!
>>>> >
>>>> > I have the following use case:
>>>> >
>>>> > My program reads nested data (in this specific case XML) based on
>>>> > projections (path expressions) of this data. Often multiple paths are
>>>> > projected onto the same input. I would like each path to result in its own
>>>> > dataset.
>>>> >
>>>> > Is it possible to generate more than 1 dataset using a readFile operation to
>>>> > prevent reading the input twice?
>>>> >
>>>> > I have thought about a workaround where the InputFormat would return Tuple2s
>>>> > and the first field is the name of the dataset to which a record belongs.
>>>> > This would however require me to filter the read data once for each dataset
>>>> > or to do a groupReduce which is some overhead I'm looking to prevent.
>>>> >
>>>> > Is there a better (less overhead) workaround for doing this? Or is there
>>>> > some mechanism in Flink that would allow me to do this?
>>>> >
>>>> > Cheers!
>>>> >
>>>> > - Pieter
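
For reference, the tagged-record workaround from the original question can be sketched roughly as below. This is plain Python, not Flink API: `read_tagged` and `select` are hypothetical names standing in for the custom InputFormat and the per-dataset filters, and the `path=value` record layout is an assumption made only for illustration. The list used here holds the tagged records in memory for simplicity; as discussed in the thread, Flink would instead stream them to the filter operators.

```python
from typing import Iterable, Iterator, List, Tuple

# (dataset name, payload), mirroring the Tuple2 idea from the question
Record = Tuple[str, str]


def read_tagged(lines: Iterable[str]) -> Iterator[Record]:
    """Hypothetical stand-in for the custom InputFormat: tag each record once."""
    for line in lines:
        # Assumption: the path expression is the text before the first '='.
        path, _, value = line.partition("=")
        yield (path, value)


def select(records: Iterable[Record], name: str) -> List[str]:
    """One filter per derived data set, keyed on the tag in field 0."""
    return [value for tag, value in records if tag == name]


raw = ["/a/b=1", "/a/c=x", "/a/b=2"]

# The input is read and tagged a single time ...
tagged = list(read_tagged(raw))

# ... and each derived data set is obtained by its own filter over the tags.
ab = select(tagged, "/a/b")
ac = select(tagged, "/a/c")
```

The cost the thread is weighing corresponds to the repeated `select` passes: one pass over the tagged records per derived data set, in exchange for reading the input only once.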