zhuzhurk commented on code in PR #25552:
URL: https://github.com/apache/flink/pull/25552#discussion_r1899512792


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/SubpartitionSlice.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.util;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/** Helper class that provides information for subpartition slice. */
+public class SubpartitionSlice {
+
+    int subpartitionIndex;
+    IndexRange partitionRange;
+
+    long size;
+
+    public SubpartitionSlice(int subpartitionIndex, IndexRange partitionRange, long size) {
+        this.subpartitionIndex = subpartitionIndex;
+        this.partitionRange = partitionRange;
+        this.size = size;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public int getSubpartitionIndex() {
+        return subpartitionIndex;
+    }
+
+    /**
+     * SubpartitionSlice is used to describe a group of inputs with the same type number which may
+     * have different numbers of partitions, so we need to use the specific partitions number to get
+     * the correct partition range.
+     *
+     * @param numPartitions the number of partitions
+     * @return the partition range if the partition range is valid, empty otherwise
+     */
+    public Optional<IndexRange> getPartitionRange(int numPartitions) {
+        if (partitionRange.getEndIndex() < numPartitions) {
+            return Optional.of(partitionRange);
+        } else if (partitionRange.getStartIndex() < numPartitions
+                && partitionRange.getEndIndex() >= numPartitions) {
+            return Optional.of(new IndexRange(partitionRange.getStartIndex(), numPartitions - 1));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    public static SubpartitionSlice createSubpartitionSlice(
+            int subpartitionIndex, IndexRange partitionRange, long aggregatedSubpartitionBytes) {
+        return new SubpartitionSlice(
+                subpartitionIndex, partitionRange, aggregatedSubpartitionBytes);
+    }
+
+    public static SubpartitionSlice createSubpartitionSlice(

Review Comment:
   can be private



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/SubpartitionSlice.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.util;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/** Helper class that provides information for subpartition slice. */
+public class SubpartitionSlice {
+
+    int subpartitionIndex;
+    IndexRange partitionRange;
+
+    long size;
+
+    public SubpartitionSlice(int subpartitionIndex, IndexRange partitionRange, long size) {
+        this.subpartitionIndex = subpartitionIndex;
+        this.partitionRange = partitionRange;
+        this.size = size;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public int getSubpartitionIndex() {
+        return subpartitionIndex;
+    }
+
+    /**
+     * SubpartitionSlice is used to describe a group of inputs with the same type number which may
+     * have different numbers of partitions, so we need to use the specific partitions number to get
+     * the correct partition range.

Review Comment:
   It's a bit unclear to me what the meaning of the result range is.
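
To make the result concrete, here is a small, self-contained sketch of the clamping behavior the javadoc describes. The `IndexRange` stub below is illustrative only; the real class is `org.apache.flink.runtime.executiongraph.IndexRange`.

    import java.util.Optional;

    // Minimal stand-in for Flink's IndexRange, for illustration only.
    class IndexRange {
        private final int startIndex;
        private final int endIndex;

        IndexRange(int startIndex, int endIndex) {
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }

        int getStartIndex() { return startIndex; }
        int getEndIndex() { return endIndex; }

        @Override
        public String toString() { return "[" + startIndex + ", " + endIndex + "]"; }
    }

    public class PartitionRangeDemo {
        // Same clamping logic as the quoted getPartitionRange.
        static Optional<IndexRange> getPartitionRange(IndexRange partitionRange, int numPartitions) {
            if (partitionRange.getEndIndex() < numPartitions) {
                // The whole range is valid for this input: returned unchanged.
                return Optional.of(partitionRange);
            } else if (partitionRange.getStartIndex() < numPartitions
                    && partitionRange.getEndIndex() >= numPartitions) {
                // Only a prefix is valid: clamp the end to the last existing partition.
                return Optional.of(new IndexRange(partitionRange.getStartIndex(), numPartitions - 1));
            } else {
                // The range lies entirely beyond this input's partitions.
                return Optional.empty();
            }
        }

        public static void main(String[] args) {
            IndexRange range = new IndexRange(2, 5);
            System.out.println(getPartitionRange(range, 8)); // Optional[[2, 5]] (unchanged)
            System.out.println(getPartitionRange(range, 4)); // Optional[[2, 3]] (end clamped)
            System.out.println(getPartitionRange(range, 2)); // Optional.empty  (out of range)
        }
    }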



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -126,52 +136,101 @@ public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
                             ? vertexInitialParallelism
                             : computeSourceParallelismUpperBound(jobVertexId, vertexMaxParallelism);
             return new ParallelismAndInputInfos(parallelism, Collections.emptyMap());
-        } else {
-            int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
-            int maxParallelism = globalMaxParallelism;
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < minParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
-                                + "Use {} as the lower bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        minParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                minParallelism = vertexMaxParallelism;
-            }
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < maxParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
-                                + "Use {} as the upper bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        maxParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                maxParallelism = vertexMaxParallelism;
-            }
-            checkState(maxParallelism >= minParallelism);
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && areAllInputsAllToAll(consumedResults)
-                    && !areAllInputsBroadcast(consumedResults)) {
-                return decideParallelismAndEvenlyDistributeData(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
-            } else {
-                return decideParallelismAndEvenlyDistributeSubpartitions(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
+        }
+
+        int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
+        int maxParallelism = globalMaxParallelism;
+
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < minParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
+                            + "Use {} as the lower bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    minParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            minParallelism = vertexMaxParallelism;
+        }
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < maxParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
+                            + "Use {} as the upper bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    maxParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            maxParallelism = vertexMaxParallelism;
+        }
+        checkState(maxParallelism >= minParallelism);
+
+        int parallelism =
+                vertexInitialParallelism > 0
+                        ? vertexInitialParallelism
+                        : decideParallelism(
+                                jobVertexId, consumedResults, minParallelism, maxParallelism);
+
+        Map<Boolean, List<BlockingInputInfo>> inputsGroupByInterCorrelation =
+                consumedResults.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        BlockingInputInfo::existInterInputsKeyCorrelation));
+
+        // For AllToAll like inputs, we derive parallelism as a whole, while for Pointwise inputs,
+        // we need to derive parallelism separately for each input.
+        //
+        // In the following cases, we need to reset the min and max parallelism to ensure that the
+        // decided parallelism for all inputs is consistent:
+        // 1. The vertex has a specified parallelism
+        // 2. There are edges that don't need to follow the inter-group constraint

Review Comment:
   Why?
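
One plausible reading of the quoted code (an editorial interpretation, not a confirmed answer from the PR author): pointwise inputs are always computed against the fixed, already-decided `parallelism`, so whenever such inputs exist, or the vertex parallelism is user-specified, the all-to-all computer must not be free to pick a different value inside [min, max]. Otherwise `checkAndGetParallelism(vertexInputInfoMap.values())` further down would see inconsistent parallelisms across the vertex's inputs.

    // Names are from the quoted diff; the comments are an interpretation.
    if (vertexInitialParallelism > 0 || inputsGroupByInterCorrelation.containsKey(false)) {
        minParallelism = parallelism; // pin the search range to a single value
        maxParallelism = parallelism; // so every computer must return this parallelism
    }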



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/SubpartitionSlice.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.util;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/** Helper class that provides information for subpartition slice. */
+public class SubpartitionSlice {
+
+    int subpartitionIndex;
+    IndexRange partitionRange;
+
+    long size;

Review Comment:
   size -> dataBytes



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -126,52 +136,101 @@ public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
                             ? vertexInitialParallelism
                             : computeSourceParallelismUpperBound(jobVertexId, vertexMaxParallelism);
             return new ParallelismAndInputInfos(parallelism, Collections.emptyMap());
-        } else {
-            int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
-            int maxParallelism = globalMaxParallelism;
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < minParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
-                                + "Use {} as the lower bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        minParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                minParallelism = vertexMaxParallelism;
-            }
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < maxParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
-                                + "Use {} as the upper bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        maxParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                maxParallelism = vertexMaxParallelism;
-            }
-            checkState(maxParallelism >= minParallelism);
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && areAllInputsAllToAll(consumedResults)
-                    && !areAllInputsBroadcast(consumedResults)) {
-                return decideParallelismAndEvenlyDistributeData(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
-            } else {
-                return decideParallelismAndEvenlyDistributeSubpartitions(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
+        }
+
+        int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
+        int maxParallelism = globalMaxParallelism;
+
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < minParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
+                            + "Use {} as the lower bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    minParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            minParallelism = vertexMaxParallelism;
+        }
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < maxParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
+                            + "Use {} as the upper bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    maxParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            maxParallelism = vertexMaxParallelism;
+        }
+        checkState(maxParallelism >= minParallelism);
+
+        int parallelism =
+                vertexInitialParallelism > 0
+                        ? vertexInitialParallelism
+                        : decideParallelism(
+                                jobVertexId, consumedResults, minParallelism, maxParallelism);
+
+        Map<Boolean, List<BlockingInputInfo>> inputsGroupByInterCorrelation =
+                consumedResults.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        BlockingInputInfo::existInterInputsKeyCorrelation));
+
+        // For AllToAll like inputs, we derive parallelism as a whole, while for Pointwise inputs,
+        // we need to derive parallelism separately for each input.
+        //
+        // In the following cases, we need to reset the min and max parallelism to ensure that the
+        // decided parallelism for all inputs is consistent:
+        // 1. The vertex has a specified parallelism
+        // 2. There are edges that don't need to follow the inter-group constraint
+        if (vertexInitialParallelism > 0 || inputsGroupByInterCorrelation.containsKey(false)) {
+            minParallelism = parallelism;
+            maxParallelism = parallelism;
+        }
+
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfoMap = new HashMap<>();
+
+        if (inputsGroupByInterCorrelation.containsKey(true)) {
+            vertexInputInfoMap.putAll(
+                    allToAllVertexInputInfoComputer.compute(
+                            jobVertexId,
+                            inputsGroupByInterCorrelation.get(true),
+                            parallelism,
+                            minParallelism,
+                            maxParallelism));
+        }
+
+        if (inputsGroupByInterCorrelation.containsKey(false)) {
+            List<BlockingInputInfo> inputsWithoutInterCorrelation =
+                    inputsGroupByInterCorrelation.get(false);
+            for (BlockingInputInfo input : inputsWithoutInterCorrelation) {
+                if (input.existIntraInputKeyCorrelation()) {
+                    vertexInputInfoMap.putAll(
+                            allToAllVertexInputInfoComputer.compute(
+                                    jobVertexId,
+                                    Collections.singletonList(input),
+                                    parallelism,
+                                    minParallelism,
+                                    maxParallelism));
+                } else {
+                    vertexInputInfoMap.put(
+                            input.getResultId(),
+                            pointwiseVertexInputInfoComputer.compute(input, parallelism));
+                }
             }
         }
+        int finalParallelism = checkAndGetParallelism(vertexInputInfoMap.values());
+
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfoMapInOrder =
+                new LinkedHashMap<>();
+
+        for (BlockingInputInfo inputInfo : consumedResults) {
+            vertexInputInfoMapInOrder.put(
+                    inputInfo.getResultId(), vertexInputInfoMap.get(inputInfo.getResultId()));
+        }
+
+        return new ParallelismAndInputInfos(finalParallelism, vertexInputInfoMapInOrder);

Review Comment:
   Not sure why it's needed to make it ordered? Currently in `ParallelismAndInputInfos`, it is not required that the `jobVertexInputInfos` be in order.
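
For context on the ordering question, a minimal demonstration of the difference the `LinkedHashMap` makes; whether any consumer of `ParallelismAndInputInfos` actually relies on iteration order is exactly what this comment asks.

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // HashMap iteration order is unspecified; LinkedHashMap keeps insertion order.
    public class OrderDemo {
        public static void main(String[] args) {
            Map<String, Integer> hash = new HashMap<>();
            Map<String, Integer> linked = new LinkedHashMap<>();
            for (String key : List.of("result-c", "result-a", "result-b")) {
                hash.put(key, 0);
                linked.put(key, 0);
            }
            System.out.println(hash.keySet());   // order not guaranteed
            System.out.println(linked.keySet()); // [result-c, result-a, result-b]
        }
    }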



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.util;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BisectionSearchUtils;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils class for VertexParallelismAndInputInfosDecider. */
+public class VertexParallelismAndInputInfosDeciderUtils {
+    /**
+     * Adjust the parallelism to the closest legal parallelism and return the computed subpartition
+     * ranges.
+     *
+     * @param currentDataVolumeLimit current data volume limit
+     * @param currentParallelism current parallelism
+     * @param minParallelism the min parallelism
+     * @param maxParallelism the max parallelism
+     * @param minLimit the minimum data volume limit
+     * @param maxLimit the maximum data volume limit
+     * @param parallelismComputer a function to compute the parallelism according to the data volume
+     *     limit
+     * @param subpartitionRangesComputer a function to compute the subpartition ranges according to
+     *     the data volume limit
+     * @return the computed subpartition ranges or {@link Optional#empty()} if we can't find any
+     *     legal parallelism
+     */
+    public static Optional<List<IndexRange>> adjustToClosestLegalParallelism(
+            long currentDataVolumeLimit,
+            int currentParallelism,
+            int minParallelism,
+            int maxParallelism,
+            long minLimit,
+            long maxLimit,
+            Function<Long, Integer> parallelismComputer,
+            Function<Long, List<IndexRange>> subpartitionRangesComputer) {
+        long adjustedDataVolumeLimit = currentDataVolumeLimit;
+        if (currentParallelism < minParallelism) {
+            // Current parallelism is smaller than the user-specified lower-limit of parallelism,
+            // we need to adjust it to the closest/minimum possible legal parallelism. That is, we
+            // need to find the maximum legal dataVolumeLimit.
+            adjustedDataVolumeLimit =
+                    BisectionSearchUtils.findMaxLegalValue(
+                            value -> parallelismComputer.apply(value) >= minParallelism,
+                            minLimit,
+                            currentDataVolumeLimit);
+
+            // When we find the minimum possible legal parallelism, the dataVolumeLimit that can
+            // lead to this parallelism may be a range, and we need to find the minimum value of
+            // this range to make the data distribution as even as possible (the smaller the
+            // dataVolumeLimit, the more even the distribution)
+            final long minPossibleLegalParallelism =
+                    parallelismComputer.apply(adjustedDataVolumeLimit);
+            adjustedDataVolumeLimit =
+                    BisectionSearchUtils.findMinLegalValue(
+                            value ->
+                                    parallelismComputer.apply(value) == minPossibleLegalParallelism,
+                            minLimit,
+                            adjustedDataVolumeLimit);
+
+        } else if (currentParallelism > maxParallelism) {
+            // Current parallelism is larger than the user-specified upper-limit of parallelism,
+            // we need to adjust it to the closest/maximum possible legal parallelism. That is, we
+            // need to find the minimum legal dataVolumeLimit.
+            adjustedDataVolumeLimit =
+                    BisectionSearchUtils.findMinLegalValue(
+                            value -> parallelismComputer.apply(value) <= maxParallelism,
+                            currentDataVolumeLimit,
+                            maxLimit);
+        }
+
+        int adjustedParallelism = parallelismComputer.apply(adjustedDataVolumeLimit);
+        if (isLegalParallelism(adjustedParallelism, minParallelism, maxParallelism)) {
+            return Optional.of(subpartitionRangesComputer.apply(adjustedDataVolumeLimit));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Computes the Cartesian product of a list of lists.
+     *
+     * <p>The Cartesian product is a set of all possible combinations formed by picking one element
+     * from each list. For example, given input lists [[1, 2], [3, 4]], the result will be [[1, 3],
+     * [1, 4], [2, 3], [2, 4]].
+     *
+     * <p>Note: If any of the input lists is empty, the result will be an empty list; an empty
+     * input (no lists at all) yields a list containing a single empty combination.
+     *
+     * @param <T> the type of elements in the lists
+     * @param lists a list of lists for which the Cartesian product is to be computed
+     * @return a list of lists representing the Cartesian product, where each inner list is a
+     *     combination
+     */
+    public static <T> List<List<T>> cartesianProduct(List<List<T>> lists) {
+        List<List<T>> resultLists = new ArrayList<>();
+        if (lists.isEmpty()) {
+            resultLists.add(new ArrayList<>());
+            return resultLists;
+        } else {
+            List<T> firstList = lists.get(0);
+            List<List<T>> remainingLists = cartesianProduct(lists.subList(1, lists.size()));
+            for (T condition : firstList) {
+                for (List<T> remainingList : remainingLists) {
+                    ArrayList<T> resultList = new ArrayList<>();
+                    resultList.add(condition);
+                    resultList.addAll(remainingList);
+                    resultLists.add(resultList);
+                }
+            }
+        }
+        return resultLists;
+    }
+
+    /**
+     * Calculates the median of a given array of long integers. If the calculated median is less
+     * than 1, it returns 1 instead.
+     *
+     * @param nums an array of long integers for which to calculate the median.
+     * @return the median value, which will be at least 1.
+     */
+    public static long median(long[] nums) {
+        int len = nums.length;
+        long[] sortedNums = LongStream.of(nums).sorted().toArray();
+        if (len % 2 == 0) {
+            return Math.max((sortedNums[len / 2] + sortedNums[len / 2 - 1]) / 2, 1L);
+        } else {
+            return Math.max(sortedNums[len / 2], 1L);
+        }
+    }
+
+    /**
+     * Computes the skew threshold based on the given media size and skewed factor.
+     *
+     * <p>The skew threshold is calculated as the product of the media size and the skewed factor.
+     * To ensure that the computed threshold does not fall below a specified default value, the
+     * method uses {@link Math#max} to return the larger of the calculated threshold and the default
+     * threshold.
+     *
+     * @param mediaSize the size of the media
+     * @param skewedFactor a factor indicating the degree of skewness
+     * @param defaultSkewedThreshold the default threshold to be used if the calculated threshold is
+     *     less than this value
+     * @return the computed skew threshold, which is guaranteed to be at least the default skewed
+     *     threshold.
+     */
+    public static long computeSkewThreshold(
+            long mediaSize, double skewedFactor, long defaultSkewedThreshold) {

Review Comment:
   media -> median
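
Since the method body is not quoted above, here is a hedged sketch of the documented formula, already using the reviewer's suggested spelling (`median`); the actual implementation in the PR may differ.

    // Assumed from the javadoc: threshold = max(median * factor, default).
    public class SkewThresholdDemo {
        static long computeSkewThreshold(
                long medianSize, double skewedFactor, long defaultSkewedThreshold) {
            return Math.max((long) (medianSize * skewedFactor), defaultSkewedThreshold);
        }

        public static void main(String[] args) {
            // median subpartition size 100 bytes, factor 4.0, default threshold 256
            System.out.println(computeSkewThreshold(100L, 4.0, 256L)); // 400
            // with a small median, the default threshold wins
            System.out.println(computeSkewThreshold(10L, 4.0, 256L));  // 256
        }
    }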


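`BisectionSearchUtils` is not quoted in this review either; the following is an assumed sketch of what `findMaxLegalValue` plausibly does in `adjustToClosestLegalParallelism` above: binary-search the largest value in a range that satisfies a monotone predicate (`findMinLegalValue` would be symmetric).

    import java.util.function.LongPredicate;

    // Assumed sketch, not the real Flink utility. Requires the predicate to be
    // monotone (legal for small values, illegal beyond some point); falls back
    // to min if no larger value is legal.
    public class BisectionSketch {
        static long findMaxLegalValue(LongPredicate isLegal, long min, long max) {
            long result = min;
            while (min <= max) {
                long mid = min + (max - min) / 2;
                if (isLegal.test(mid)) {
                    result = mid; // mid is legal, try larger values
                    min = mid + 1;
                } else {
                    max = mid - 1; // mid is illegal, search smaller values
                }
            }
            return result;
        }

        public static void main(String[] args) {
            // Largest dataVolumeLimit whose derived parallelism stays >= 3,
            // with a toy monotone parallelism function totalBytes / limit.
            long totalBytes = 1000;
            System.out.println(findMaxLegalValue(limit -> totalBytes / limit >= 3, 1, 1000)); // 333
        }
    }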

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -126,52 +136,101 @@ public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
                             ? vertexInitialParallelism
                             : computeSourceParallelismUpperBound(jobVertexId, vertexMaxParallelism);
             return new ParallelismAndInputInfos(parallelism, Collections.emptyMap());
-        } else {
-            int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
-            int maxParallelism = globalMaxParallelism;
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < minParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
-                                + "Use {} as the lower bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        minParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                minParallelism = vertexMaxParallelism;
-            }
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && vertexMaxParallelism < maxParallelism) {
-                LOG.info(
-                        "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
-                                + "Use {} as the upper bound to decide parallelism of job vertex {}.",
-                        vertexMaxParallelism,
-                        maxParallelism,
-                        vertexMaxParallelism,
-                        jobVertexId);
-                maxParallelism = vertexMaxParallelism;
-            }
-            checkState(maxParallelism >= minParallelism);
-
-            if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-                    && areAllInputsAllToAll(consumedResults)
-                    && !areAllInputsBroadcast(consumedResults)) {
-                return decideParallelismAndEvenlyDistributeData(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
-            } else {
-                return decideParallelismAndEvenlyDistributeSubpartitions(
-                        jobVertexId,
-                        consumedResults,
-                        vertexInitialParallelism,
-                        minParallelism,
-                        maxParallelism);
+        }
+
+        int minParallelism = Math.max(globalMinParallelism, vertexMinParallelism);
+        int maxParallelism = globalMaxParallelism;
+
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < minParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the minimum parallelism {}. "
+                            + "Use {} as the lower bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    minParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            minParallelism = vertexMaxParallelism;
+        }
+        if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                && vertexMaxParallelism < maxParallelism) {
+            LOG.info(
+                    "The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. "
+                            + "Use {} as the upper bound to decide parallelism of job vertex {}.",
+                    vertexMaxParallelism,
+                    maxParallelism,
+                    vertexMaxParallelism,
+                    jobVertexId);
+            maxParallelism = vertexMaxParallelism;
+        }
+        checkState(maxParallelism >= minParallelism);
+
+        int parallelism =
+                vertexInitialParallelism > 0
+                        ? vertexInitialParallelism
+                        : decideParallelism(
+                                jobVertexId, consumedResults, minParallelism, maxParallelism);
+
+        Map<Boolean, List<BlockingInputInfo>> inputsGroupByInterCorrelation =
+                consumedResults.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        BlockingInputInfo::existInterInputsKeyCorrelation));
+
+        // For AllToAll like inputs, we derive parallelism as a whole, while for Pointwise inputs,
+        // we need to derive parallelism separately for each input.
+        //
+        // In the following cases, we need to reset the min and max parallelism to ensure that the
+        // decided parallelism for all inputs is consistent:
+        // 1. The vertex has a specified parallelism
+        // 2. There are edges that don't need to follow the inter-group constraint
+        if (vertexInitialParallelism > 0 || inputsGroupByInterCorrelation.containsKey(false)) {
+            minParallelism = parallelism;
+            maxParallelism = parallelism;
+        }
+
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfoMap = new HashMap<>();
+
+        if (inputsGroupByInterCorrelation.containsKey(true)) {
+            vertexInputInfoMap.putAll(
+                    allToAllVertexInputInfoComputer.compute(
+                            jobVertexId,
+                            inputsGroupByInterCorrelation.get(true),
+                            parallelism,
+                            minParallelism,
+                            maxParallelism));
+        }
+
+        if (inputsGroupByInterCorrelation.containsKey(false)) {
+            List<BlockingInputInfo> inputsWithoutInterCorrelation =
+                    inputsGroupByInterCorrelation.get(false);
+            for (BlockingInputInfo input : inputsWithoutInterCorrelation) {
+                if (input.existIntraInputKeyCorrelation()) {
+                    vertexInputInfoMap.putAll(
+                            allToAllVertexInputInfoComputer.compute(
+                                    jobVertexId,
+                                    Collections.singletonList(input),
+                                    parallelism,
+                                    minParallelism,
+                                    maxParallelism));
+                } else {
+                    vertexInputInfoMap.put(
+                            input.getResultId(),
+                            pointwiseVertexInputInfoComputer.compute(input, parallelism));

Review Comment:
   Why does a missing `existIntraInputKeyCorrelation` lead to `pointwiseVertexInputInfoComputer#compute()`?
   Isn't `existIntraInputKeyCorrelation` false for `rebalance` cases (all-to-all)?
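
To make the question concrete, an editorial condensation of the dispatch in the quoted diff (not Flink code):

    // Condensed restatement of the quoted branching.
    public class ComputerDispatchSketch {
        static String chooseComputer(
                boolean interInputsKeyCorrelation, boolean intraInputKeyCorrelation) {
            if (interInputsKeyCorrelation) {
                return "allToAllVertexInputInfoComputer (all correlated inputs together)";
            } else if (intraInputKeyCorrelation) {
                return "allToAllVertexInputInfoComputer (single input)";
            }
            // neither inter- nor intra-input key correlation
            return "pointwiseVertexInputInfoComputer";
        }

        public static void main(String[] args) {
            System.out.println(chooseComputer(true, true));   // grouped all-to-all
            System.out.println(chooseComputer(false, true));  // all-to-all, single input
            System.out.println(chooseComputer(false, false)); // pointwise - the case questioned above
        }
    }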
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

