Hi,

This is just a workaround, and it actually breaks the input order from my source. I think the iteration construction should be reworked to set the parallelism of the iteration source/sink to the parallelism of the head operator (and to validate that all heads have the same parallelism).
I thought this was the solution that you described with Stephan in some older discussion before the rewrite.

Cheers,
Gyula

Aljoscha Krettek <aljos...@apache.org> wrote (on Tue, 6 Oct 2015, 9:15):

> Hi,
> I think what you would like to do can be achieved by:
>
> IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate()
> DataStream mapped = it.map(...)
> it.closeWith(mapped.partitionByHash(someField))
>
> The input is rebalanced to the map inside the iteration as in your example
> and the feedback should be partitioned by hash.
>
> Cheers,
> Aljoscha
>
>
> On Tue, 6 Oct 2015 at 00:11 Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hey,
> >
> > This question is mainly targeted towards Aljoscha but maybe someone can
> > help me out here:
> >
> > I think the way feedback partitioning is handled does not work, let me
> > illustrate with a simple example:
> >
> > IterativeStream it = ... (parallelism 1)
> > DataStream mapped = it.map(...) (parallelism 2)
> > // this does not work as the feedback has parallelism 2 != 1
> > // it.closeWith(mapped.partitionByHash(someField))
> > // so we need to rebalance the data
> >
> > it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))
> >
> > This program will execute but the feedback will not be partitioned by hash
> > to the mapper instances:
> > The partitioning will be set from the noOpMap to the iteration sink which
> > has parallelism different from the mapper (1 vs 2) and then the iteration
> > source forwards the element to the mapper (always to 0).
> >
> > So the problem is basically that the iteration source/sink pair gets the
> > parallelism of the input stream (p=1) not the head operator (p = 2) which
> > leads to incorrect partitioning.
> >
> > Did I miss something here?
> >
> > Cheers,
> > Gyula
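
For readers following the thread, the routing problem can be modelled in a few lines of plain Java. This is only a sketch and deliberately does not use the Flink API: the class and method names (FeedbackRoutingSketch, headSubtaskFor, commonHeadParallelism) are hypothetical, and it assumes that iteration source subtask i simply forwards to head subtask i, which is a simplification of the behaviour described above. It shows that with a sink/source parallelism of 1 every key ends up at head subtask 0, while a sink/source parallelism equal to the head parallelism preserves the hash partitioning, and it includes the kind of "all heads have the same parallelism" check proposed in the reply.

```java
import java.util.Arrays;
import java.util.List;

class FeedbackRoutingSketch {

    // Hash partitioning: pick a target channel from the key's hash code.
    static int hashChannel(int key, int numChannels) {
        return Math.floorMod(Integer.hashCode(key), numChannels);
    }

    // Simplified model of the feedback path: the record is hash-partitioned
    // to an iteration sink subtask, and the paired iteration source subtask
    // forwards it to the head subtask with the same index (an assumption
    // made for this sketch, not a statement about Flink internals).
    static int headSubtaskFor(int key, int sinkSourceParallelism) {
        int sinkSubtask = hashChannel(key, sinkSourceParallelism);
        return sinkSubtask;
    }

    // Proposed validation: all head operators must share one parallelism,
    // which the iteration source/sink pair would then adopt.
    static int commonHeadParallelism(List<Integer> headParallelisms) {
        if (headParallelisms.isEmpty()) {
            throw new IllegalArgumentException("iteration has no head operators");
        }
        int first = headParallelisms.get(0);
        for (int p : headParallelisms) {
            if (p != first) {
                throw new IllegalStateException(
                        "heads have different parallelism: " + headParallelisms);
            }
        }
        return first;
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 2, 3};
        for (int key : keys) {
            System.out.println("key " + key
                    + ": sink/source p=1 -> head " + headSubtaskFor(key, 1)
                    + ", sink/source p=2 -> head " + headSubtaskFor(key, 2));
        }
        System.out.println("sink/source parallelism to use: "
                + commonHeadParallelism(Arrays.asList(2, 2)));
    }
}
```

With a parallelism of 1 the hash can only ever select channel 0, so the partitioner requested in closeWith has no effect on which mapper instance receives the record; that is the behaviour reported in the original mail.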