zhuzhurk commented on code in PR #25552: URL: https://github.com/apache/flink/pull/25552#discussion_r1898346551
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingInputInfo.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.executiongraph.IndexRange; +import org.apache.flink.runtime.executiongraph.ResultPartitionBytes; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +public class BlockingInputInfo implements BlockingResultInfo { Review Comment: Java docs are required. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/AggregatedBlockingInputInfo.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptivebatch.util; + +import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.checkAndGetIntraCorrelation; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.computeSkewThreshold; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.computeTargetSize; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.getMaxNumPartitions; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.hasSameNumPartitions; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.median; + +/** Helper class that provides information of aggregated input infos. */ +public class AggregatedBlockingInputInfo { + private final int maxPartitionNum; + + private final long skewedThreshold; + private final long targetSize; + + private final boolean hasSamePartitionNums; + private final boolean existIntraInputCorrelation; + + private final Map<Integer, long[]> subpartitionBytesByPartition; + private final long[] aggregatedSubpartitionBytes; + + public AggregatedBlockingInputInfo( + long targetSize, + long skewedThreshold, + int maxPartitionNum, + boolean hasSamePartitionNums, + boolean existIntraInputCorrelation, + Map<Integer, long[]> subpartitionBytesByPartition, + long[] aggregatedSubpartitionBytes) { + this.maxPartitionNum = maxPartitionNum; + this.skewedThreshold = skewedThreshold; + this.targetSize = targetSize; + this.hasSamePartitionNums = hasSamePartitionNums; + this.existIntraInputCorrelation = existIntraInputCorrelation; + this.subpartitionBytesByPartition = subpartitionBytesByPartition; Review Comment: In public methods, especially constructors, it's better to add `checkNotNull` for arguments so that one can later be sure that the corresponding (usually final/immutable) field is non-null (see the sketch below). ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingInputInfo.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.executiongraph.IndexRange; +import org.apache.flink.runtime.executiongraph.ResultPartitionBytes; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +public class BlockingInputInfo implements BlockingResultInfo { + private final BlockingResultInfo blockingResultInfo; + private final int inputTypeNumber; + private final boolean existInterInputsKeyCorrelation; + private final boolean existIntraInputKeyCorrelation; + + public BlockingInputInfo( + BlockingResultInfo blockingResultInfo, + int inputTypeNumber, + boolean existInterInputsKeyCorrelation, + boolean existIntraInputKeyCorrelation) { + this.blockingResultInfo = blockingResultInfo; Review Comment: checkNotNull ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java: ########## @@ -835,21 +836,30 @@ private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parall } /** Get information of consumable results. */ - private Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo( + private Optional<List<BlockingInputInfo>> tryGetConsumedResultsInfoView( final ExecutionJobVertex jobVertex) { - List<BlockingResultInfo> consumableResultInfo = new ArrayList<>(); + List<BlockingInputInfo> consumableResultInfo = new ArrayList<>(); DefaultLogicalVertex logicalVertex = logicalTopology.getVertex(jobVertex.getJobVertexId()); - Iterable<DefaultLogicalResult> consumedResults = logicalVertex.getConsumedResults(); + Iterator<DefaultLogicalResult> consumedResults = + logicalVertex.getConsumedResults().iterator(); + Iterator<JobEdge> jobEdges = jobVertex.getJobVertex().getInputs().iterator(); - for (DefaultLogicalResult consumedResult : consumedResults) { + while (consumedResults.hasNext() && jobEdges.hasNext()) { Review Comment: IIUC, it is expected that `consumedResults.hasNext()` and `jobEdges.hasNext()` return the same value? If so, I would keep the original loop `for (DefaultLogicalResult consumedResult : consumedResults)` and only add the jobEdges iterator. Besides, it should check that the retrieved edge is the one corresponding to that result. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismAndInputInfosDeciderUtilsTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Test for {@link VertexParallelismAndInputInfosDeciderUtils}. */ +class VertexParallelismAndInputInfosDeciderUtilsTest { + @Test + void testCartesianProduct() { + // empty input + List<List<Integer>> inputEmpty = List.of(); + List<List<Integer>> expectedEmpty = List.of(List.of()); + List<List<Integer>> resultEmpty = + VertexParallelismAndInputInfosDeciderUtils.cartesianProduct(inputEmpty); + assertEquals(expectedEmpty, resultEmpty); + + // two lists + List<List<Integer>> inputTwo = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)); + List<List<Integer>> expectedTwo = + Arrays.asList( + Arrays.asList(1, 3), + Arrays.asList(1, 4), + Arrays.asList(2, 3), + Arrays.asList(2, 4)); + List<List<Integer>> resultTwo = + VertexParallelismAndInputInfosDeciderUtils.cartesianProduct(inputTwo); + assertEquals(expectedTwo, resultTwo); + + // three lists + List<List<String>> inputThree = + Arrays.asList( + Arrays.asList("A", "B"), Arrays.asList("1", "2"), Arrays.asList("X", "Y")); + List<List<String>> expectedThree = + Arrays.asList( + Arrays.asList("A", "1", "X"), + Arrays.asList("A", "1", "Y"), + Arrays.asList("A", "2", "X"), + Arrays.asList("A", "2", "Y"), + Arrays.asList("B", "1", "X"), + Arrays.asList("B", "1", "Y"), + Arrays.asList("B", "2", "X"), + Arrays.asList("B", "2", "Y")); + List<List<String>> resultThree = + VertexParallelismAndInputInfosDeciderUtils.cartesianProduct(inputThree); + assertEquals(expectedThree, resultThree); + } + + @Test + void testMedian() { + long[] numsOdd = {5, 1, 3}; + long resultOdd = VertexParallelismAndInputInfosDeciderUtils.median(numsOdd); Review Comment: Maybe statically import the method to get rid of the redundant `VertexParallelismAndInputInfosDeciderUtils` qualifier? ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingInputInfo.java: ########## @@ -45,10 +45,6 @@ public BlockingInputInfo( this.existIntraInputKeyCorrelation = existIntraInputKeyCorrelation; } - public BlockingResultInfo getConsumedResultInfo() { - return blockingResultInfo; - } Review Comment: This change should be in the commit in which `BlockingInputInfo.java` was introduced. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/AggregatedBlockingInputInfo.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptivebatch.util; + +import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.checkAndGetIntraCorrelation; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.computeSkewThreshold; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.computeTargetSize; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.getMaxNumPartitions; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.hasSameNumPartitions; +import static org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils.median; + +/** Helper class that provides information of aggregated input infos. */ +public class AggregatedBlockingInputInfo { + private final int maxPartitionNum; + + private final long skewedThreshold; + private final long targetSize; + + private final boolean hasSamePartitionNums; + private final boolean existIntraInputCorrelation; + + private final Map<Integer, long[]> subpartitionBytesByPartition; + private final long[] aggregatedSubpartitionBytes; Review Comment: Comments are needed for all these fields. I think their meaning is not straightforward for other developers. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java: ########## @@ -76,6 +76,10 @@ public class StreamEdge implements Serializable { private final IntermediateDataSetID intermediateDatasetIdToProduce; + private boolean existInterInputsKeyCorrelation; + + private boolean existIntraInputKeyCorrelation; Review Comment: Comments are needed to explain these two attributes: what they mean and in which cases they will be set to true/false. It's better to explain them in `JobEdge` as well. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingInputInfo.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.executiongraph.IndexRange; +import org.apache.flink.runtime.executiongraph.ResultPartitionBytes; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +public class BlockingInputInfo implements BlockingResultInfo { + private final BlockingResultInfo blockingResultInfo; + private final int inputTypeNumber; + private final boolean existInterInputsKeyCorrelation; + private final boolean existIntraInputKeyCorrelation; + + public BlockingInputInfo( + BlockingResultInfo blockingResultInfo, + int inputTypeNumber, + boolean existInterInputsKeyCorrelation, + boolean existIntraInputKeyCorrelation) { + this.blockingResultInfo = blockingResultInfo; + this.inputTypeNumber = inputTypeNumber; + this.existInterInputsKeyCorrelation = existInterInputsKeyCorrelation; + this.existIntraInputKeyCorrelation = existIntraInputKeyCorrelation; + } + + public BlockingResultInfo getConsumedResultInfo() { + return blockingResultInfo; + } + + public int getInputTypeNumber() { + return inputTypeNumber; + } + + public boolean existIntraInputKeyCorrelation() { + return existIntraInputKeyCorrelation; + } + + public boolean existInterInputsKeyCorrelation() { + return existInterInputsKeyCorrelation; + } + + public List<Long> getAggregatedSubpartitionBytes() { + if (blockingResultInfo instanceof AllToAllBlockingResultInfo) { + return ((AllToAllBlockingResultInfo) blockingResultInfo) + .getAggregatedSubpartitionBytes(); + } + return Collections.emptyList(); + } + + @Override + public boolean isBroadcast() { + return blockingResultInfo.isBroadcast(); + } + + @Override + public boolean isPointwise() { + return blockingResultInfo.isPointwise(); + } + + @Override + public int getNumPartitions() { + return blockingResultInfo.getNumPartitions(); + } + + @Override + public int getNumSubpartitions(int partitionIndex) { + return blockingResultInfo.getNumSubpartitions(partitionIndex); + } + + @Override + public long getNumBytesProduced() { + return blockingResultInfo.getNumBytesProduced(); + } + + @Override + public long getNumBytesProduced( + IndexRange partitionIndexRange, IndexRange subpartitionIndexRange) { + long inputBytes = 0; Review Comment: Why can't it use that of the wrapped `blockingResultInfo`? (See the sketch below.) ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java: ########## @@ -293,21 +295,20 @@ void testHavePointwiseEdges() { parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo1.getResultId()), Arrays.asList( new IndexRange(0, 1), - new IndexRange(2, 4), - new IndexRange(5, 6), - new IndexRange(7, 9))); - checkPointwiseJobVertexInputInfo( + new IndexRange(2, 5), Review Comment: Why is the result affected? Ideally, the results of existing cases should remain the same. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java: ########## @@ -28,9 +28,7 @@ import org.apache.flink.runtime.executiongraph.ResultPartitionBytes; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; - import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables; - Review Comment: I guess this change is unintended. It will violate the CI code-style check. 
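To make the earlier `checkNotNull` suggestions on the constructors concrete, here is a rough, untested sketch for `BlockingInputInfo` (assuming the usual static import of `org.apache.flink.util.Preconditions.checkNotNull`; the same applies to the `Map` and `long[]` arguments of `AggregatedBlockingInputInfo`):

```java
import static org.apache.flink.util.Preconditions.checkNotNull;

public BlockingInputInfo(
        BlockingResultInfo blockingResultInfo,
        int inputTypeNumber,
        boolean existInterInputsKeyCorrelation,
        boolean existIntraInputKeyCorrelation) {
    // Fail fast at construction time so the final field is guaranteed non-null
    // instead of surfacing as a NullPointerException much later.
    this.blockingResultInfo = checkNotNull(blockingResultInfo);
    this.inputTypeNumber = inputTypeNumber;
    this.existInterInputsKeyCorrelation = existInterInputsKeyCorrelation;
    this.existIntraInputKeyCorrelation = existIntraInputKeyCorrelation;
}
```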
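And for the `getNumBytesProduced` question above: what I would expect is plain delegation, consistent with the other overrides in this class. A sketch, assuming the wrapped `BlockingResultInfo` already implements the two-argument overload (the `@Override` in this class suggests it does):

```java
@Override
public long getNumBytesProduced(
        IndexRange partitionIndexRange, IndexRange subpartitionIndexRange) {
    // Delegate to the wrapped result info, like the other overrides in this class,
    // instead of re-implementing the byte aggregation here.
    return blockingResultInfo.getNumBytesProduced(partitionIndexRange, subpartitionIndexRange);
}
```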
########## flink-core/src/main/java/org/apache/flink/configuration/BatchExecutionOptionsInternal.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.Internal; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** Internal configuration options for the batch job execution. */ +@Internal +public class BatchExecutionOptionsInternal { Review Comment: Is it necessary to add this class to the `flink-core` module? Would it suffice to add it to the `flink-runtime` module? ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexInputInfoComputerTestUtil.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class VertexInputInfoComputerTestUtil { Review Comment: These UTs should be part of commit `Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingInputInfo.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.executiongraph.IndexRange; +import org.apache.flink.runtime.executiongraph.ResultPartitionBytes; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +public class BlockingInputInfo implements BlockingResultInfo { + private final BlockingResultInfo blockingResultInfo; + private final int inputTypeNumber; + private final boolean existInterInputsKeyCorrelation; + private final boolean existIntraInputKeyCorrelation; + + public BlockingInputInfo( + BlockingResultInfo blockingResultInfo, + int inputTypeNumber, + boolean existInterInputsKeyCorrelation, + boolean existIntraInputKeyCorrelation) { + this.blockingResultInfo = blockingResultInfo; + this.inputTypeNumber = inputTypeNumber; + this.existInterInputsKeyCorrelation = existInterInputsKeyCorrelation; + this.existIntraInputKeyCorrelation = existIntraInputKeyCorrelation; + } + + public BlockingResultInfo getConsumedResultInfo() { + return blockingResultInfo; + } + + public int getInputTypeNumber() { + return inputTypeNumber; + } + + public boolean existIntraInputKeyCorrelation() { + return existIntraInputKeyCorrelation; + } + + public boolean existInterInputsKeyCorrelation() { + return existInterInputsKeyCorrelation; + } + + public List<Long> getAggregatedSubpartitionBytes() { + if (blockingResultInfo instanceof AllToAllBlockingResultInfo) { + return ((AllToAllBlockingResultInfo) blockingResultInfo) + .getAggregatedSubpartitionBytes(); + } + return Collections.emptyList(); Review Comment: If this method is only expected to be invoked on `AllToAllBlockingResultInfo`, an exception should be thrown in other cases.
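A rough sketch of what I mean, reusing the `checkState` that is already statically imported in this file:

```java
public List<Long> getAggregatedSubpartitionBytes() {
    // Aggregated subpartition bytes only exist for all-to-all results; fail loudly
    // for other result types instead of silently returning an empty list.
    checkState(
            blockingResultInfo instanceof AllToAllBlockingResultInfo,
            "getAggregatedSubpartitionBytes is only supported for all-to-all results.");
    return ((AllToAllBlockingResultInfo) blockingResultInfo).getAggregatedSubpartitionBytes();
}
```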