Hi,

I think what you would like to do can be achieved by:

    IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate();
    DataStream mapped = it.map(...);
    it.closeWith(mapped.partitionByHash(someField));
With this, the input is rebalanced to the map inside the iteration, as in your example, and the feedback should be partitioned by hash. The iteration source/sink pair takes the parallelism of the input stream, which is now 2, the same as the mapper. So the hash partitioning of the feedback reaches the mapper instances directly.

Cheers,
Aljoscha

On Tue, 6 Oct 2015 at 00:11 Gyula Fóra <gyula.f...@gmail.com> wrote:
> Hey,
>
> This question is mainly targeted towards Aljoscha, but maybe someone can
> help me out here:
>
> I think the way feedback partitioning is handled does not work. Let me
> illustrate with a simple example:
>
> IterativeStream it = ... (parallelism 1)
> DataStream mapped = it.map(...) (parallelism 2)
> // this does not work as the feedback has parallelism 2 != 1
> // it.closeWith(mapped.partitionByHash(someField))
> // so we need to rebalance the data
>
> it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))
>
> This program will execute, but the feedback will not be partitioned by hash
> to the mapper instances:
> The partitioning will be set from the NoOpMap to the iteration sink, which
> has a parallelism different from the mapper (1 vs 2). The iteration source
> then forwards the elements to the mapper (always to subtask 0).
>
> So the problem is basically that the iteration source/sink pair gets the
> parallelism of the input stream (p = 1), not that of the head operator
> (p = 2), which leads to incorrect partitioning.
>
> Did I miss something here?
>
> Cheers,
> Gyula
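A toy model of the routing described in the quoted message may make the problem more concrete. This is plain Java, not Flink code. The class and method names are hypothetical, and it assumes that hash partitioning picks the channel as floorMod(hashCode, numChannels), which may not match Flink's actual partitioner. It only counts which mapper subtask each feedback key ends up on. It follows the description above that a parallelism-1 iteration source forwards everything to mapper subtask 0, while a parallelism-2 source i forwards to mapper subtask i.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of feedback routing; NOT Flink's implementation.
class FeedbackRoutingModel {

    // Assumed hash-partitioning rule: non-negative hashCode modulo channel count.
    static int hashChannel(Object key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // Original setup: the iteration source/sink took the input's parallelism (1).
    // Hashing into a single sink channel always yields channel 0, and the single
    // iteration source forwards everything to mapper subtask 0.
    static int routeWithSourceSinkParallelism1(Object key) {
        int sinkChannel = hashChannel(key, 1); // always 0 with one channel
        return sinkChannel;                    // source 0 forwards to mapper 0
    }

    // Suggested setup: the input already has parallelism 2, so the source/sink
    // pair also has parallelism 2. Hashing selects one of the 2 sinks, and
    // source i forwards to mapper subtask i.
    static int routeWithSourceSinkParallelism2(Object key) {
        return hashChannel(key, 2);
    }

    // Counts how many feedback records (keys 0..numKeys-1) each mapper subtask receives.
    static Map<Integer, Integer> histogram(int numKeys, boolean suggestedSetup) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int k = 0; k < numKeys; k++) {
            int subtask = suggestedSetup
                    ? routeWithSourceSinkParallelism2(k)
                    : routeWithSourceSinkParallelism1(k);
            counts.merge(subtask, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("source/sink p=1: " + histogram(10, false));
        System.out.println("source/sink p=2: " + histogram(10, true));
    }
}
```

Running main prints {0=10} for the parallelism-1 source/sink pair, so every key lands on subtask 0. It prints {0=5, 1=5} for the parallelism-2 pair, so the keys are spread by hash as intended.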