AHeise commented on a change in pull request #13735: URL: https://github.com/apache/flink/pull/13735#discussion_r517332664
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
##########
@@ -43,4 +44,14 @@ public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
 	public String toString() {
 		return "FORWARD";
 	}
+
+	@Override
+	public ChannelStateRescaler getUpstreamChannelStateRescaler() {
+		return ChannelStateRescaler.FIRST_CHANNEL;
+	}
+
+	@Override
+	public ChannelStateRescaler getDownstreamChannelStateRescaler() {
+		return ChannelStateRescaler.ROUND_ROBIN;

Review comment:
You are right that things should match. However, channel state can be rescaled along two dimensions: the subtask level and the channel level. In a keyed exchange, a downstream rescale means that the key ranges change. An upstream rescale just means that we need to redistribute the channel state to an arbitrary other subtask. That holds for all exchanges except broadcast (where we need to discard the extra state).

I could also opt for a plain `boolean shouldRedistributeUpstreamState` instead of using `ChannelStateRescaler`. WDYT?
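To make the `boolean` alternative concrete, here is a rough toy sketch of what I have in mind for the upstream side. It is not the code in this PR: the class `UpstreamRescaleSketch`, the method `assign`, and the modulo-based placement are all hypothetical and only model the idea that leftover upstream state either goes to an arbitrary remaining subtask or is discarded (the broadcast case).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Flink code: shows how a single boolean could
// decide what happens to upstream channel state when the parallelism changes.
final class UpstreamRescaleSketch {

    /**
     * For each new subtask, returns the indices of the old subtasks whose
     * channel state it would take over.
     *
     * @param shouldRedistributeUpstreamState true for most exchanges (state of
     *     removed subtasks moves to some remaining subtask), false for
     *     broadcast (state of removed subtasks is discarded)
     */
    static List<List<Integer>> assign(
            int oldParallelism,
            int newParallelism,
            boolean shouldRedistributeUpstreamState) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int oldSubtask = 0; oldSubtask < oldParallelism; oldSubtask++) {
            if (oldSubtask < newParallelism) {
                // The subtask still exists and keeps its own state.
                assignment.get(oldSubtask).add(oldSubtask);
            } else if (shouldRedistributeUpstreamState) {
                // Any target works upstream; modulo is just one arbitrary choice.
                assignment.get(oldSubtask % newParallelism).add(oldSubtask);
            }
            // Otherwise the extra state is dropped (broadcast case).
        }
        return assignment;
    }
}
```

Under these assumptions, scaling from 4 to 2 upstream subtasks with the flag set gives subtask 0 the state of old subtasks 0 and 2, and subtask 1 the state of 1 and 3. With the flag unset, the state of old subtasks 2 and 3 is simply dropped.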