OK, I see your point. But I think there will be problems no matter which
parallelism is chosen for the iteration source/sink. If the parallelism of
the head is chosen, then there will be an implicit rebalance from the
operation right before the iteration to the iteration head. I think this
would break ordering in your case as well.
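
To make the ordering concern concrete, here is a minimal sketch in plain
Java. It is a toy model, not Flink code: the class RebalanceSketch and its
rebalance method are hypothetical names, and the rebalance is modeled as a
simple round-robin distribution from one upstream subtask to the head
subtasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: plain Java, not Flink code. All names are hypothetical.
final class RebalanceSketch {

    // Distribute the input round-robin over `parallelism` receivers.
    // This is the assumed model of the implicit rebalance.
    static List<List<Integer>> rebalance(List<Integer> input, int parallelism) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            subtasks.get(i % parallelism).add(input.get(i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // A source with parallelism 1 emits 0..5 in order.
        List<Integer> source = Arrays.asList(0, 1, 2, 3, 4, 5);
        // The iteration head has parallelism 2.
        List<List<Integer>> heads = rebalance(source, 2);
        System.out.println("head subtask 0 sees " + heads.get(0));
        System.out.println("head subtask 1 sees " + heads.get(1));
    }
}
```

Each head subtask only sees every second element, and nothing defines the
relative order of elements handled by different subtasks, so the source
order is not preserved across the iteration head as a whole.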

On Tue, 6 Oct 2015 at 10:39 Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi,
>
> This is just a workaround, which actually breaks input order from my
> source. I think the iteration construction should be reworked to set the
> parallelism of the source/sink to the parallelism of the head operator (and
> validate that all heads have the same parallelism).
>
> I thought this was the solution that you described with Stephan in some
> older discussion before the rewrite.
>
> Cheers,
> Gyula
>
> On Tue, 6 Oct 2015 at 9:15 Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > Hi,
> > I think what you would like to do can be achieved by:
> >
> > IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate()
> > DataStream mapped = it.map(...)
> > it.closeWith(mapped.partitionByHash(someField))
> >
> > The input is rebalanced to the map inside the iteration as in your
> > example and the feedback should be partitioned by hash.
> >
> > Cheers,
> > Aljoscha
> >
> >
> > On Tue, 6 Oct 2015 at 00:11 Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > > Hey,
> > >
> > > This question is mainly targeted towards Aljoscha but maybe someone can
> > > help me out here:
> > >
> > > I think the way feedback partitioning is handled does not work, let me
> > > illustrate with a simple example:
> > >
> > > IterativeStream it = ... (parallelism 1)
> > > DataStream mapped = it.map(...) (parallelism 2)
> > > // this does not work as the feedback has parallelism 2 != 1
> > > // it.closeWith(mapped.partitionByHash(someField))
> > > // so we need to rebalance the data
> > >
> > > it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))
> > >
> > > This program will execute, but the feedback will not be partitioned by
> > > hash to the mapper instances:
> > > The partitioning will be set from the NoOpMap to the iteration sink,
> > > which has a parallelism different from the mapper (1 vs 2), and then the
> > > iteration source forwards the element to the mapper (always to
> > > instance 0).
> > >
> > > So the problem is basically that the iteration source/sink pair gets the
> > > parallelism of the input stream (p=1) not the head operator (p=2), which
> > > leads to incorrect partitioning.
> > >
> > > Did I miss something here?
> > >
> > > Cheers,
> > > Gyula
> > >
> > >
> >
>
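
For reference, the routing problem from the original message at the bottom
of this thread can be reproduced with a small toy model in plain Java. This
is a sketch under assumptions, not Flink code: FeedbackRoutingSketch,
hashChannel and mapperFor are hypothetical names, the hash partitioner is
modeled as hashCode modulo the number of sink subtasks, and the source is
assumed to forward to the mapper subtask with the same index.

```java
// Toy model only: plain Java, not Flink code. All names are hypothetical.
final class FeedbackRoutingSketch {

    // Assumed model of hash partitioning: choose a channel from the key's hash.
    static int hashChannel(Object key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // Follow one feedback element from the NoOpMap to a mapper subtask.
    // The partitioner chooses among the sink subtasks, each sink is paired
    // with the source of the same index, and the source forwards to the
    // mapper subtask with that same index.
    static int mapperFor(Object key, int sinkParallelism) {
        int sinkIndex = hashChannel(key, sinkParallelism);
        int sourceIndex = sinkIndex; // one-to-one sink/source pairing
        return sourceIndex;          // forward connection keeps the index
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "d"};
        for (String key : keys) {
            System.out.println(key
                + ": sink p=1 -> mapper " + mapperFor(key, 1)
                + ", sink p=2 -> mapper " + mapperFor(key, 2));
        }
    }
}
```

With a sink parallelism of 1 every key ends up at mapper 0, whatever its
hash. Only when the sink/source pair has the parallelism of the head (2 in
this example) do the keys spread over both mapper instances.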
