AHeise commented on a change in pull request #13735:
URL: https://github.com/apache/flink/pull/13735#discussion_r517348994



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
##########
@@ -327,4 +289,252 @@ public boolean hasState() {
                        || inputChannelState.hasState()
                        || resultSubpartitionState.hasState();
        }
+
+       /**
+        * Creates a new {@link Builder} for assembling an {@link OperatorSubtaskState}.
+        *
+        * @return a fresh builder instance
+        */
+       public static Builder builder() {
+               return new Builder();
+       }
+
+       /**
+        * The builder for a new {@link OperatorSubtaskState} which can be 
obtained by {@link #builder()}.
+        */
+       public static class Builder {
+               private StateObjectCollection<OperatorStateHandle> 
managedOperatorState = StateObjectCollection.empty();
+               private StateObjectCollection<OperatorStateHandle> 
rawOperatorState = StateObjectCollection.empty();
+               private StateObjectCollection<KeyedStateHandle> 
managedKeyedState = StateObjectCollection.empty();
+               private StateObjectCollection<KeyedStateHandle> rawKeyedState = 
StateObjectCollection.empty();
+               private StateObjectCollection<InputChannelStateHandle> 
inputChannelState = StateObjectCollection.empty();
+               private StateObjectCollection<ResultSubpartitionStateHandle> 
resultSubpartitionState = StateObjectCollection.empty();
+               private VirtualChannelMapping inputChannelMappings = 
VirtualChannelMapping.NO_MAPPING;
+               private VirtualChannelMapping outputChannelMappings = 
VirtualChannelMapping.NO_MAPPING;
+
+               private Builder() {
+               }
+
+               public Builder 
setManagedOperatorState(StateObjectCollection<OperatorStateHandle> 
managedOperatorState) {
+                       this.managedOperatorState = 
checkNotNull(managedOperatorState);
+                       return this;
+               }
+
+               public Builder setManagedOperatorState(OperatorStateHandle 
managedOperatorState) {
+                       return 
setManagedOperatorState(StateObjectCollection.singleton(checkNotNull(managedOperatorState)));
+               }
+
+               public Builder 
setRawOperatorState(StateObjectCollection<OperatorStateHandle> 
rawOperatorState) {
+                       this.rawOperatorState = checkNotNull(rawOperatorState);
+                       return this;
+               }
+
+               public Builder setRawOperatorState(OperatorStateHandle 
rawOperatorState) {
+                       return 
setRawOperatorState(StateObjectCollection.singleton(checkNotNull(rawOperatorState)));
+               }
+
+               public Builder 
setManagedKeyedState(StateObjectCollection<KeyedStateHandle> managedKeyedState) 
{
+                       this.managedKeyedState = 
checkNotNull(managedKeyedState);
+                       return this;
+               }
+
+               public Builder setManagedKeyedState(KeyedStateHandle 
managedKeyedState) {
+                       return 
setManagedKeyedState(StateObjectCollection.singleton(checkNotNull(managedKeyedState)));
+               }
+
+               public Builder 
setRawKeyedState(StateObjectCollection<KeyedStateHandle> rawKeyedState) {
+                       this.rawKeyedState = checkNotNull(rawKeyedState);
+                       return this;
+               }
+
+               public Builder setRawKeyedState(KeyedStateHandle rawKeyedState) 
{
+                       return 
setRawKeyedState(StateObjectCollection.singleton(checkNotNull(rawKeyedState)));
+               }
+
+               public Builder 
setInputChannelState(StateObjectCollection<InputChannelStateHandle> 
inputChannelState) {
+                       this.inputChannelState = 
checkNotNull(inputChannelState);
+                       return this;
+               }
+
+               public Builder 
setResultSubpartitionState(StateObjectCollection<ResultSubpartitionStateHandle> 
resultSubpartitionState) {
+                       this.resultSubpartitionState = 
checkNotNull(resultSubpartitionState);
+                       return this;
+               }
+
+               public Builder setInputChannelMappings(VirtualChannelMapping 
inputChannelMappings) {
+                       this.inputChannelMappings = 
checkNotNull(inputChannelMappings);
+                       return this;
+               }
+
+               public Builder setOutputChannelMappings(VirtualChannelMapping 
outputChannelMappings) {
+                       this.outputChannelMappings = 
checkNotNull(outputChannelMappings);
+                       return this;
+               }
+
+               public OperatorSubtaskState build() {
+                       return new OperatorSubtaskState(
+                               managedOperatorState,
+                               rawOperatorState,
+                               managedKeyedState,
+                               rawKeyedState,
+                               inputChannelState,
+                               resultSubpartitionState,
+                               inputChannelMappings,
+                               outputChannelMappings);
+               }
+       }
+
+       /**
+        * Captures ambiguous mappings of old channels to new channels.
+        *
+        * <p>For inputs, this mapping implies the following:
+        * <ul>
+        *     <li>{@link #oldTaskInstances} is set when there is a rescale on this task potentially leading to different
+        *     key groups. The upstream task has a corresponding {@link #partitionMappings} where it sends data over
+        *     a virtual channel while specifying the channel index in the VirtualChannelSelector. This subtask then
+        *     demultiplexes over the virtual subtask index.</li>
+        *     <li>{@link #partitionMappings} is set when there is a downscale of the upstream task. The upstream task
+        *     has a corresponding {@link #oldTaskInstances} where it sends data over a virtual channel while specifying
+        *     the subtask index in the VirtualChannelSelector. This subtask then demultiplexes over channel indexes.</li>
+        * </ul>
+        *
+        * <p>For outputs, it's vice-versa. The information must be kept in 
sync but they are used in opposite ways for
+        * multiplexing/demultiplexing.
+        *
+        * <p>Note that in the common rescaling case both pieces of information are set and need to be used
+        * simultaneously. If the input subtask subsumes the state of 3 old subtasks and a channel corresponds to 2 old
+        * channels, then there are 6 virtual channels to be demultiplexed.
+        */
+       public static class VirtualChannelMapping implements Serializable {
+               private static final long serialVersionUID = 1L;
+
+               /** Returned by {@link #getPartitionMapping(int)} when no partition mappings are present. */
+               public static final PartitionMapping NO_CHANNEL_MAPPING = new PartitionMapping(emptyList());
+
+               /** Empty list of partition mappings. */
+               public static final List<PartitionMapping> NO_PARTITIONS = emptyList();
+
+               /**
+                * Empty set of old task instances. {@link BitSet} is mutable, so this constant must not be modified;
+                * the constructor copies its argument, which keeps {@link #NO_MAPPING} independent of this object.
+                */
+               public static final BitSet NO_SUBTASKS = new BitSet();
+
+               /** Mapping with no old task instances and no partition mappings. */
+               public static final VirtualChannelMapping NO_MAPPING = new VirtualChannelMapping(NO_SUBTASKS, NO_PARTITIONS);
+
+               /**
+                * Set when several operator instances are merged into one.
+                */
+               private final BitSet oldTaskInstances;
+
+               /**
+                * Set when channels are merged because the connected operator has been rescaled.
+                */
+               private final List<PartitionMapping> partitionMappings;
+
+               /**
+                * Creates a mapping from the given old task instances and partition mappings.
+                *
+                * @param oldTaskInstances the old task instances; copied so that later changes to the passed
+                *     {@link BitSet} do not affect this mapping
+                * @param partitionMappings the partition mappings; stored as passed
+                * @throws NullPointerException if either argument is {@code null}
+                */
+               public VirtualChannelMapping(BitSet oldTaskInstances, List<PartitionMapping> partitionMappings) {
+                       // Fail fast on null instead of deferring the NullPointerException to equals() or the getters.
+                       Objects.requireNonNull(oldTaskInstances, "oldTaskInstances");
+                       Objects.requireNonNull(partitionMappings, "partitionMappings");
+                       // BitSet is mutable: copy it so this instance (in particular NO_MAPPING) never shares state
+                       // with the caller or with the public NO_SUBTASKS constant.
+                       this.oldTaskInstances = (BitSet) oldTaskInstances.clone();
+                       this.partitionMappings = partitionMappings;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       final VirtualChannelMapping that = (VirtualChannelMapping) o;
+                       return oldTaskInstances.equals(that.oldTaskInstances) &&
+                               partitionMappings.equals(that.partitionMappings);
+               }
+
+               /**
+                * Returns the indexes of the old task instances.
+                *
+                * @param defaultSubtask the index to return as the only element when no old task instance is set
+                * @return the set indexes in ascending order, or {@code {defaultSubtask}} if none is set
+                */
+               public int[] getOldTaskInstances(int defaultSubtask) {
+                       // isEmpty() instead of equals(NO_SUBTASKS): the result must not depend on the state of a
+                       // public, mutable constant.
+                       return oldTaskInstances.isEmpty() ?
+                               new int[] {defaultSubtask} :
+                               oldTaskInstances.stream().toArray();
+               }
+
+               /**
+                * Returns the mapping for the given partition.
+                *
+                * @param partitionIndex the index into the partition mappings
+                * @return {@link #NO_CHANNEL_MAPPING} if there are no partition mappings at all, otherwise the
+                *     mapping stored at {@code partitionIndex}
+                * @throws IndexOutOfBoundsException if mappings are present and {@code partitionIndex} is out of range
+                */
+               public PartitionMapping getPartitionMapping(int partitionIndex) {
+                       if (partitionMappings.isEmpty()) {
+                               return NO_CHANNEL_MAPPING;
+                       }
+                       return partitionMappings.get(partitionIndex);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(oldTaskInstances, partitionMappings);
+               }
+
+               @Override
+               public String toString() {
+                       return "VirtualChannelMapping{" +
+                               "oldTaskInstances=" + oldTaskInstances +
+                               ", partitionMappings=" + partitionMappings +
+                               '}';
+               }
+       }
+
+       /**
+        * Contains the fine-grain channel mappings that occur when a connected 
operator has been rescaled.
+        */
+       public static class PartitionMapping implements Serializable {
+
+               /**
+                * For each new channel (=index), all old channels are set.
+                */
+               private final List<BitSet> newToOldChannelIndexes;

Review comment:
       If we have consecutive indexes from 0..n, what's wrong with 
`List<Set<Integer>>`? (I got the BitSet part)
   btw it would be `Map<Int, Set<Int>>`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to