AHeise commented on a change in pull request #13735:
URL: https://github.com/apache/flink/pull/13735#discussion_r517663178



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
##########
@@ -327,4 +289,252 @@ public boolean hasState() {
                        || inputChannelState.hasState()
                        || resultSubpartitionState.hasState();
        }
+
+       public static Builder builder() {
+               return new Builder();
+       }
+
+       /**
+        * The builder for a new {@link OperatorSubtaskState} which can be 
obtained by {@link #builder()}.
+        */
+       public static class Builder {
+               private StateObjectCollection<OperatorStateHandle> 
managedOperatorState = StateObjectCollection.empty();
+               private StateObjectCollection<OperatorStateHandle> 
rawOperatorState = StateObjectCollection.empty();
+               private StateObjectCollection<KeyedStateHandle> 
managedKeyedState = StateObjectCollection.empty();
+               private StateObjectCollection<KeyedStateHandle> rawKeyedState = 
StateObjectCollection.empty();
+               private StateObjectCollection<InputChannelStateHandle> 
inputChannelState = StateObjectCollection.empty();
+               private StateObjectCollection<ResultSubpartitionStateHandle> 
resultSubpartitionState = StateObjectCollection.empty();
+               private VirtualChannelMapping inputChannelMappings = 
VirtualChannelMapping.NO_MAPPING;
+               private VirtualChannelMapping outputChannelMappings = 
VirtualChannelMapping.NO_MAPPING;
+
+               private Builder() {
+               }
+
+               public Builder 
setManagedOperatorState(StateObjectCollection<OperatorStateHandle> 
managedOperatorState) {
+                       this.managedOperatorState = 
checkNotNull(managedOperatorState);
+                       return this;
+               }
+
+               public Builder setManagedOperatorState(OperatorStateHandle 
managedOperatorState) {
+                       return 
setManagedOperatorState(StateObjectCollection.singleton(checkNotNull(managedOperatorState)));
+               }
+
+               public Builder 
setRawOperatorState(StateObjectCollection<OperatorStateHandle> 
rawOperatorState) {
+                       this.rawOperatorState = checkNotNull(rawOperatorState);
+                       return this;
+               }
+
+               public Builder setRawOperatorState(OperatorStateHandle 
rawOperatorState) {
+                       return 
setRawOperatorState(StateObjectCollection.singleton(checkNotNull(rawOperatorState)));
+               }
+
+               public Builder 
setManagedKeyedState(StateObjectCollection<KeyedStateHandle> managedKeyedState) 
{
+                       this.managedKeyedState = 
checkNotNull(managedKeyedState);
+                       return this;
+               }
+
+               public Builder setManagedKeyedState(KeyedStateHandle 
managedKeyedState) {
+                       return 
setManagedKeyedState(StateObjectCollection.singleton(checkNotNull(managedKeyedState)));
+               }
+
+               public Builder 
setRawKeyedState(StateObjectCollection<KeyedStateHandle> rawKeyedState) {
+                       this.rawKeyedState = checkNotNull(rawKeyedState);
+                       return this;
+               }
+
+               public Builder setRawKeyedState(KeyedStateHandle rawKeyedState) 
{
+                       return 
setRawKeyedState(StateObjectCollection.singleton(checkNotNull(rawKeyedState)));
+               }
+
+               public Builder 
setInputChannelState(StateObjectCollection<InputChannelStateHandle> 
inputChannelState) {
+                       this.inputChannelState = 
checkNotNull(inputChannelState);
+                       return this;
+               }
+
+               public Builder 
setResultSubpartitionState(StateObjectCollection<ResultSubpartitionStateHandle> 
resultSubpartitionState) {
+                       this.resultSubpartitionState = 
checkNotNull(resultSubpartitionState);
+                       return this;
+               }
+
+               public Builder setInputChannelMappings(VirtualChannelMapping 
inputChannelMappings) {
+                       this.inputChannelMappings = 
checkNotNull(inputChannelMappings);
+                       return this;
+               }
+
+               public Builder setOutputChannelMappings(VirtualChannelMapping 
outputChannelMappings) {
+                       this.outputChannelMappings = 
checkNotNull(outputChannelMappings);
+                       return this;
+               }
+
+               public OperatorSubtaskState build() {
+                       return new OperatorSubtaskState(
+                               managedOperatorState,
+                               rawOperatorState,
+                               managedKeyedState,
+                               rawKeyedState,
+                               inputChannelState,
+                               resultSubpartitionState,
+                               inputChannelMappings,
+                               outputChannelMappings);
+               }
+       }
+
+       /**
+        * Captures ambiguous mappings of old channels to new channels.
+        *
+        * <p>For inputs, this mapping implies the following:
+        * <li>
+        *     <ul>{@link #oldTaskInstances} is set when there is a rescale on 
this task potentially leading to different
+        *     key groups. Upstream task has a corresponding {@link 
#partitionMappings} where it sends data over
+        *     virtual channel while specifying the channel index in the 
VirtualChannelSelector. This subtask then
+        *     demultiplexes over the virtual subtask index.</ul>
+        *     <ul>{@link #partitionMappings} is set when there is a downscale 
of the upstream task. Upstream task has
+        *     a corresponding {@link #oldTaskInstances} where it sends data 
over virtual channel while specifying the
+        *     subtask index in the VirtualChannelSelector. This subtask then 
demultiplexes over channel indexes.</ul>
+        * </li>
+        *
+        * <p>For outputs, it's vice-versa. The information must be kept in 
sync but they are used in opposite ways for
+        * multiplexing/demultiplexing.
+        *
+        * <p>Note that in the common rescaling case both information is set 
and need to be simultaneously used. If the
+        * input subtask subsumes the state of 3 old subtasks and a channel 
corresponds to 2 old channels, then there are
+        * 6 virtual channels to be demultiplexed.
+        */
+       public static class VirtualChannelMapping implements Serializable {

Review comment:
       I went with `InflightDataRescalingDescriptor` for now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to