OK, I see your point. But I think there will be problems no matter which parallelism is chosen for the iteration source/sink. If the parallelism of the head is chosen, there will be an implicit rebalance from the operation right before the iteration to the iteration head. I think this would break ordering as well in your case.
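To make the ordering concern concrete, here is a minimal plain-Java sketch. It is not Flink code: the class and method names are hypothetical, and it assumes the implicit rebalance distributes elements round-robin from a single upstream instance to the parallel head instances.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation; none of these names are Flink API.
class RebalanceSketch {

    // Distributes elements round-robin over `parallelism` downstream instances
    // (assumed behaviour of the implicit rebalance before the iteration head).
    static List<List<Integer>> rebalance(List<Integer> input, int parallelism) {
        List<List<Integer>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            instances.get(i % parallelism).add(input.get(i));
        }
        return instances;
    }

    public static void main(String[] args) {
        // An ordered source with parallelism 1 feeding a head with parallelism 2.
        List<List<Integer>> out = rebalance(List.of(1, 2, 3, 4, 5), 2);
        for (int i = 0; i < out.size(); i++) {
            System.out.println("head instance " + i + " receives " + out.get(i));
        }
    }
}
```

Each head instance still sees its own elements in source order, but the two instances run independently, so the relative order of elements that land on different instances (for example 2 and 3) is no longer determined.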
On Tue, 6 Oct 2015 at 10:39 Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi,
>
> This is just a workaround, which actually breaks input order from my
> source. I think the iteration construction should be reworked to set the
> parallelism of the source/sink to the parallelism of the head operator (and
> validate that all heads have the same parallelism).
>
> I thought this was the solution that you described with Stephan in some
> older discussion before the rewrite.
>
> Cheers,
> Gyula
>
> Aljoscha Krettek <aljos...@apache.org> wrote (on Tue, 6 Oct 2015, 9:15):
>
> > Hi,
> > I think what you would like to do can be achieved by:
> >
> > IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate()
> > DataStream mapped = it.map(...)
> > it.closeWith(mapped.partitionByHash(someField))
> >
> > The input is rebalanced to the map inside the iteration as in your example
> > and the feedback should be partitioned by hash.
> >
> > Cheers,
> > Aljoscha
> >
> > On Tue, 6 Oct 2015 at 00:11 Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > > Hey,
> > >
> > > This question is mainly targeted towards Aljoscha but maybe someone can
> > > help me out here:
> > >
> > > I think the way feedback partitioning is handled does not work, let me
> > > illustrate with a simple example:
> > >
> > > IterativeStream it = ... (parallelism 1)
> > > DataStream mapped = it.map(...) (parallelism 2)
> > > // this does not work as the feedback has parallelism 2 != 1
> > > // it.closeWith(mapped.partitionByHash(someField))
> > > // so we need to rebalance the data
> > >
> > > it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))
> > >
> > > This program will execute but the feedback will not be partitioned by hash
> > > to the mapper instances:
> > > The partitioning will be set from the noOpMap to the iteration sink which
> > > has parallelism different from the mapper (1 vs 2) and then the iteration
> > > source forwards the element to the mapper (always to 0).
> > >
> > > So the problem is basically that the iteration source/sink pair gets the
> > > parallelism of the input stream (p=1) not the head operator (p = 2) which
> > > leads to incorrect partitioning.
> > >
> > > Did I miss something here?
> > >
> > > Cheers,
> > > Gyula
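
The feedback path described in the original message can be simulated with a small plain-Java sketch. It is not Flink code and all names are hypothetical. It assumes that hash partitioning picks a channel as the key's hash modulo the number of sink instances (Flink's real partitioner hashes differently), and that the iteration source forwards from its instance i to mapper instance i.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the feedback edge; none of these names are Flink API.
class FeedbackPartitioningSketch {

    // Assumed channel selection: key hash modulo the number of target instances.
    static int hashChannel(Object key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // Returns, for each key, the mapper instance that receives the feedback
    // element when the iteration source/sink pair runs with the given
    // parallelism. The partitioner selects a sink instance; the co-located
    // source instance then forwards to the mapper with the same index.
    static List<Integer> feedbackTargets(List<String> keys, int sourceSinkParallelism) {
        List<Integer> targets = new ArrayList<>();
        for (String key : keys) {
            int sinkInstance = hashChannel(key, sourceSinkParallelism);
            int mapperInstance = sinkInstance; // forward: source i -> mapper i
            targets.add(mapperInstance);
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d");
        // Source/sink inherit the input parallelism (p = 1).
        System.out.println("source/sink p=1: " + feedbackTargets(keys, 1));
        // Source/sink take the head operator's parallelism (p = 2).
        System.out.println("source/sink p=2: " + feedbackTargets(keys, 2));
    }
}
```

With a source/sink parallelism of 1 every key ends up on mapper 0, which is the behaviour reported above. With the head operator's parallelism of 2 the keys are spread over both mapper instances according to their hash.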