akalash commented on a change in pull request #16019:
URL: https://github.com/apache/flink/pull/16019#discussion_r642929402
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapper.java
##########
@@ -54,18 +54,6 @@
         }
     },
-    /**
-     * Discards extra state. Useful if all subtasks already contain the same information
-     * (broadcast).
-     */
-    DISCARD_EXTRA_STATE {

Review comment:
       I personally don't see any problem with removing this, but I want to point out that if this enum is used somewhere in the WebUI (or similar), it cannot be removed so easily.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java
##########
@@ -101,50 +66,150 @@ public boolean equals(Object o) {
             return false;
         }
         InflightDataRescalingDescriptor that = (InflightDataRescalingDescriptor) o;
-        return Arrays.equals(oldSubtaskIndexes, that.oldSubtaskIndexes)
-                && Arrays.equals(rescaledChannelsMappings, that.rescaledChannelsMappings)
-                && Objects.equals(ambiguousSubtaskIndexes, that.ambiguousSubtaskIndexes);
+        return Arrays.equals(gateOrPartitionDescriptors, that.gateOrPartitionDescriptors);
     }
     @Override
     public int hashCode() {
-        int result = Objects.hash(ambiguousSubtaskIndexes);
-        result = 31 * result + Arrays.hashCode(oldSubtaskIndexes);
-        result = 31 * result + Arrays.hashCode(rescaledChannelsMappings);
-        return result;
+        return Arrays.hashCode(gateOrPartitionDescriptors);
     }
     @Override
     public String toString() {
         return "InflightDataRescalingDescriptor{"
-                + "oldSubtaskIndexes="
-                + Arrays.toString(oldSubtaskIndexes)
-                + ", rescaledChannelsMappings="
-                + Arrays.toString(rescaledChannelsMappings)
-                + ", ambiguousSubtaskIndexes="
-                + ambiguousSubtaskIndexes
+                + "gateOrPartitionDescriptors="
+                + Arrays.toString(gateOrPartitionDescriptors)
                 + '}';
     }
+    /**
+     * Captures ambiguous mappings of old channels to new channels.
+     *
+     * <p>For inputs, this mapping implies the following:
+     * <li>
+     *
+     * <ul>
+     * {@link #oldSubtaskIndexes} is set when there is a rescale on this task potentially
+     * leading to different key groups. Upstream task has a corresponding {@link
+     * #rescaledChannelsMappings} where it sends data over virtual channel while specifying
+     * the channel index in the VirtualChannelSelector. This subtask then demultiplexes over
+     * the virtual subtask index.
+     * </ul>
+     *
+     * <ul>
+     * {@link #rescaledChannelsMappings} is set when there is a downscale of the upstream task.
+     * Upstream task has a corresponding {@link #oldSubtaskIndexes} where it sends data over
+     * virtual channel while specifying the subtask index in the VirtualChannelSelector. This
+     * subtask then demultiplexes over channel indexes.
+     * </ul>
+     *
+     * <p>For outputs, it's vice-versa. The information must be kept in sync but they are used in
+     * opposite ways for multiplexing/demultiplexing.
+     *
+     * <p>Note that in the common rescaling case both information is set and need to be
+     * simultaneously used. If the input subtask subsumes the state of 3 old subtasks and a channel
+     * corresponds to 2 old channels, then there are 6 virtual channels to be demultiplexed.
+     */
+    public static class InflightDataGateOrPartitionRescalingDescriptor implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Set when several operator instances are merged into one. */
+        private final int[] oldSubtaskIndexes;
+
+        /**
+         * Set when channels are merged because the connected operator has been rescaled for each
+         * gate/partition.
+         */
+        private final RescaleMappings rescaledChannelsMappings;
+
+        /** All channels where upstream duplicates data (only valid for downstream mappings). */
+        private final Set<Integer> ambiguousSubtaskIndexes;
+
+        private final Rescaling rescaling;
+
+        enum Rescaling {

Review comment:
       Just a minor note, but the name Rescaling.RESCALING looks strange.
       Perhaps the enum should have a different name, like MappingType or something along those lines. But I don't insist on it (I don't have a better name in mind).

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
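The closing note of the new javadoc in the diff above (an input subtask that subsumes the state of 3 old subtasks, on a channel that corresponds to 2 old channels, has 6 virtual channels to demultiplex) can be illustrated with a small sketch. It assumes, hypothetically, that a virtual channel is identified by the pair (old subtask index, old channel index) and that the set to demultiplex is the full cross product of the two index lists. `VirtualChannelSketch`, `VirtualChannel` and `virtualChannels` are invented names for illustration only, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's API): enumerates the virtual channels that one new input
 * channel would have to demultiplex when both subtask state and upstream channels were rescaled.
 */
public class VirtualChannelSketch {

    /** Assumption: a virtual channel is identified by (old subtask index, old channel index). */
    static final class VirtualChannel {
        final int oldSubtaskIndex;
        final int oldChannelIndex;

        VirtualChannel(int oldSubtaskIndex, int oldChannelIndex) {
            this.oldSubtaskIndex = oldSubtaskIndex;
            this.oldChannelIndex = oldChannelIndex;
        }

        @Override
        public String toString() {
            return "(" + oldSubtaskIndex + ", " + oldChannelIndex + ")";
        }
    }

    /**
     * Builds the cross product of the two index lists: under the stated assumption, any old
     * subtask whose state is subsumed may have in-flight data on any old channel that now maps
     * to the new channel.
     */
    static List<VirtualChannel> virtualChannels(int[] oldSubtaskIndexes, int[] oldChannelIndexes) {
        List<VirtualChannel> result =
                new ArrayList<>(oldSubtaskIndexes.length * oldChannelIndexes.length);
        for (int oldSubtask : oldSubtaskIndexes) {
            for (int oldChannel : oldChannelIndexes) {
                result.add(new VirtualChannel(oldSubtask, oldChannel));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The javadoc's example: state of 3 old subtasks, one channel covering 2 old channels.
        List<VirtualChannel> channels = virtualChannels(new int[] {0, 1, 2}, new int[] {0, 1});
        // prints: 6 virtual channels: [(0, 0), (0, 1), (1, 0), (1, 1), (2, 0), (2, 1)]
        System.out.println(channels.size() + " virtual channels: " + channels);
    }
}
```

When either list has a single entry (no subtask merge, or no channel rescale), the product degenerates to the other list. This matches the javadoc's remark that both pieces of information are set and used together only in the common rescaling case.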