Hello Aljoscha,

unfortunately not. I'm not really familiar with the optimizer code, and it's really complex to debug :(
This method is as far as I got: https://github.com/apache/flink/blob/master/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java#L301

D.

On Mon, Apr 27, 2020 at 11:24 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> On 27.04.20 09:34, David Morávek wrote:
>
> > When we include `flatMap` in between rebalances ->
> > `.rebalance().flatMap(...).rebalance()`, we need to reshuffle again,
> > because the dataset distribution may have changed (e.g. you can possibly
> > emit an unbounded stream from a single element). Unfortunately, the
> > `flatMap` output is still incorrectly marked as `FORCED_REBALANCED` and
> > the second reshuffle gets ignored.
>
> This indeed seems incorrect. Did you look into the Flink code to see why
> the output of the flatMap is `FORCED_REBALANCED`?
>
> Aljoscha