zhuzhurk commented on code in PR #21162: URL: https://github.com/apache/flink/pull/21162#discussion_r1049598360
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SubpartitionIndexRange.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +/** This class represents the range of subpartition index. The range is inclusive. */ +public class SubpartitionIndexRange extends IndexRange { + + public SubpartitionIndexRange(int startIndex, int endIndex) { Review Comment: Can we just use IndexRange without introducing PartitionIndexRange/SubpartitionIndexRange? ########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInputInfo.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class describe the inputs(partitions and subpartitions that belong to the same intermediate + * result) information of a subtask. + */ +public class TaskInputInfo { Review Comment: Compared to TaskInputInfo & VertexInputInfo, ExecutionVertexInputInfo & JobVertexInputInfo may be easier to understand. ########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java: ########## @@ -120,4 +120,6 @@ void notifySchedulerNgAboutInternalTaskFailure( /** Get the shuffle descriptors of the cluster partitions ordered by partition number. */ List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( IntermediateDataSetID intermediateResultPartition); + + VertexInputInfo getVertexInputInfo(JobVertexID jobVertexId, IntermediateDataSetID resultId); Review Comment: A Javadoc is needed.
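For instance, a minimal sketch of the Javadoc (the wording is only a suggestion, not taken from the PR):

```java
/**
 * Get the {@link VertexInputInfo} describing how the subtasks of the given job vertex
 * consume the partitions/subpartitions of the given intermediate result.
 *
 * @param jobVertexId the id of the consumer job vertex
 * @param resultId the id of the consumed intermediate result
 * @return the input info of the (job vertex, intermediate result) pair
 */
VertexInputInfo getVertexInputInfo(JobVertexID jobVertexId, IntermediateDataSetID resultId);
```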
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoStore.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A store contains all the {@link VertexInputInfo}s. */ +public class VertexInputInfoStore { + + private final Map<JobVertexID, Map<IntermediateDataSetID, VertexInputInfo>> vertexInputInfos = + new HashMap<>(); + + /** + * Put a {@link VertexInputInfo}. + * + * @param jobVertexId the job vertex id + * @param resultId the intermediate result id + * @param info the {@link VertexInputInfo} to put + */ + public void put(JobVertexID jobVertexId, IntermediateDataSetID resultId, VertexInputInfo info) { + checkNotNull(jobVertexId); + checkNotNull(resultId); + checkNotNull(info); + + vertexInputInfos.compute( + jobVertexId, + (ignored, inputInfos) -> { + if (inputInfos == null) { + inputInfos = new HashMap<>(); + } + + // Note that a job vertex can have 2 inputs with the same IntermediateDataSetID Review Comment: > a job vertex can have 2 inputs with the same IntermediateDataSetID What's the point of adding the comment here? Should we add it to the get method and also explain that, if a vertex has multiple job edges connecting to the same result, their distribution pattern must be the same and therefore the VertexInputInfo will be the same?
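For example, the explanation could live on the get method, along these lines (a sketch only; the actual `get` implementation is not shown in this hunk):

```java
/**
 * Get a {@link VertexInputInfo}.
 *
 * <p>Note that a job vertex can be connected to the same intermediate result by multiple
 * job edges. In that case the distribution pattern of these edges must be the same, so
 * the edges share one {@link VertexInputInfo} and the lookup can be keyed by the
 * (jobVertexId, resultId) pair alone.
 *
 * @param jobVertexId the job vertex id
 * @param resultId the intermediate result id
 */
public VertexInputInfo get(JobVertexID jobVertexId, IntermediateDataSetID resultId) {
    return checkNotNull(vertexInputInfos.get(jobVertexId)).get(resultId);
}
```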
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultInfo.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +public interface IntermediateResultInfo { + /** + * Get the intermediate result id. + * + * @return the intermediate result id + */ + IntermediateDataSetID getResultId(); + + /** + * Whether it is a broadcast result. + * + * @return whether it is a broadcast result + */ + boolean isBroadcast(); + + /** + * Whether it is a pointwise result. + * + * @return whether it is a pointwise result + */ + boolean isPointwise(); + + /** + * Get number of partitions for this result. + * + * @return the number of partitions in this result + */ + int getNumPartitions(); + + /** + * Get number of subpartitions for the given partition. + * + * @param partitionIndex the partition index + * @return the number of partitions of the partition Review Comment: partitions of -> subpartitions of ########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobEdge; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** Util to compute {@link VertexInputInfo}s for execution job vertex.
*/ +public class VertexInputInfoComputationUtils { + + public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos( + ExecutionJobVertex ejv, + Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) { + checkState(ejv.isParallelismDecided()); + final List<IntermediateResultInfo> intermediateResultInfos = + ejv.getJobVertex().getInputs().stream() + .map(JobEdge::getSourceId) + .map(intermediateResultRetriever) + .map(IntermediateResultWrapper::new) + .collect(Collectors.toList()); + return computeVertexInputInfos( + ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic()); + } + + public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos( + int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) { + + checkArgument(parallelism > 0); + final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>(); + + for (IntermediateResultInfo input : inputs) { + int sourceParallelism = input.getNumPartitions(); + + if (input.isPointwise()) { + vertexInputInfos.putIfAbsent( + input.getResultId(), + computeVertexInputInfoForPointwise( + sourceParallelism, + parallelism, + input::getNumSubpartitions, + isDynamicGraph)); + } else { + vertexInputInfos.putIfAbsent( + input.getResultId(), + computeVertexInputInfoForAllToAll( + sourceParallelism, + parallelism, + input::getNumSubpartitions, + isDynamicGraph, + input.isBroadcast())); + } + } + + return vertexInputInfos; + } + + /** + * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This + * computation algorithm will evenly distribute subpartitions to downstream subtasks according + * to the number of subpartitions. Different downstream subtasks consume roughly the same number + * of subpartitions. 
+ * + * @param sourceCount the parallelism of upstream + * @param targetCount the parallelism of downstream + * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions + * @param isDynamicGraph whether is dynamic graph + * @return the computed {@link VertexInputInfo} + */ + @VisibleForTesting + static VertexInputInfo computeVertexInputInfoForPointwise( + int sourceCount, + int targetCount, + Function<Integer, Integer> numOfSubpartitionsRetriever, + boolean isDynamicGraph) { + + final List<TaskInputInfo> taskInputInfos = new ArrayList<>(); + + if (sourceCount >= targetCount) { + for (int index = 0; index < targetCount; index++) { + + int start = index * sourceCount / targetCount; + int end = (index + 1) * sourceCount / targetCount; + + PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1); + SubpartitionIndexRange subpartitionRange = + computeConsumedSubpartitionRange( + index, + 1, + numOfSubpartitionsRetriever.apply(start), + isDynamicGraph, + false); + taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange)); + } + } else { + for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) { + + int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount; + int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount; + int numConsumers = end - start; + + for (int i = start; i < end; i++) { + PartitionIndexRange partitionRange = + new PartitionIndexRange(partitionNum, partitionNum); + SubpartitionIndexRange subpartitionRange = + computeConsumedSubpartitionRange( + i, + numConsumers, + numOfSubpartitionsRetriever.apply(partitionNum), + isDynamicGraph, + false); + taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange)); + } + } + } + return new VertexInputInfo(taskInputInfos); + } + + /** + * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This + * computation algorithm will evenly distribute subpartitions to downstream subtasks according + * to the number of subpartitions. Different downstream subtasks consume roughly the same number + * of subpartitions. + * + * @param sourceCount the parallelism of upstream + * @param targetCount the parallelism of downstream + * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions + * @param isDynamicGraph whether is dynamic graph + * @param isBroadcast whether the edge is broadcast + * @return the computed {@link VertexInputInfo} + */ + @VisibleForTesting + static VertexInputInfo computeVertexInputInfoForAllToAll( + int sourceCount, + int targetCount, + Function<Integer, Integer> numOfSubpartitionsRetriever, + boolean isDynamicGraph, + boolean isBroadcast) { + final List<TaskInputInfo> taskInputInfos = new ArrayList<>(); + for (int i = 0; i < targetCount; ++i) { + PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1); + SubpartitionIndexRange subpartitionRange = + computeConsumedSubpartitionRange( + i, + targetCount, + numOfSubpartitionsRetriever.apply(0), + isDynamicGraph, + isBroadcast); + taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange)); + } + return new VertexInputInfo(taskInputInfos); + } + + /** + * Compute the consumed subpartition range for a subtask. This computation algorithm will evenly + * distribute subpartitions to downstream subtasks according to the number of subpartitions. + * Different downstream subtasks consume roughly the same number of subpartitions. 
+ * + * @param consumerSubtaskIndex the subtask index + * @param numConsumers the total number of consumers + * @param numSubpartitions the total number of subpartitions + * @param isDynamicGraph whether is dynamic graph + * @param isBroadcast whether the edge is broadcast + * @return the computed subpartition range + */ + @VisibleForTesting + static SubpartitionIndexRange computeConsumedSubpartitionRange( + int consumerSubtaskIndex, + int numConsumers, + int numSubpartitions, + boolean isDynamicGraph, + boolean isBroadcast) { + int consumerIndex = consumerSubtaskIndex % numConsumers; + if (!isDynamicGraph) { + return new SubpartitionIndexRange(consumerIndex, consumerIndex); + } else { + if (isBroadcast) { + // broadcast results have only one subpartition, and be consumed multiple times. + checkArgument(numSubpartitions == 1); + return new SubpartitionIndexRange(0, 0); + } else { + checkArgument(consumerIndex < numConsumers); + checkArgument(numConsumers <= numSubpartitions); + + int start = consumerIndex * numSubpartitions / numConsumers; + int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers; + + return new SubpartitionIndexRange(start, nextStart - 1); + } + } + } + + private static class IntermediateResultWrapper implements IntermediateResultInfo { + private final IntermediateResult intermediateResult; + + IntermediateResultWrapper(IntermediateResult intermediateResult) { + this.intermediateResult = checkNotNull(intermediateResult); + } + + @Override + public IntermediateDataSetID getResultId() { + return intermediateResult.getId(); + } + + @Override + public boolean isBroadcast() { + return intermediateResult.isBroadcast(); + } + + @Override + public boolean isPointwise() { + return intermediateResult.getConsumingDistributionPattern() + == DistributionPattern.POINTWISE; + } + + @Override + public int getNumPartitions() { + return intermediateResult.getNumberOfAssignedPartitions(); + } + + @Override + public int getNumSubpartitions(int partitionIndex) { + boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic(); + // Note that for non-dynamic graph, the num of subpartition has not been decided at Review Comment: non-dynamic -> dynamic. When connecting to a result, the result should have been fully produced. Why is the `numberOfSubpartitions` not decided yet?
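Separately, to make the even-distribution formula in `computeConsumedSubpartitionRange` (quoted above) concrete, here is a small standalone illustration (not code from the PR): with 7 subpartitions and 3 consumers on a dynamic graph, the computed ranges are [0, 1], [2, 3] and [4, 6].

```java
// Standalone sketch of the range formula used in computeConsumedSubpartitionRange.
int numSubpartitions = 7;
int numConsumers = 3;
for (int consumerIndex = 0; consumerIndex < numConsumers; consumerIndex++) {
    int start = consumerIndex * numSubpartitions / numConsumers;
    int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
    // consumer 0 -> [0, 1], consumer 1 -> [2, 3], consumer 2 -> [4, 6]
    System.out.println("consumer " + consumerIndex + " -> [" + start + ", " + (nextStart - 1) + "]");
}
```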
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInputInfo.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class describe the inputs(partitions and subpartitions that belong to the same intermediate + * result) information of a subtask. + */ +public class TaskInputInfo { + private final int consumerIndex; + + private final PartitionIndexRange partitionIndexRange; + + private final SubpartitionIndexRange subpartitionIndexRange; + + public TaskInputInfo( + final int consumerIndex, + final PartitionIndexRange partitionIndexRange, + final SubpartitionIndexRange subpartitionIndexRange) { + this.consumerIndex = consumerIndex; + this.partitionIndexRange = checkNotNull(partitionIndexRange); + this.subpartitionIndexRange = checkNotNull(subpartitionIndexRange); + } + + /** Get the subpartition range this subtask should consume. */ + public SubpartitionIndexRange getSubpartitionIndexRange() { + return checkNotNull(subpartitionIndexRange); Review Comment: No need to do `checkNotNull` because it's already checked in the constructor and the field is immutable. ########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInputInfo.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class describe the inputs(partitions and subpartitions that belong to the same intermediate + * result) information of a subtask. + */ +public class TaskInputInfo { + private final int consumerIndex; Review Comment: -> subtaskIndex. I think it's easier to understand. ########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobEdge; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** Util to compute {@link VertexInputInfo}s for execution job vertex. */ +public class VertexInputInfoComputationUtils { + + public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos( + ExecutionJobVertex ejv, + Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) { + checkState(ejv.isParallelismDecided()); + final List<IntermediateResultInfo> intermediateResultInfos = + ejv.getJobVertex().getInputs().stream() + .map(JobEdge::getSourceId) + .map(intermediateResultRetriever) + .map(IntermediateResultWrapper::new) + .collect(Collectors.toList()); + return computeVertexInputInfos( + ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic()); + } + + public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos( + int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) { + + checkArgument(parallelism > 0); + final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>(); + + for (IntermediateResultInfo input : inputs) { + int sourceParallelism = input.getNumPartitions(); + + if (input.isPointwise()) { + vertexInputInfos.putIfAbsent( + input.getResultId(), + computeVertexInputInfoForPointwise( + sourceParallelism, + parallelism, + input::getNumSubpartitions, + isDynamicGraph)); + } else { + vertexInputInfos.putIfAbsent( + input.getResultId(), + computeVertexInputInfoForAllToAll( + sourceParallelism, + parallelism, + input::getNumSubpartitions, + isDynamicGraph, + input.isBroadcast())); + } + } + + return vertexInputInfos; + } + + /** + * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This + * computation algorithm will evenly distribute subpartitions to downstream subtasks according + * to the number of subpartitions. Different downstream subtasks consume roughly the same number + * of subpartitions. 
+ * + * @param sourceCount the parallelism of upstream + * @param targetCount the parallelism of downstream + * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions + * @param isDynamicGraph whether is dynamic graph + * @return the computed {@link VertexInputInfo} + */ + @VisibleForTesting + static VertexInputInfo computeVertexInputInfoForPointwise( + int sourceCount, + int targetCount, + Function<Integer, Integer> numOfSubpartitionsRetriever, + boolean isDynamicGraph) { + + final List<TaskInputInfo> taskInputInfos = new ArrayList<>(); + + if (sourceCount >= targetCount) { + for (int index = 0; index < targetCount; index++) { + + int start = index * sourceCount / targetCount; + int end = (index + 1) * sourceCount / targetCount; + + PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1); + SubpartitionIndexRange subpartitionRange = + computeConsumedSubpartitionRange( + index, + 1, + numOfSubpartitionsRetriever.apply(start), + isDynamicGraph, + false); + taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange)); + } + } else { + for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) { + + int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount; + int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount; + int numConsumers = end - start; + + for (int i = start; i < end; i++) { + PartitionIndexRange partitionRange = + new PartitionIndexRange(partitionNum, partitionNum); + SubpartitionIndexRange subpartitionRange = + computeConsumedSubpartitionRange( + i, + numConsumers, + numOfSubpartitionsRetriever.apply(partitionNum), + isDynamicGraph, + false); + taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange)); + } + } + } + return new VertexInputInfo(taskInputInfos); + } + + /** + * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This + * computation algorithm will evenly distribute subpartitions to downstream subtasks according + * to the number of subpartitions. Different downstream subtasks consume roughly the same number + * of subpartitions. + * + * @param sourceCount the parallelism of upstream + * @param targetCount the parallelism of downstream + * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions + * @param isDynamicGraph whether is dynamic graph + * @param isBroadcast whether the edge is broadcast + * @return the computed {@link VertexInputInfo} + */ + @VisibleForTesting + static VertexInputInfo computeVertexInputInfoForAllToAll( + int sourceCount, + int targetCount, + Function<Integer, Integer> numOfSubpartitionsRetriever, + boolean isDynamicGraph, + boolean isBroadcast) { + final List<TaskInputInfo> taskInputInfos = new ArrayList<>(); + for (int i = 0; i < targetCount; ++i) { + PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1); + SubpartitionIndexRange subpartitionRange = + computeConsumedSubpartitionRange( + i, + targetCount, + numOfSubpartitionsRetriever.apply(0), + isDynamicGraph, + isBroadcast); + taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange)); + } + return new VertexInputInfo(taskInputInfos); + } + + /** + * Compute the consumed subpartition range for a subtask. This computation algorithm will evenly + * distribute subpartitions to downstream subtasks according to the number of subpartitions. + * Different downstream subtasks consume roughly the same number of subpartitions. 
+ * + * @param consumerSubtaskIndex the subtask index + * @param numConsumers the total number of consumers + * @param numSubpartitions the total number of subpartitions + * @param isDynamicGraph whether is dynamic graph + * @param isBroadcast whether the edge is broadcast + * @return the computed subpartition range + */ + @VisibleForTesting + static SubpartitionIndexRange computeConsumedSubpartitionRange( + int consumerSubtaskIndex, + int numConsumers, + int numSubpartitions, + boolean isDynamicGraph, + boolean isBroadcast) { + int consumerIndex = consumerSubtaskIndex % numConsumers; + if (!isDynamicGraph) { + return new SubpartitionIndexRange(consumerIndex, consumerIndex); + } else { + if (isBroadcast) { + // broadcast results have only one subpartition, and be consumed multiple times. + checkArgument(numSubpartitions == 1); + return new SubpartitionIndexRange(0, 0); + } else { + checkArgument(consumerIndex < numConsumers); + checkArgument(numConsumers <= numSubpartitions); + + int start = consumerIndex * numSubpartitions / numConsumers; + int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers; + + return new SubpartitionIndexRange(start, nextStart - 1); + } + } + } + + private static class IntermediateResultWrapper implements IntermediateResultInfo { + private final IntermediateResult intermediateResult; + + IntermediateResultWrapper(IntermediateResult intermediateResult) { + this.intermediateResult = checkNotNull(intermediateResult); + } + + @Override + public IntermediateDataSetID getResultId() { + return intermediateResult.getId(); + } + + @Override + public boolean isBroadcast() { + return intermediateResult.isBroadcast(); + } + + @Override + public boolean isPointwise() { + return intermediateResult.getConsumingDistributionPattern() + == DistributionPattern.POINTWISE; + } + + @Override + public int getNumPartitions() { + return intermediateResult.getNumberOfAssignedPartitions(); + } + + @Override + public int getNumSubpartitions(int partitionIndex) { + boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic(); + // Note that for non-dynamic graph, the num of subpartition has not been decided at + // this time + return isDynamicGraph + ? intermediateResult.getPartitions()[partitionIndex].getNumberOfSubpartitions() + : IntermediateResultPartition.UNKNOWN; Review Comment: IntermediateResultPartition.UNKNOWN is confusing. It's better to rename it to NUM_SUBPARTITIONS_UNKNOWN.
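For instance (a sketch; the existing declaration of `UNKNOWN` is not shown in this hunk, so the sentinel value below is assumed):

```java
// In IntermediateResultPartition: a self-describing name for the sentinel.
public static final int NUM_SUBPARTITIONS_UNKNOWN = -1; // assumed value

// At the call site above:
return isDynamicGraph
        ? intermediateResult.getPartitions()[partitionIndex].getNumberOfSubpartitions()
        : IntermediateResultPartition.NUM_SUBPARTITIONS_UNKNOWN;
```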
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobEdge; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** Util to compute {@link VertexInputInfo}s for execution job vertex. */ +public class VertexInputInfoComputationUtils { + + public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos( + ExecutionJobVertex ejv, + Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) { + checkState(ejv.isParallelismDecided()); + final List<IntermediateResultInfo> intermediateResultInfos = + ejv.getJobVertex().getInputs().stream() + .map(JobEdge::getSourceId) + .map(intermediateResultRetriever) + .map(IntermediateResultWrapper::new) + .collect(Collectors.toList()); + return computeVertexInputInfos( + ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic()); + } + + public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos( + int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) { + + checkArgument(parallelism > 0); + final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>(); + + for (IntermediateResultInfo input : inputs) { + int sourceParallelism = input.getNumPartitions(); + + if (input.isPointwise()) { + vertexInputInfos.putIfAbsent( + input.getResultId(), + computeVertexInputInfoForPointwise( + sourceParallelism, + parallelism, + input::getNumSubpartitions, + isDynamicGraph)); + } else { + vertexInputInfos.putIfAbsent( + input.getResultId(), + computeVertexInputInfoForAllToAll( + sourceParallelism, + parallelism, + input::getNumSubpartitions, + isDynamicGraph, + input.isBroadcast())); + } + } + + return vertexInputInfos; + } + + /** + * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This + * computation algorithm will evenly distribute subpartitions to downstream subtasks according + * to the number of subpartitions. Different downstream subtasks consume roughly the same number + * of subpartitions.
+ * + * @param sourceCount the parallelism of upstream + * @param targetCount the parallelism of downstream + * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions + * @param isDynamicGraph whether is dynamic graph + * @return the computed {@link VertexInputInfo} + */ + @VisibleForTesting + static VertexInputInfo computeVertexInputInfoForPointwise( + int sourceCount, + int targetCount, + Function<Integer, Integer> numOfSubpartitionsRetriever, + boolean isDynamicGraph) { + + final List<TaskInputInfo> taskInputInfos = new ArrayList<>(); + + if (sourceCount >= targetCount) { + for (int index = 0; index < targetCount; index++) { + + int start = index * sourceCount / targetCount; + int end = (index + 1) * sourceCount / targetCount; + + PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1); + SubpartitionIndexRange subpartitionRange = + computeConsumedSubpartitionRange( + index, + 1, + numOfSubpartitionsRetriever.apply(start), + isDynamicGraph, + false); + taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange)); + } + } else { + for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) { + + int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount; + int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount; + int numConsumers = end - start; + + for (int i = start; i < end; i++) { + PartitionIndexRange partitionRange = + new PartitionIndexRange(partitionNum, partitionNum); + SubpartitionIndexRange subpartitionRange = + computeConsumedSubpartitionRange( + i, + numConsumers, + numOfSubpartitionsRetriever.apply(partitionNum), + isDynamicGraph, + false); + taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange)); + } + } + } + return new VertexInputInfo(taskInputInfos); + } + + /** + * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This + * computation algorithm will evenly distribute subpartitions to downstream subtasks according + * to the number of subpartitions. Different downstream subtasks consume roughly the same number + * of subpartitions. + * + * @param sourceCount the parallelism of upstream + * @param targetCount the parallelism of downstream + * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions + * @param isDynamicGraph whether is dynamic graph + * @param isBroadcast whether the edge is broadcast + * @return the computed {@link VertexInputInfo} + */ + @VisibleForTesting + static VertexInputInfo computeVertexInputInfoForAllToAll( + int sourceCount, + int targetCount, + Function<Integer, Integer> numOfSubpartitionsRetriever, + boolean isDynamicGraph, + boolean isBroadcast) { + final List<TaskInputInfo> taskInputInfos = new ArrayList<>(); + for (int i = 0; i < targetCount; ++i) { + PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1); Review Comment: The `partitionRange` can be created only once and reused in the loop.
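Roughly like this (a sketch of the suggested restructuring, based on the loop quoted above):

```java
final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
// Every consumer of an ALL_TO_ALL edge reads the same partition range,
// so the range object can be created once, outside the loop.
final PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1);
for (int i = 0; i < targetCount; ++i) {
    SubpartitionIndexRange subpartitionRange =
            computeConsumedSubpartitionRange(
                    i,
                    targetCount,
                    numOfSubpartitionsRetriever.apply(0),
                    isDynamicGraph,
                    isBroadcast);
    taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
}
return new VertexInputInfo(taskInputInfos);
```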
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInputInfo.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class describe the inputs(partitions and subpartitions that belong to the same intermediate + * result) information of a subtask. + */ +public class TaskInputInfo { + private final int consumerIndex; + + private final PartitionIndexRange partitionIndexRange; + + private final SubpartitionIndexRange subpartitionIndexRange; + + public TaskInputInfo( + final int consumerIndex, + final PartitionIndexRange partitionIndexRange, + final SubpartitionIndexRange subpartitionIndexRange) { + this.consumerIndex = consumerIndex; + this.partitionIndexRange = checkNotNull(partitionIndexRange); + this.subpartitionIndexRange = checkNotNull(subpartitionIndexRange); + } + + /** Get the subpartition range this subtask should consume. */ + public SubpartitionIndexRange getSubpartitionIndexRange() { + return checkNotNull(subpartitionIndexRange); + } + + /** Get the partition range this subtask should consume. */ + public PartitionIndexRange getPartitionIndexRange() { + return checkNotNull(partitionIndexRange); Review Comment: No need to do checkNotNull because it's already checked in the constructor and the field is immutable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org