zhuzhurk commented on code in PR #25552:
URL: https://github.com/apache/flink/pull/25552#discussion_r1903612414
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -128,12 +128,8 @@ public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
                         && vertexMaxParallelism >= vertexMinParallelism);

         if (consumedResults.isEmpty()) {
-            // source job vertex
-            int parallelism =
-                    vertexInitialParallelism > 0
-                            ? vertexInitialParallelism
-                            : computeSourceParallelismUpperBound(jobVertexId, vertexMaxParallelism);
-            return new ParallelismAndInputInfos(parallelism, Collections.emptyMap());
+            return decideParallelismAndInputInfosForNonSource(

Review Comment:
   This is the handling for sources. I meant the handling of non-sources.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -181,18 +179,18 @@ public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
         // we need to derive parallelism separately for each input.
         //
         // In the following cases, we need to reset min parallelism and max parallelism to ensure
-        // that the decide parallelism for all inputs is consistent :
+        // that the decided parallelism for all inputs is consistent :
         // 1. Vertex has a specified parallelism
         // 2. There are edges that don't need to follow intergroup constraint
         if (vertexInitialParallelism > 0 || inputsGroupByInterCorrelation.containsKey(false)) {
             minParallelism = parallelism;
             maxParallelism = parallelism;
         }

-        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfoMap = new HashMap<>();
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfos = new HashMap<>();

         if (inputsGroupByInterCorrelation.containsKey(true)) {

Review Comment:
   This looks a bit weird to me, because inter/intra correlations and all-to-all/pointwise connections are not directly related. It seems to be based on a fragile assumption that could change in the future.
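The `inputsGroupByInterCorrelation.containsKey(false)` check the reviewed hunk relies on can be sketched with plain JDK collectors. This is an illustrative sketch, not the actual Flink code: `InputInfo` and its field are hypothetical stand-ins for the decider's consumed-result inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByCorrelationSketch {
    // Hypothetical stand-in for one consumed input of the vertex.
    record InputInfo(String id, boolean interInputsKeyCorrelated) {}

    public static void main(String[] args) {
        List<InputInfo> inputs = new ArrayList<>(List.of(
                new InputInfo("input-0", true),
                new InputInfo("input-1", true),
                new InputInfo("input-2", false)));

        // Group inputs by whether they must follow the inter-inputs key
        // correlation constraint; the reviewed code queries such a map with
        // containsKey(true) / containsKey(false).
        Map<Boolean, List<InputInfo>> byCorrelation = inputs.stream()
                .collect(Collectors.groupingBy(InputInfo::interInputsKeyCorrelated));

        // If any input does NOT follow the constraint, min and max parallelism
        // are pinned to the already-decided parallelism so that the decision
        // stays consistent across all inputs.
        boolean mustPinParallelism = byCorrelation.containsKey(false);
        System.out.println("correlated inputs: " + byCorrelation.get(true).size());
        System.out.println("must pin parallelism: " + mustPinParallelism);
    }
}
```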
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -525,7 +523,17 @@ private void maybeAggregateSubpartitionBytes(BlockingResultInfo resultInfo) {
                 && intermediateResult.areAllConsumerVerticesCreated()
                 && intermediateResult.getConsumerVertices().stream()
                         .map(this::getExecutionJobVertex)
-                        .allMatch(ExecutionJobVertex::isInitialized)) {
+                        .allMatch(ExecutionJobVertex::isInitialized)
+                && intermediateResult.getConsumerVertices().stream()

Review Comment:
   The method comment is not updated accordingly.

##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java:
##########
@@ -77,6 +77,10 @@ public class StreamEdge implements Serializable {

     private final IntermediateDataSetID intermediateDatasetIdToProduce;

+    private boolean existInterInputsKeyCorrelation;
+
+    private boolean existIntraInputKeyCorrelation;
+

Review Comment:
   Although it's not good to have a field and a method both named `existInterInputsKeyCorrelation`, it's also a bit odd to name the field `interInputsKeyCorrelation`, given that it is actually not a `correlation`. Maybe `interInputsKeysCorrelated` and `areInterInputsKeysCorrelated()`?
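The rename the reviewer suggests could look like the following. This is only a sketch of the naming convention, not the real `StreamEdge` class: the field stores a fact ("the keys are correlated"), so a question-form accessor reads naturally and no longer collides with the field name.

```java
public class StreamEdgeNamingSketch {
    // Field named as a state/fact, per the reviewer's suggestion.
    private boolean interInputsKeysCorrelated;
    private boolean intraInputKeysCorrelated;

    // Boolean-question accessors that don't shadow the field names.
    public boolean areInterInputsKeysCorrelated() {
        return interInputsKeysCorrelated;
    }

    public boolean areIntraInputKeysCorrelated() {
        return intraInputKeysCorrelated;
    }

    public static void main(String[] args) {
        StreamEdgeNamingSketch edge = new StreamEdgeNamingSketch();
        edge.interInputsKeysCorrelated = true;
        System.out.println(edge.areInterInputsKeysCorrelated());
    }
}
```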
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java:
##########
@@ -256,19 +264,22 @@ public String getEdgeId() {
         return edgeId;
     }

-    public boolean existInterInputsKeyCorrelation() {
-        return existInterInputsKeyCorrelation;
+    private void configureKeyCorrelation(StreamPartitioner<?> partitioner) {
+        this.intraInputKeyCorrelation =
+                !partitioner.isPointwise() || partitioner instanceof ForwardPartitioner;
+        this.interInputsKeyCorrelation = !partitioner.isPointwise();
     }

-    public boolean existIntraInputKeyCorrelation() {
-        return existIntraInputKeyCorrelation;
+    public boolean existInterInputsKeyCorrelation() {
+        return interInputsKeyCorrelation;
     }

-    public void setExistInterInputsKeyCorrelation(boolean existInterInputsKeyCorrelation) {
-        this.existInterInputsKeyCorrelation = existInterInputsKeyCorrelation;
+    public boolean existIntraInputKeyCorrelation() {
+        return intraInputKeyCorrelation;
     }

-    public void setExistIntraInputKeyCorrelation(boolean existIntraInputKeyCorrelation) {
-        this.existIntraInputKeyCorrelation = existIntraInputKeyCorrelation;
+    public void setIntraInputKeyCorrelation(boolean intraInputKeyCorrelation) {
+        checkState(interInputsKeyCorrelation, "InterInputsKeyCorrelation must be true");

Review Comment:
   Why does it make this assumption?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java:
##########
@@ -67,9 +67,19 @@ public class JobEdge implements java.io.Serializable {

     private final int typeNumber;

-    private final boolean existInterInputsKeyCorrelation;
+    /**
+     * There are relationships between multiple inputs: if the data corresponding to a specific join
+     * key from one input is split, the corresponding join key data from the other inputs must be
+     * duplicated (meaning that it must be sent to the downstream nodes where the split data is
+     * sent).
+     */
+    private final boolean interInputsKeyCorrelation;

-    private final boolean existIntraInputKeyCorrelation;
+    /**
+     * For this edge the data corresponding to a specific join key must be sent to the same
+     * downstream subtask.
+     */
+    private final boolean intraInputKeyCorrelation;

Review Comment:
   Join is not the key point of these two correlations; runtime components do not understand the concept of a SQL join. The key point should be whether records with the same key are correlated and must be sent to the same downstream task to be processed together.
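The "same key, same downstream subtask" property the reviewer describes as the essence of intra-input key correlation can be sketched with a plain hash partitioner. This is an illustrative sketch only, not Flink's actual key-group assignment logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyCorrelationSketch {
    // Intra-input key correlation: every record carrying the same key must be
    // routed to the same downstream subtask so the records can be processed
    // together. A deterministic hash of the key gives exactly that guarantee.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        Map<Integer, List<String>> routed = new TreeMap<>();
        for (String key : List.of("a", "b", "a", "c", "b", "a")) {
            routed.computeIfAbsent(subtaskFor(key, parallelism), t -> new ArrayList<>())
                    .add(key);
        }
        // All occurrences of "a" land in one bucket. Splitting that bucket
        // across subtasks would break the correlation; with inter-inputs key
        // correlation, it would also force the matching keys from the other
        // inputs to be duplicated to every subtask that received a piece.
        System.out.println(routed);
    }
}
```

The interesting case in the review is exactly when such a split is allowed anyway: inter-inputs key correlation then dictates that the other inputs' records for that key be broadcast to all subtasks holding a fragment.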