zhuzhurk commented on code in PR #25552:
URL: https://github.com/apache/flink/pull/25552#discussion_r1899512792
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/SubpartitionSlice.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.util;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/** Helper class that provides information for subpartition slice. */
+public class SubpartitionSlice {
+
+    int subpartitionIndex;
+    IndexRange partitionRange;
+
+    long size;
+
+    public SubpartitionSlice(int subpartitionIndex, IndexRange partitionRange, long size) {
+        this.subpartitionIndex = subpartitionIndex;
+        this.partitionRange = partitionRange;
+        this.size = size;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public int getSubpartitionIndex() {
+        return subpartitionIndex;
+    }
+
+    /**
+     * SubpartitionSlice is used to describe a group of inputs with the same type number which may
+     * have different numbers of partitions, so we need to use the specific partitions number to
+     * get the correct partition range.
+     *
+     * @param numPartitions the number of partitions
+     * @return the partition range if the partition range is valid, empty otherwise
+     */
+    public Optional<IndexRange> getPartitionRange(int numPartitions) {
+        if (partitionRange.getEndIndex() < numPartitions) {
+            return Optional.of(partitionRange);
+        } else if (partitionRange.getStartIndex() < numPartitions
+                && partitionRange.getEndIndex() >= numPartitions) {
+            return Optional.of(new IndexRange(partitionRange.getStartIndex(), numPartitions - 1));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    public static SubpartitionSlice createSubpartitionSlice(
+            int subpartitionIndex, IndexRange partitionRange, long aggregatedSubpartitionBytes) {
+        return new SubpartitionSlice(
+                subpartitionIndex, partitionRange, aggregatedSubpartitionBytes);
+    }
+
+    public static SubpartitionSlice createSubpartitionSlice(

Review Comment:
   can be private


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/SubpartitionSlice.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.util;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/** Helper class that provides information for subpartition slice. */
+public class SubpartitionSlice {
+
+    int subpartitionIndex;
+    IndexRange partitionRange;
+
+    long size;
+
+    public SubpartitionSlice(int subpartitionIndex, IndexRange partitionRange, long size) {
+        this.subpartitionIndex = subpartitionIndex;
+        this.partitionRange = partitionRange;
+        this.size = size;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public int getSubpartitionIndex() {
+        return subpartitionIndex;
+    }
+
+    /**
+     * SubpartitionSlice is used to describe a group of inputs with the same type number which may
+     * have different numbers of partitions, so we need to use the specific partitions number to
+     * get the correct partition range.

Review Comment:
   It's a bit unclear to me what the meaning of the result range is.
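For context, here is a minimal sketch of what `getPartitionRange` returns when a slice's range is clamped to a smaller partition count, based only on the implementation shown above. It assumes `IndexRange` is inclusive on both ends, as its use above suggests; the printed forms in the comments are illustrative, not the exact `toString` output.

```java
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.scheduler.adaptivebatch.util.SubpartitionSlice;

public class PartitionRangeExample {
    public static void main(String[] args) {
        // A slice whose data comes from partitions [2, 7].
        SubpartitionSlice slice = new SubpartitionSlice(0, new IndexRange(2, 7), 1024L);

        // Range fits entirely below numPartitions: returned unchanged.
        System.out.println(slice.getPartitionRange(10)); // Optional of [2, 7]

        // Range straddles the boundary: end index clamped to numPartitions - 1.
        System.out.println(slice.getPartitionRange(5)); // Optional of [2, 4]

        // Start index already out of range: no valid range exists.
        System.out.println(slice.getPartitionRange(2)); // Optional.empty()
    }
}
```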
" - + "Use {} as the upper bound to decide parallelism of job vertex {}.", - vertexMaxParallelism, - maxParallelism, - vertexMaxParallelism, - jobVertexId); - maxParallelism = vertexMaxParallelism; - } - checkState(maxParallelism >= minParallelism); - - if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT - && areAllInputsAllToAll(consumedResults) - && !areAllInputsBroadcast(consumedResults)) { - return decideParallelismAndEvenlyDistributeData( - jobVertexId, - consumedResults, - vertexInitialParallelism, - minParallelism, - maxParallelism); - } else { - return decideParallelismAndEvenlyDistributeSubpartitions( - jobVertexId, - consumedResults, - vertexInitialParallelism, - minParallelism, - maxParallelism); + } + + int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism); + int maxParallelism = globalMaxParallelism; + + if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT + && vertexMaxParallelism < minParallelism) { + LOG.info( + "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. " + + "Use {} as the lower bound to decide parallelism of job vertex {}.", + vertexMaxParallelism, + minParallelism, + vertexMaxParallelism, + jobVertexId); + minParallelism = vertexMaxParallelism; + } + if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT + && vertexMaxParallelism < maxParallelism) { + LOG.info( + "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. " + + "Use {} as the upper bound to decide parallelism of job vertex {}.", + vertexMaxParallelism, + maxParallelism, + vertexMaxParallelism, + jobVertexId); + maxParallelism = vertexMaxParallelism; + } + checkState(maxParallelism >= minParallelism); + + int parallelism = + vertexInitialParallelism > 0 + ? vertexInitialParallelism + : decideParallelism( + jobVertexId, consumedResults, minParallelism, maxParallelism); + + Map<Boolean, List<BlockingInputInfo>> inputsGroupByInterCorrelation = + consumedResults.stream() + .collect( + Collectors.groupingBy( + BlockingInputInfo::existInterInputsKeyCorrelation)); + + // For AllToAll like inputs, we derive parallelism as a whole, while for Pointwise inputs, + // we need to derive parallelism separately for each input. + // + // In the following cases, we need to reset min parallelism and max parallelism to ensure + // that the decide parallelism for all inputs is consistent : + // 1. Vertex has a specified parallelism + // 2. There are edges that don't need to follow intergroup constraint Review Comment: Why? ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/SubpartitionSlice.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/SubpartitionSlice.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.util;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/** Helper class that provides information for subpartition slice. */
+public class SubpartitionSlice {
+
+    int subpartitionIndex;
+    IndexRange partitionRange;
+
+    long size;

Review Comment:
   size -> dataBytes


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -126,52 +136,101 @@ public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
                             ? vertexInitialParallelism
                             : computeSourceParallelismUpperBound(jobVertexId, vertexMaxParallelism);
             return new ParallelismAndInputInfos(parallelism, Collections.emptyMap());
-        } else {
-            int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
-            int maxParallelism = globalMaxParallelism;
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < minParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
-                                + "Use {} as the lower bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        minParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                minParallelism = vertexMaxParallelism;
-            }
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < maxParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
-                                + "Use {} as the upper bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        maxParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                maxParallelism = vertexMaxParallelism;
-            }
-            checkState(maxParallelism >= minParallelism);
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && areAllInputsAllToAll(consumedResults)
-                    && !areAllInputsBroadcast(consumedResults)) {
-                return decideParallelismAndEvenlyDistributeData(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
-            } else {
-                return decideParallelismAndEvenlyDistributeSubpartitions(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
+        }
+
+        int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
+        int maxParallelism = globalMaxParallelism;
+
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < minParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
+                            + "Use {} as the lower bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    minParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            minParallelism = vertexMaxParallelism;
+        }
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < maxParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
+                            + "Use {} as the upper bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    maxParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            maxParallelism = vertexMaxParallelism;
+        }
+        checkState(maxParallelism >= minParallelism);
+
+        int parallelism =
+                vertexInitialParallelism > 0
+                        ? vertexInitialParallelism
+                        : decideParallelism(
+                                jobVertexId, consumedResults, minParallelism, maxParallelism);
+
+        Map<Boolean, List<BlockingInputInfo>> inputsGroupByInterCorrelation =
+                consumedResults.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        BlockingInputInfo::existInterInputsKeyCorrelation));
+
+        // For AllToAll like inputs, we derive parallelism as a whole, while for Pointwise inputs,
+        // we need to derive parallelism separately for each input.
+        //
+        // In the following cases, we need to reset min parallelism and max parallelism to ensure
+        // that the decide parallelism for all inputs is consistent :
+        // 1. Vertex has a specified parallelism
+        // 2. There are edges that don't need to follow intergroup constraint
+        if (vertexInitialParallelism > 0 || inputsGroupByInterCorrelation.containsKey(false)) {
+            minParallelism = parallelism;
+            maxParallelism = parallelism;
+        }
+
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfoMap = new HashMap<>();
+
+        if (inputsGroupByInterCorrelation.containsKey(true)) {
+            vertexInputInfoMap.putAll(
+                    allToAllVertexInputInfoComputer.compute(
+                            jobVertexId,
+                            inputsGroupByInterCorrelation.get(true),
+                            parallelism,
+                            minParallelism,
+                            maxParallelism));
+        }
+
+        if (inputsGroupByInterCorrelation.containsKey(false)) {
+            List<BlockingInputInfo> inputsWithoutInterCorrelation =
+                    inputsGroupByInterCorrelation.get(false);
+            for (BlockingInputInfo input : inputsWithoutInterCorrelation) {
+                if (input.existIntraInputKeyCorrelation()) {
+                    vertexInputInfoMap.putAll(
+                            allToAllVertexInputInfoComputer.compute(
+                                    jobVertexId,
+                                    Collections.singletonList(input),
+                                    parallelism,
+                                    minParallelism,
+                                    maxParallelism));
+                } else {
+                    vertexInputInfoMap.put(
+                            input.getResultId(),
+                            pointwiseVertexInputInfoComputer.compute(input, parallelism));
+                }
             }
         }
+        int finalParallelism = checkAndGetParallelism(vertexInputInfoMap.values());
+
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfoMapInOrder =
+                new LinkedHashMap<>();
+
+        for (BlockingInputInfo inputInfo : consumedResults) {
+            vertexInputInfoMapInOrder.put(
+                    inputInfo.getResultId(), vertexInputInfoMap.get(inputInfo.getResultId()));
+        }
+
+        return new ParallelismAndInputInfos(finalParallelism, vertexInputInfoMapInOrder);

Review Comment:
   Not sure why it's needed to make it in order? Currently in `ParallelismAndInputInfos`, it is not required that the `jobVertexInputInfos` is in order.
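For reference, the only behavioral difference the `LinkedHashMap` introduces is iteration order; a tiny sketch of that difference, with string keys as placeholders for `IntermediateDataSetID`s:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class IterationOrderExample {
    public static void main(String[] args) {
        Map<String, Integer> plain = new HashMap<>();
        Map<String, Integer> ordered = new LinkedHashMap<>();
        for (String id : new String[] {"result-2", "result-0", "result-1"}) {
            plain.put(id, id.length());
            ordered.put(id, id.length());
        }
        // HashMap iteration follows hash buckets, not insertion order:
        System.out.println(plain.keySet());   // e.g. [result-0, result-1, result-2]
        // LinkedHashMap preserves insertion (i.e. consumedResults) order:
        System.out.println(ordered.keySet()); // [result-2, result-0, result-1]
    }
}
```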


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.util;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BisectionSearchUtils;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils class for VertexParallelismAndInputInfosDecider. */
+public class VertexParallelismAndInputInfosDeciderUtils {
+    /**
+     * Adjust the parallelism to the closest legal parallelism and return the computed subpartition
+     * ranges.
+     *
+     * @param currentDataVolumeLimit current data volume limit
+     * @param currentParallelism current parallelism
+     * @param minParallelism the min parallelism
+     * @param maxParallelism the max parallelism
+     * @param minLimit the minimum data volume limit
+     * @param maxLimit the maximum data volume limit
+     * @param parallelismComputer a function to compute the parallelism according to the data volume
+     *     limit
+     * @param subpartitionRangesComputer a function to compute the subpartition ranges according to
+     *     the data volume limit
+     * @return the computed subpartition ranges or {@link Optional#empty()} if we can't find any
+     *     legal parallelism
+     */
+    public static Optional<List<IndexRange>> adjustToClosestLegalParallelism(
+            long currentDataVolumeLimit,
+            int currentParallelism,
+            int minParallelism,
+            int maxParallelism,
+            long minLimit,
+            long maxLimit,
+            Function<Long, Integer> parallelismComputer,
+            Function<Long, List<IndexRange>> subpartitionRangesComputer) {
+        long adjustedDataVolumeLimit = currentDataVolumeLimit;
+        if (currentParallelism < minParallelism) {
+            // Current parallelism is smaller than the user-specified lower-limit of parallelism ,
+            // we need to adjust it to the closest/minimum possible legal parallelism. That is, we
+            // need to find the maximum legal dataVolumeLimit.
+            adjustedDataVolumeLimit =
+                    BisectionSearchUtils.findMaxLegalValue(
+                            value -> parallelismComputer.apply(value) >= minParallelism,
+                            minLimit,
+                            currentDataVolumeLimit);
+
+            // When we find the minimum possible legal parallelism, the dataVolumeLimit that can
+            // lead to this parallelism may be a range, and we need to find the minimum value of
+            // this range to make the data distribution as even as possible (the smaller the
+            // dataVolumeLimit, the more even the distribution)
+            final long minPossibleLegalParallelism =
+                    parallelismComputer.apply(adjustedDataVolumeLimit);
+            adjustedDataVolumeLimit =
+                    BisectionSearchUtils.findMinLegalValue(
+                            value ->
+                                    parallelismComputer.apply(value) == minPossibleLegalParallelism,
+                            minLimit,
+                            adjustedDataVolumeLimit);
+
+        } else if (currentParallelism > maxParallelism) {
+            // Current parallelism is larger than the user-specified upper-limit of parallelism ,
+            // we need to adjust it to the closest/maximum possible legal parallelism. That is, we
+            // need to find the minimum legal dataVolumeLimit.
+            adjustedDataVolumeLimit =
+                    BisectionSearchUtils.findMinLegalValue(
+                            value -> parallelismComputer.apply(value) <= maxParallelism,
+                            currentDataVolumeLimit,
+                            maxLimit);
+        }
+
+        int adjustedParallelism = parallelismComputer.apply(adjustedDataVolumeLimit);
+        if (isLegalParallelism(adjustedParallelism, minParallelism, maxParallelism)) {
+            return Optional.of(subpartitionRangesComputer.apply(adjustedDataVolumeLimit));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Computes the Cartesian product of a list of lists.
+     *
+     * <p>The Cartesian product is a set of all possible combinations formed by picking one element
+     * from each list. For example, given input lists [[1, 2], [3, 4]], the result will be [[1, 3],
+     * [1, 4], [2, 3], [2, 4]].
+     *
+     * <p>Note: If the input list is empty or contains an empty list, the result will be an empty
+     * list.
+     *
+     * @param <T> the type of elements in the lists
+     * @param lists a list of lists for which the Cartesian product is to be computed
+     * @return a list of lists representing the Cartesian product, where each inner list is a
+     *     combination
+     */
+    public static <T> List<List<T>> cartesianProduct(List<List<T>> lists) {
+        List<List<T>> resultLists = new ArrayList<>();
+        if (lists.isEmpty()) {
+            resultLists.add(new ArrayList<>());
+            return resultLists;
+        } else {
+            List<T> firstList = lists.get(0);
+            List<List<T>> remainingLists = cartesianProduct(lists.subList(1, lists.size()));
+            for (T condition : firstList) {
+                for (List<T> remainingList : remainingLists) {
+                    ArrayList<T> resultList = new ArrayList<>();
+                    resultList.add(condition);
+                    resultList.addAll(remainingList);
+                    resultLists.add(resultList);
+                }
+            }
+        }
+        return resultLists;
+    }
+
+    /**
+     * Calculates the median of a given array of long integers. If the calculated median is less
+     * than 1, it returns 1 instead.
+     *
+     * @param nums an array of long integers for which to calculate the median.
+     * @return the median value, which will be at least 1.
+     */
+    public static long median(long[] nums) {
+        int len = nums.length;
+        long[] sortedNums = LongStream.of(nums).sorted().toArray();
+        if (len % 2 == 0) {
+            return Math.max((sortedNums[len / 2] + sortedNums[len / 2 - 1]) / 2, 1L);
+        } else {
+            return Math.max(sortedNums[len / 2], 1L);
+        }
+    }
+
+    /**
+     * Computes the skew threshold based on the given media size and skewed factor.
+     *
+     * <p>The skew threshold is calculated as the product of the media size and the skewed factor.
+     * To ensure that the computed threshold does not fall below a specified default value, the
+     * method uses {@link Math#max} to return the larger of the calculated threshold and the default
+     * threshold.
+     *
+     * @param mediaSize the size of the media
+     * @param skewedFactor a factor indicating the degree of skewness
+     * @param defaultSkewedThreshold the default threshold to be used if the calculated threshold is
+     *     less than this value
+     * @return the computed skew threshold, which is guaranteed to be at least the default skewed
+     *     threshold.
+     */
+    public static long computeSkewThreshold(
+            long mediaSize, double skewedFactor, long defaultSkewedThreshold) {

Review Comment:
   media -> median
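For intuition on the formula the javadoc describes, i.e. threshold = max(median * skewedFactor, defaultSkewedThreshold), here is a small worked example with made-up sizes. It assumes the method body follows the javadoc exactly, since the implementation itself is not shown in this hunk.

```java
import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.computeSkewThreshold;
import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.median;

public class SkewThresholdExample {
    public static void main(String[] args) {
        // Made-up subpartition sizes: three normal ones and one 3 GB straggler.
        long[] subpartitionBytes = {64L << 20, 96L << 20, 100L << 20, 3L << 30};

        // Even-length array: median averages the two middle values -> 98 MB.
        long medianBytes = median(subpartitionBytes);

        // max(98 MB * 4.0, 256 MB) = ~392 MB, so the factor-based value wins
        // and the 3 GB subpartition would be flagged as skewed.
        long threshold = computeSkewThreshold(medianBytes, 4.0, 256L << 20);
        System.out.println(threshold); // 411041792 (~392 MB), per the javadoc's formula
    }
}
```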


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -126,52 +136,101 @@ public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
                             ? vertexInitialParallelism
                             : computeSourceParallelismUpperBound(jobVertexId, vertexMaxParallelism);
             return new ParallelismAndInputInfos(parallelism, Collections.emptyMap());
-        } else {
-            int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
-            int maxParallelism = globalMaxParallelism;
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < minParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
-                                + "Use {} as the lower bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        minParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                minParallelism = vertexMaxParallelism;
-            }
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < maxParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
-                                + "Use {} as the upper bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        maxParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                maxParallelism = vertexMaxParallelism;
-            }
-            checkState(maxParallelism >= minParallelism);
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && areAllInputsAllToAll(consumedResults)
-                    && !areAllInputsBroadcast(consumedResults)) {
-                return decideParallelismAndEvenlyDistributeData(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
-            } else {
-                return decideParallelismAndEvenlyDistributeSubpartitions(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
+        }
+
+        int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
+        int maxParallelism = globalMaxParallelism;
+
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < minParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
+                            + "Use {} as the lower bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    minParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            minParallelism = vertexMaxParallelism;
+        }
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < maxParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
+                            + "Use {} as the upper bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    maxParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            maxParallelism = vertexMaxParallelism;
+        }
+        checkState(maxParallelism >= minParallelism);
+
+        int parallelism =
+                vertexInitialParallelism > 0
+                        ? vertexInitialParallelism
+                        : decideParallelism(
+                                jobVertexId, consumedResults, minParallelism, maxParallelism);
+
+        Map<Boolean, List<BlockingInputInfo>> inputsGroupByInterCorrelation =
+                consumedResults.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        BlockingInputInfo::existInterInputsKeyCorrelation));
+
+        // For AllToAll like inputs, we derive parallelism as a whole, while for Pointwise inputs,
+        // we need to derive parallelism separately for each input.
+        //
+        // In the following cases, we need to reset min parallelism and max parallelism to ensure
+        // that the decide parallelism for all inputs is consistent :
+        // 1. Vertex has a specified parallelism
+        // 2. There are edges that don't need to follow intergroup constraint
+        if (vertexInitialParallelism > 0 || inputsGroupByInterCorrelation.containsKey(false)) {
+            minParallelism = parallelism;
+            maxParallelism = parallelism;
+        }
+
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfoMap = new HashMap<>();
+
+        if (inputsGroupByInterCorrelation.containsKey(true)) {
+            vertexInputInfoMap.putAll(
+                    allToAllVertexInputInfoComputer.compute(
+                            jobVertexId,
+                            inputsGroupByInterCorrelation.get(true),
+                            parallelism,
+                            minParallelism,
+                            maxParallelism));
+        }
+
+        if (inputsGroupByInterCorrelation.containsKey(false)) {
+            List<BlockingInputInfo> inputsWithoutInterCorrelation =
+                    inputsGroupByInterCorrelation.get(false);
+            for (BlockingInputInfo input : inputsWithoutInterCorrelation) {
+                if (input.existIntraInputKeyCorrelation()) {
+                    vertexInputInfoMap.putAll(
+                            allToAllVertexInputInfoComputer.compute(
+                                    jobVertexId,
+                                    Collections.singletonList(input),
+                                    parallelism,
+                                    minParallelism,
+                                    maxParallelism));
+                } else {
+                    vertexInputInfoMap.put(
+                            input.getResultId(),
+                            pointwiseVertexInputInfoComputer.compute(input, parallelism));

Review Comment:
   Why does `existIntraInputKeyCorrelation` being false lead to `pointwiseVertexInputInfoComputer#compute()`? Isn't `existIntraInputKeyCorrelation` false for `rebalance` cases (all-to-all)?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org