AHeise commented on a change in pull request #13735: URL: https://github.com/apache/flink/pull/13735#discussion_r517663178
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ########## @@ -327,4 +289,252 @@ public boolean hasState() { || inputChannelState.hasState() || resultSubpartitionState.hasState(); } + + public static Builder builder() { + return new Builder(); + } + + /** + * The builder for a new {@link OperatorSubtaskState} which can be obtained by {@link #builder()}. + */ + public static class Builder { + private StateObjectCollection<OperatorStateHandle> managedOperatorState = StateObjectCollection.empty(); + private StateObjectCollection<OperatorStateHandle> rawOperatorState = StateObjectCollection.empty(); + private StateObjectCollection<KeyedStateHandle> managedKeyedState = StateObjectCollection.empty(); + private StateObjectCollection<KeyedStateHandle> rawKeyedState = StateObjectCollection.empty(); + private StateObjectCollection<InputChannelStateHandle> inputChannelState = StateObjectCollection.empty(); + private StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionState = StateObjectCollection.empty(); + private VirtualChannelMapping inputChannelMappings = VirtualChannelMapping.NO_MAPPING; + private VirtualChannelMapping outputChannelMappings = VirtualChannelMapping.NO_MAPPING; + + private Builder() { + } + + public Builder setManagedOperatorState(StateObjectCollection<OperatorStateHandle> managedOperatorState) { + this.managedOperatorState = checkNotNull(managedOperatorState); + return this; + } + + public Builder setManagedOperatorState(OperatorStateHandle managedOperatorState) { + return setManagedOperatorState(StateObjectCollection.singleton(checkNotNull(managedOperatorState))); + } + + public Builder setRawOperatorState(StateObjectCollection<OperatorStateHandle> rawOperatorState) { + this.rawOperatorState = checkNotNull(rawOperatorState); + return this; + } + + public Builder setRawOperatorState(OperatorStateHandle rawOperatorState) { + return 
setRawOperatorState(StateObjectCollection.singleton(checkNotNull(rawOperatorState))); + } + + public Builder setManagedKeyedState(StateObjectCollection<KeyedStateHandle> managedKeyedState) { + this.managedKeyedState = checkNotNull(managedKeyedState); + return this; + } + + public Builder setManagedKeyedState(KeyedStateHandle managedKeyedState) { + return setManagedKeyedState(StateObjectCollection.singleton(checkNotNull(managedKeyedState))); + } + + public Builder setRawKeyedState(StateObjectCollection<KeyedStateHandle> rawKeyedState) { + this.rawKeyedState = checkNotNull(rawKeyedState); + return this; + } + + public Builder setRawKeyedState(KeyedStateHandle rawKeyedState) { + return setRawKeyedState(StateObjectCollection.singleton(checkNotNull(rawKeyedState))); + } + + public Builder setInputChannelState(StateObjectCollection<InputChannelStateHandle> inputChannelState) { + this.inputChannelState = checkNotNull(inputChannelState); + return this; + } + + public Builder setResultSubpartitionState(StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionState) { + this.resultSubpartitionState = checkNotNull(resultSubpartitionState); + return this; + } + + public Builder setInputChannelMappings(VirtualChannelMapping inputChannelMappings) { + this.inputChannelMappings = checkNotNull(inputChannelMappings); + return this; + } + + public Builder setOutputChannelMappings(VirtualChannelMapping outputChannelMappings) { + this.outputChannelMappings = checkNotNull(outputChannelMappings); + return this; + } + + public OperatorSubtaskState build() { + return new OperatorSubtaskState( + managedOperatorState, + rawOperatorState, + managedKeyedState, + rawKeyedState, + inputChannelState, + resultSubpartitionState, + inputChannelMappings, + outputChannelMappings); + } + } + + /** + * Captures ambiguous mappings of old channels to new channels. 
+ * + * <p>For inputs, this mapping implies the following: + * <ul> + * <li>{@link #oldTaskInstances} is set when there is a rescale on this task potentially leading to different + * key groups. Upstream task has a corresponding {@link #partitionMappings} where it sends data over + * virtual channel while specifying the channel index in the VirtualChannelSelector. This subtask then + * demultiplexes over the virtual subtask index.</li> + * <li>{@link #partitionMappings} is set when there is a downscale of the upstream task. Upstream task has + * a corresponding {@link #oldTaskInstances} where it sends data over virtual channel while specifying the + * subtask index in the VirtualChannelSelector. This subtask then demultiplexes over channel indexes.</li> + * </ul> + * + * <p>For outputs, it's vice-versa. The information must be kept in sync but they are used in opposite ways for + * multiplexing/demultiplexing. + * + * <p>Note that in the common rescaling case both pieces of information are set and need to be used simultaneously. If the + * input subtask subsumes the state of 3 old subtasks and a channel corresponds to 2 old channels, then there are + * 6 virtual channels to be demultiplexed. + */ + public static class VirtualChannelMapping implements Serializable { Review comment: I went with `InflightDataRescalingDescriptor` for now. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org