zhuzhurk commented on code in PR #25887:
URL: https://github.com/apache/flink/pull/25887#discussion_r1901541146


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java:
##########
@@ -40,7 +40,7 @@
  */
 public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartitionID> {
 
-    private final List<IntermediateResultPartitionID> resultPartitions;
+    private final Map<IntermediateResultPartitionID, Integer> resultPartitionsInOrdered;

Review Comment:
   -> partitionIdToIndexInOrder
   
   Comments are also needed to explain it, e.g. what the index stands for.
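   A minimal sketch of the suggested rename plus an explanatory comment, using a `String` stand-in for `IntermediateResultPartitionID`; the exact meaning documented for the index is an assumption inferred from the surrounding diff, not taken from the PR:

   ```java
   import java.util.Map;

   class PartitionOrderDemo {
       /**
        * Maps each partition id (String stand-in for
        * IntermediateResultPartitionID) to its position in the partition
        * order. The documented semantics here are an illustrative assumption.
        */
       static final Map<String, Integer> partitionIdToIndexInOrder =
               Map.of("partition-0", 0, "partition-1", 1);

       static int indexOf(String partitionId) {
           return partitionIdToIndexInOrder.get(partitionId);
       }
   }
   ```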



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java:
##########
@@ -314,8 +314,13 @@ private static ConsumedPartitionGroup createConsumedPartitionGroup(
             final int numConsumers,
             final List<IntermediateResultPartitionID> consumedPartitions,
             final ResultPartitionType resultPartitionType) {
+        final Map<IntermediateResultPartitionID, Integer> partitionIdToIndexInOrdered =

Review Comment:
   partitionIdToIndexInOrdered -> partitionIdToIndexInOrder



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java:
##########
@@ -55,29 +55,30 @@ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartit
 
     private ConsumedPartitionGroup(
             int numConsumers,
-            List<IntermediateResultPartitionID> resultPartitions,

Review Comment:
   From the perspective of code readability and maintainability, it's better to construct the map here from the list.
   It keeps the map under control and better reflects its meaning.
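   A sketch of what building the map inside the constructor could look like, with a `String` stand-in for `IntermediateResultPartitionID`; the accessor name is hypothetical:

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   class ConsumedPartitionGroupSketch {
       /** Partition id (String stand-in) -> its index in the consumption order. */
       private final Map<String, Integer> partitionIdToIndexInOrder;

       // Build the map from the ordered list inside the constructor, as the
       // review suggests, so the index semantics stay encapsulated here.
       ConsumedPartitionGroupSketch(List<String> resultPartitions) {
           this.partitionIdToIndexInOrder = new HashMap<>();
           for (String partitionId : resultPartitions) {
               partitionIdToIndexInOrder.put(
                       partitionId, partitionIdToIndexInOrder.size());
           }
       }

       int indexInOrder(String partitionId) {
           return partitionIdToIndexInOrder.get(partitionId);
       }
   }
   ```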



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java:
##########
@@ -113,23 +111,23 @@ public IndexRange getConsumedSubpartitionRange(int shuffleDescriptorIndex) {
      *
      * @param consumedSubpartitionGroups a mapping of consumed partition index ranges to
      *     subpartition ranges.
-     * @param consumedResultPartitions an iterator of {@link IntermediateResultPartitionID} for the
-     *     consumed result partitions.
-     * @param partitions all partition ids of consumed {@link IntermediateResult}.
+     * @param partitionIdToShuffleDescriptorIndexMap a map that associates each {@link
+     *     IntermediateResultPartitionID} with its corresponding shuffle descriptor index.
+     * @param partitionIdRetriever a function that retrieves the {@link
+     *     IntermediateResultPartitionID} for a given index.
      * @return a {@link ConsumedSubpartitionContext} instance constructed from the input parameters.
      */
     public static ConsumedSubpartitionContext buildConsumedSubpartitionContext(
             Map<IndexRange, IndexRange> consumedSubpartitionGroups,
-            Iterator<IntermediateResultPartitionID> consumedResultPartitions,
-            IntermediateResultPartitionID[] partitions) {
-        Map<IntermediateResultPartitionID, Integer> partitionIdToShuffleDescriptorIndexMap =
-                new HashMap<>();
-        while (consumedResultPartitions.hasNext()) {
-            IntermediateResultPartitionID partitionId = consumedResultPartitions.next();
-            partitionIdToShuffleDescriptorIndexMap.put(
-                    partitionId, partitionIdToShuffleDescriptorIndexMap.size());
+            Map<IntermediateResultPartitionID, Integer> partitionIdToShuffleDescriptorIndexMap,
+            Function<Integer, IntermediateResultPartitionID> partitionIdRetriever) {
+        if (consumedSubpartitionGroups.size() == 1

Review Comment:
   It's better to add some comments to explain this case.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java:
##########
@@ -113,23 +111,23 @@ public IndexRange getConsumedSubpartitionRange(int shuffleDescriptorIndex) {
      *
      * @param consumedSubpartitionGroups a mapping of consumed partition index ranges to
      *     subpartition ranges.
-     * @param consumedResultPartitions an iterator of {@link IntermediateResultPartitionID} for the
-     *     consumed result partitions.
-     * @param partitions all partition ids of consumed {@link IntermediateResult}.
+     * @param partitionIdToShuffleDescriptorIndexMap a map that associates each {@link
+     *     IntermediateResultPartitionID} with its corresponding shuffle descriptor index.
+     * @param partitionIdRetriever a function that retrieves the {@link
+     *     IntermediateResultPartitionID} for a given index.
      * @return a {@link ConsumedSubpartitionContext} instance constructed from the input parameters.
      */
     public static ConsumedSubpartitionContext buildConsumedSubpartitionContext(
             Map<IndexRange, IndexRange> consumedSubpartitionGroups,
-            Iterator<IntermediateResultPartitionID> consumedResultPartitions,
-            IntermediateResultPartitionID[] partitions) {
-        Map<IntermediateResultPartitionID, Integer> partitionIdToShuffleDescriptorIndexMap =
-                new HashMap<>();
-        while (consumedResultPartitions.hasNext()) {
-            IntermediateResultPartitionID partitionId = consumedResultPartitions.next();
-            partitionIdToShuffleDescriptorIndexMap.put(
-                    partitionId, partitionIdToShuffleDescriptorIndexMap.size());
+            Map<IntermediateResultPartitionID, Integer> partitionIdToShuffleDescriptorIndexMap,

Review Comment:
   How about just passing in a `ConsumedPartitionGroup`?
   It better reflects the relationship between the context and the consumed partition group.
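   A rough sketch of the suggested signature change, with `String` stand-ins for the real Flink types; the class shapes and method names here are illustrative assumptions, not the PR's code:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Simplified stand-in for the real ConsumedPartitionGroup.
   class ConsumedPartitionGroup {
       private final Map<String, Integer> partitionIdToIndexInOrder = new HashMap<>();

       void add(String partitionId) {
           partitionIdToIndexInOrder.put(partitionId, partitionIdToIndexInOrder.size());
       }

       int getShuffleDescriptorIndex(String partitionId) {
           return partitionIdToIndexInOrder.get(partitionId);
       }
   }

   class ConsumedSubpartitionContextSketch {
       final int descriptorIndex;

       private ConsumedSubpartitionContextSketch(int descriptorIndex) {
           this.descriptorIndex = descriptorIndex;
       }

       // Suggested shape: take the group itself instead of a Map plus a
       // retriever Function, making the context's dependency on the
       // consumed partition group explicit in the signature.
       static ConsumedSubpartitionContextSketch build(
               ConsumedPartitionGroup group, String partitionId) {
           return new ConsumedSubpartitionContextSketch(
                   group.getShuffleDescriptorIndex(partitionId));
       }
   }
   ```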



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
