Thanks for the feedback, Fabian. This is related to the question I sent to the user mailing list yesterday. Mustafa is working on a master's thesis in which we try to abstract an operator for updating stateful datasets (decoupled from the current native iterations logic) and use it in conjunction with lazy unrolling of iterations.
The assumptions are as follows:

- Each iteration runs a job with the same structure and the same DOP;
- Updates are realized through a coGroup with a fixed DOP (let's say *N*), which consumes a *(state, updates)* pair of datasets and produces a new version of the state (let's call it *state'*);
- We keep track of where the *N* output partitions of *state'* are located and use this information for local placement of the corresponding *N* DataSource tasks in the next iteration (via FLINK-1478);
- The remaining piece of the puzzle is to figure out how to tell the coGroup that one of the inputs is already partitioned so it avoids an unnecessary shuffle. A rough sketch of the job structure we have in mind is appended below the quoted thread.

If I remember correctly, back in the day we had a PACT output contract that served a similar purpose (avoiding unnecessary shuffles), but I was not able to find it yesterday. In any case, even if that does not work out of the box at the moment, I think most of the logic is already in place (e.g. co-location groups in the scheduler), and we are willing to either hack the code or add the missing functionality in order to realize the goal described above.

Suggestions are welcome!

Regards,
Alex

2015-05-18 17:42 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Mustafa,
>
> I'm afraid this is not possible.
> Although you can annotate DataSources with partitioning information, this
> is not enough to avoid repartitioning for a CoGroup. The reason for that is
> that CoGroup requires co-partitioning of both inputs, i.e., both inputs
> must be equally partitioned (same number of partitions, same partitioning
> function, same location of partitions). Since Flink is dynamically
> assigning tasks to execution slots, it is not possible to co-locate data
> that was read from a data source with data coming from the result of another
> computation.
>
> If you just need the result of the first coGroup on disk, you could also
> build a single program that does both coGroups and additionally writes the
> result of the first coGroup to disk (Flink supports multiple data sinks).
>
> Best, Fabian
>
> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <elbeherymust...@gmail.com>:
>
>> Hi,
>>
>> I am writing a Flink job in which I have three datasets. I have
>> hash-partitioned (partitionByHash) the first two before coGrouping them.
>>
>> My plan is to spill the result of the coGroup to disk, and then re-read it
>> before coGrouping with the third dataset.
>>
>> My question is: is there any way to inform Flink that the first coGroup
>> result is already partitioned? I know I can re-partition again before
>> coGrouping, but I would like to know if there is any way to avoid a step
>> which was already executed.
>>
>> Regards,
>>
>> --
>> Mustafa Elbehery
>> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
>> +49(0)15750363097
>> skype: mustafaelbehery87
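P.S. Here is the sketch referenced above: a minimal, untested Java DataSet program showing the structure of a single state-update step under our assumptions. The input paths, the Tuple2<Long, Long> record type, the parallelism value, and the "keep the latest update" logic in ApplyUpdates are placeholders I made up for illustration. The point is only the shape of the job: *state'* is written at the end of step i and re-read at the start of step i+1, and today that re-read is followed by a fresh hash partitioning before the coGroup, even though the data already sits in the right partitions.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class StateUpdateSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // the fixed DOP "N" from the mail (placeholder value)

        // state and updates as (key, value) pairs; paths are placeholders
        DataSet<Tuple2<Long, Long>> state = env.readCsvFile("hdfs:///state/iter_0")
                .types(Long.class, Long.class);
        DataSet<Tuple2<Long, Long>> updates = env.readCsvFile("hdfs:///updates/iter_0")
                .types(Long.class, Long.class);

        // the coGroup-based state update; the optimizer currently hash-partitions
        // both inputs on field 0, including the re-read state
        DataSet<Tuple2<Long, Long>> statePrime = state
                .coGroup(updates)
                .where(0)
                .equalTo(0)
                .with(new ApplyUpdates());

        // write state' back; the next iteration's job reads this path again,
        // and that re-read is where the redundant shuffle shows up
        statePrime.writeAsCsv("hdfs:///state/iter_1");

        env.execute("state update, iteration 0");
    }

    public static class ApplyUpdates
            implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

        @Override
        public void coGroup(Iterable<Tuple2<Long, Long>> stateRecs,
                            Iterable<Tuple2<Long, Long>> updateRecs,
                            Collector<Tuple2<Long, Long>> out) {
            // placeholder update rule: emit the latest update if one exists,
            // otherwise keep the old state record
            Tuple2<Long, Long> result = null;
            for (Tuple2<Long, Long> s : stateRecs) {
                result = s;
            }
            for (Tuple2<Long, Long> u : updateRecs) {
                result = u;
            }
            if (result != null) {
                out.collect(result);
            }
        }
    }
}

Whether the DataSource that re-reads *state'* in the next step can be told that its splits are already hash-partitioned on field 0 (so that the optimizer drops that shuffle) is exactly the open question from the list above.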