zhuzhurk commented on code in PR #25887: URL: https://github.com/apache/flink/pull/25887#discussion_r1901541146
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java: ########## @@ -40,7 +40,7 @@ */ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartitionID> { - private final List<IntermediateResultPartitionID> resultPartitions; + private final Map<IntermediateResultPartitionID, Integer> resultPartitionsInOrdered; Review Comment: -> partitionIdToIndexInOrder Comments are also needed to explain it, like what does the index stand for. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java: ########## @@ -314,8 +314,13 @@ private static ConsumedPartitionGroup createConsumedPartitionGroup( final int numConsumers, final List<IntermediateResultPartitionID> consumedPartitions, final ResultPartitionType resultPartitionType) { + final Map<IntermediateResultPartitionID, Integer> partitionIdToIndexInOrdered = Review Comment: partitionIdToIndexInOrdered -> partitionIdToIndexInOrder ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java: ########## @@ -55,29 +55,30 @@ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartit private ConsumedPartitionGroup( int numConsumers, - List<IntermediateResultPartitionID> resultPartitions, Review Comment: From the perspective of code readability and maintenance, it's better to construct the map here from the list. It can keep the map in control and better reflects the meaning of it. ########## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java: ########## @@ -113,23 +111,23 @@ public IndexRange getConsumedSubpartitionRange(int shuffleDescriptorIndex) { * * @param consumedSubpartitionGroups a mapping of consumed partition index ranges to * subpartition ranges. - * @param consumedResultPartitions an iterator of {@link IntermediateResultPartitionID} for the - * consumed result partitions. - * @param partitions all partition ids of consumed {@link IntermediateResult}. + * @param partitionIdToShuffleDescriptorIndexMap a map that associates each {@link + * IntermediateResultPartitionID} with its corresponding shuffle descriptor index. + * @param partitionIdRetriever a function that retrieves the {@link + * IntermediateResultPartitionID} for a given index. * @return a {@link ConsumedSubpartitionContext} instance constructed from the input parameters. */ public static ConsumedSubpartitionContext buildConsumedSubpartitionContext( Map<IndexRange, IndexRange> consumedSubpartitionGroups, - Iterator<IntermediateResultPartitionID> consumedResultPartitions, - IntermediateResultPartitionID[] partitions) { - Map<IntermediateResultPartitionID, Integer> partitionIdToShuffleDescriptorIndexMap = - new HashMap<>(); - while (consumedResultPartitions.hasNext()) { - IntermediateResultPartitionID partitionId = consumedResultPartitions.next(); - partitionIdToShuffleDescriptorIndexMap.put( - partitionId, partitionIdToShuffleDescriptorIndexMap.size()); + Map<IntermediateResultPartitionID, Integer> partitionIdToShuffleDescriptorIndexMap, + Function<Integer, IntermediateResultPartitionID> partitionIdRetriever) { + if (consumedSubpartitionGroups.size() == 1 Review Comment: It's better to add some comments to explain this case. ########## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java: ########## @@ -113,23 +111,23 @@ public IndexRange getConsumedSubpartitionRange(int shuffleDescriptorIndex) { * * @param consumedSubpartitionGroups a mapping of consumed partition index ranges to * subpartition ranges. - * @param consumedResultPartitions an iterator of {@link IntermediateResultPartitionID} for the - * consumed result partitions. - * @param partitions all partition ids of consumed {@link IntermediateResult}. + * @param partitionIdToShuffleDescriptorIndexMap a map that associates each {@link + * IntermediateResultPartitionID} with its corresponding shuffle descriptor index. + * @param partitionIdRetriever a function that retrieves the {@link + * IntermediateResultPartitionID} for a given index. * @return a {@link ConsumedSubpartitionContext} instance constructed from the input parameters. */ public static ConsumedSubpartitionContext buildConsumedSubpartitionContext( Map<IndexRange, IndexRange> consumedSubpartitionGroups, - Iterator<IntermediateResultPartitionID> consumedResultPartitions, - IntermediateResultPartitionID[] partitions) { - Map<IntermediateResultPartitionID, Integer> partitionIdToShuffleDescriptorIndexMap = - new HashMap<>(); - while (consumedResultPartitions.hasNext()) { - IntermediateResultPartitionID partitionId = consumedResultPartitions.next(); - partitionIdToShuffleDescriptorIndexMap.put( - partitionId, partitionIdToShuffleDescriptorIndexMap.size()); + Map<IntermediateResultPartitionID, Integer> partitionIdToShuffleDescriptorIndexMap, Review Comment: How to about to just pass in a `ConsumedPartitionGroup`? It can better reflects the relationship of the context with the consumption group. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org