Hi,
I think what you would like to to can be achieved by:

IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate()
DataStream mapped = it.map(...)
 it.closeWith(mapped.partitionByHash(someField))

The input is rebalanced to the map inside the iteration as in your example
and the feedback should be partitioned by hash.

Cheers,
Aljoscha


On Tue, 6 Oct 2015 at 00:11 Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hey,
>
> This question is mainly targeted towards Aljoscha but maybe someone can
> help me out here:
>
> I think the way feedback partitioning is handled does not work, let me
> illustrate with a simple example:
>
> IterativeStream it = ... (parallelism 1)
> DataStream mapped = it.map(...) (parallelism 2)
> // this does not work as the feedback has parallelism 2 != 1
> // it.closeWith(mapped.partitionByHash(someField))
> // so we need rebalance the data
>
> it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))
>
> This program will execute but the feedback will not be partitioned by hash
> to the mapper instances:
> The partitioning will be set from the noOpMap to the iteration sink which
> has parallelism different from the mapper (1 vs 2) and then the iteration
> source forwards the element to the mapper (always to 0).
>
> So the problem is basically that the iteration source/sink pair gets the
> parallelism of the input stream (p=1) not the head operator (p = 2) which
> leads to incorrect partitioning.
>
> Did I miss something here?
>
> Cheers,
> Gyula
>
>

Reply via email to