AHeise commented on a change in pull request #13735:
URL: https://github.com/apache/flink/pull/13735#discussion_r517332664



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
##########
@@ -43,4 +44,14 @@ public int 
selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        public String toString() {
                return "FORWARD";
        }
+
+       @Override
+       public ChannelStateRescaler getUpstreamChannelStateRescaler() {
+               return ChannelStateRescaler.FIRST_CHANNEL;
+       }
+
+       @Override
+       public ChannelStateRescaler getDownstreamChannelStateRescaler() {
+               return ChannelStateRescaler.ROUND_ROBIN;

Review comment:
       You are right that things should match. However, for channel state there 
are two dimensions on how to scale: on subtask-level and on channel-level. If 
you think of a keyed exchange, a rescale on downstream means that the key 
ranges are changing. A rescale on upstream just means that we need to 
redistribute the channel state to an arbitrary other subtask.
   That's true for all exchanges except broadcast (where we need to discard 
extra state). I could also opt to just have a `boolean 
shouldRedistributeUpstreamState` instead of using `ChannelStateRescaler`. WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to