akalash commented on a change in pull request #16019:
URL: https://github.com/apache/flink/pull/16019#discussion_r642929402



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapper.java
##########
@@ -54,18 +54,6 @@
         }
     },
 
-    /**
-     * Discards extra state. Useful if all subtasks already contain the same information
-     * (broadcast).
-     */
-    DISCARD_EXTRA_STATE {

Review comment:
       I personally don't see any problem with removing this, but I want to emphasize that if this enum is used somewhere in the WebUI (or similar), it cannot be removed so easily.
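To make the concern concrete: Java resolves enum constants by name through Enum.valueOf, which throws IllegalArgumentException for a name that no longer exists. The sketch below uses a hypothetical, simplified stand-in enum (StateMapper, not the real SubtaskStateMapper and not its full list of constants) and a hypothetical resolve helper to show what a name-based lookup, e.g. from a UI or REST layer that kept DISCARD_EXTRA_STATE as a string, would see once the constant is removed.

```java
public class EnumRemovalDemo {

    // Hypothetical, simplified stand-in for SubtaskStateMapper after the PR:
    // DISCARD_EXTRA_STATE no longer exists. The real enum differs.
    enum StateMapper {
        ARBITRARY,
        FIRST,
        FULL,
        ROUND_ROBIN
    }

    /**
     * Hypothetical helper: resolves a mapper from its name, as a UI or REST layer
     * might do when the name was stored or transmitted as a string.
     */
    static String resolve(String name) {
        try {
            return StateMapper.valueOf(name).name();
        } catch (IllegalArgumentException e) {
            // Enum.valueOf throws IllegalArgumentException for unknown constant names.
            return "unknown mapper: " + name;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("FULL"));                // still resolves
        System.out.println(resolve("DISCARD_EXTRA_STATE")); // fails only at runtime
    }
}
```

References to the constant from Java code would fail at compile time instead, which is the easier case to catch; string-based lookups only fail at runtime.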

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java
##########
@@ -101,50 +66,150 @@ public boolean equals(Object o) {
             return false;
         }
         InflightDataRescalingDescriptor that = (InflightDataRescalingDescriptor) o;
-        return Arrays.equals(oldSubtaskIndexes, that.oldSubtaskIndexes)
-                && Arrays.equals(rescaledChannelsMappings, that.rescaledChannelsMappings)
-                && Objects.equals(ambiguousSubtaskIndexes, that.ambiguousSubtaskIndexes);
+        return Arrays.equals(gateOrPartitionDescriptors, that.gateOrPartitionDescriptors);
     }
 
     @Override
     public int hashCode() {
-        int result = Objects.hash(ambiguousSubtaskIndexes);
-        result = 31 * result + Arrays.hashCode(oldSubtaskIndexes);
-        result = 31 * result + Arrays.hashCode(rescaledChannelsMappings);
-        return result;
+        return Arrays.hashCode(gateOrPartitionDescriptors);
     }
 
     @Override
     public String toString() {
         return "InflightDataRescalingDescriptor{"
-                + "oldSubtaskIndexes="
-                + Arrays.toString(oldSubtaskIndexes)
-                + ", rescaledChannelsMappings="
-                + Arrays.toString(rescaledChannelsMappings)
-                + ", ambiguousSubtaskIndexes="
-                + ambiguousSubtaskIndexes
+                + "gateOrPartitionDescriptors="
+                + Arrays.toString(gateOrPartitionDescriptors)
                 + '}';
     }
 
+    /**
+     * Captures ambiguous mappings of old channels to new channels.
+     *
+     * <p>For inputs, this mapping implies the following:
+     * <li>
+     *
+     *     <ul>
+     *       {@link #oldSubtaskIndexes} is set when there is a rescale on this task potentially
+     *       leading to different key groups. Upstream task has a corresponding {@link
+     *       #rescaledChannelsMappings} where it sends data over virtual channel while specifying
+     *       the channel index in the VirtualChannelSelector. This subtask then demultiplexes over
+     *       the virtual subtask index.
+     * </ul>
+     *
+     * <ul>
+     *   {@link #rescaledChannelsMappings} is set when there is a downscale of the upstream task.
+     *   Upstream task has a corresponding {@link #oldSubtaskIndexes} where it sends data over
+     *   virtual channel while specifying the subtask index in the VirtualChannelSelector. This
+     *   subtask then demultiplexes over channel indexes.
+     * </ul>
+     *
+     * <p>For outputs, it's vice-versa. The information must be kept in sync but they are used in
+     * opposite ways for multiplexing/demultiplexing.
+     *
+     * <p>Note that in the common rescaling case both information is set and need to be
+     * simultaneously used. If the input subtask subsumes the state of 3 old subtasks and a channel
+     * corresponds to 2 old channels, then there are 6 virtual channels to be demultiplexed.
+     */
+    public static class InflightDataGateOrPartitionRescalingDescriptor implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Set when several operator instances are merged into one. */
+        private final int[] oldSubtaskIndexes;
+
+        /**
+         * Set when channels are merged because the connected operator has been rescaled for each
+         * gate/partition.
+         */
+        private final RescaleMappings rescaledChannelsMappings;
+
+        /** All channels where upstream duplicates data (only valid for downstream mappings). */
+        private final Set<Integer> ambiguousSubtaskIndexes;
+
+        private final Rescaling rescaling;
+
+        enum Rescaling {

Review comment:
       Just a minor note, but the name Rescaling.RESCALING looks strange. Perhaps the enum should have a different name, like MappingType or something similar. But I don't insist on it (I don't have a better name in mind).
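The Javadoc's closing example (3 old subtasks and 2 old channels per physical channel give 6 virtual channels) can be read as a cross product of old subtask indexes and old channel indexes. The sketch below models each virtual channel as an (old subtask index, old channel index) pair, which is an interpretation of the Javadoc rather than the PR's code, and it uses the enum name MappingType floated above so call sites read MappingType.RESCALING. Everything in it is hypothetical: VirtualChannelSketch, VirtualChannel, the IDENTITY constant and both helper methods are illustrative and not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class VirtualChannelSketch {

    // Enum name suggested in the review; IDENTITY is an assumed second constant.
    enum MappingType {
        IDENTITY,
        RESCALING
    }

    /** Hypothetical: one virtual channel = (old subtask index, old channel index). */
    static final class VirtualChannel {
        final int oldSubtaskIndex;
        final int oldChannelIndex;

        VirtualChannel(int oldSubtaskIndex, int oldChannelIndex) {
            this.oldSubtaskIndex = oldSubtaskIndex;
            this.oldChannelIndex = oldChannelIndex;
        }

        @Override
        public String toString() {
            return "(" + oldSubtaskIndex + "," + oldChannelIndex + ")";
        }
    }

    /** Combines every old subtask index with every old channel index. */
    static List<VirtualChannel> virtualChannels(int[] oldSubtaskIndexes, int[] oldChannelIndexes) {
        List<VirtualChannel> result = new ArrayList<>();
        for (int subtask : oldSubtaskIndexes) {
            for (int channel : oldChannelIndexes) {
                result.add(new VirtualChannel(subtask, channel));
            }
        }
        return result;
    }

    /** Assumption: without rescaling, the physical channel is its own single virtual channel. */
    static int virtualChannelCount(MappingType type, int[] oldSubtaskIndexes, int[] oldChannelIndexes) {
        if (type == MappingType.IDENTITY) {
            return 1;
        }
        return virtualChannels(oldSubtaskIndexes, oldChannelIndexes).size();
    }

    public static void main(String[] args) {
        int[] oldSubtasks = {0, 1, 2}; // new subtask subsumes the state of 3 old subtasks
        int[] oldChannels = {4, 5};    // physical channel corresponds to 2 old channels
        System.out.println(virtualChannelCount(MappingType.RESCALING, oldSubtasks, oldChannels)); // 6
        System.out.println(virtualChannels(oldSubtasks, oldChannels));
    }
}
```

With a single old subtask and a single old channel the cross product has exactly one element, so the IDENTITY shortcut in the sketch agrees with the general case.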




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

