zhuzhurk commented on code in PR #25552:
URL: https://github.com/apache/flink/pull/25552#discussion_r1903612414


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -128,12 +128,8 @@ public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
                         && vertexMaxParallelism >= vertexMinParallelism);
 
         if (consumedResults.isEmpty()) {
-            // source job vertex
-            int parallelism =
-                    vertexInitialParallelism > 0
-                            ? vertexInitialParallelism
-                            : computeSourceParallelismUpperBound(jobVertexId, vertexMaxParallelism);
-            return new ParallelismAndInputInfos(parallelism, Collections.emptyMap());
+            return decideParallelismAndInputInfosForNonSource(

Review Comment:
   This is the handling for sources. I meant the handling of non-sources.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -181,18 +179,18 @@ public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
         // we need to derive parallelism separately for each input.
         //
         // In the following cases, we need to reset min parallelism and max parallelism to ensure
-        // that the decide parallelism for all inputs is consistent :
+        // that the decided parallelism for all inputs is consistent :
         // 1.  Vertex has a specified parallelism
         // 2.  There are edges that don't need to follow intergroup constraint
         if (vertexInitialParallelism > 0 || inputsGroupByInterCorrelation.containsKey(false)) {
             minParallelism = parallelism;
             maxParallelism = parallelism;
         }
 
-        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfoMap = new HashMap<>();
+        Map<IntermediateDataSetID, JobVertexInputInfo> vertexInputInfos = new HashMap<>();
 
         if (inputsGroupByInterCorrelation.containsKey(true)) {

Review Comment:
   It looks a bit weird to me, because inter/intra correlations and all-to-all/point-wise connections are not directly related.
   It looks to be based on a fragile assumption which may change in the future.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -525,7 +523,17 @@ private void maybeAggregateSubpartitionBytes(BlockingResultInfo resultInfo) {
                 && intermediateResult.areAllConsumerVerticesCreated()
                 && intermediateResult.getConsumerVertices().stream()
                         .map(this::getExecutionJobVertex)
-                        .allMatch(ExecutionJobVertex::isInitialized)) {
+                        .allMatch(ExecutionJobVertex::isInitialized)
+                && intermediateResult.getConsumerVertices().stream()

Review Comment:
   The method comment is not updated accordingly.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java:
##########
@@ -77,6 +77,10 @@ public class StreamEdge implements Serializable {
 
     private final IntermediateDataSetID intermediateDatasetIdToProduce;
 
+    private boolean existInterInputsKeyCorrelation;
+
+    private boolean existIntraInputKeyCorrelation;
+

Review Comment:
   Although it's not good to have one field and one method both named as `existInterInputsKeyCorrelation`, it's also a bit weird to directly name it as `interInputsKeyCorrelation`, given that it is actually not a `correlation`.
   Maybe `interInputsKeysCorrelated` and `areInterInputsKeysCorrelated()`?
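   For illustration, a minimal hypothetical sketch (not the actual `StreamEdge` class) of how the suggested naming could look, with the boolean fields in participial form and question-style accessors:

```java
// Hypothetical sketch of the naming suggested in the review: the field uses the
// participial form ("...Correlated") while the accessor reads as a question
// ("are...Correlated()"), so the field and the method no longer share one name.
public class StreamEdgeNamingSketch {

    private boolean interInputsKeysCorrelated;
    private boolean intraInputKeysCorrelated;

    public boolean areInterInputsKeysCorrelated() {
        return interInputsKeysCorrelated;
    }

    public boolean areIntraInputKeysCorrelated() {
        return intraInputKeysCorrelated;
    }

    public void setInterInputsKeysCorrelated(boolean correlated) {
        this.interInputsKeysCorrelated = correlated;
    }

    public void setIntraInputKeysCorrelated(boolean correlated) {
        this.intraInputKeysCorrelated = correlated;
    }

    public static void main(String[] args) {
        StreamEdgeNamingSketch edge = new StreamEdgeNamingSketch();
        edge.setInterInputsKeysCorrelated(true);
        System.out.println(edge.areInterInputsKeysCorrelated()); // prints "true"
    }
}
```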



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java:
##########
@@ -256,19 +264,22 @@ public String getEdgeId() {
         return edgeId;
     }
 
-    public boolean existInterInputsKeyCorrelation() {
-        return existInterInputsKeyCorrelation;
+    private void configureKeyCorrelation(StreamPartitioner<?> partitioner) {
+        this.intraInputKeyCorrelation =
+                !partitioner.isPointwise() || partitioner instanceof ForwardPartitioner;
+        this.interInputsKeyCorrelation = !partitioner.isPointwise();
     }
 
-    public boolean existIntraInputKeyCorrelation() {
-        return existIntraInputKeyCorrelation;
+    public boolean existInterInputsKeyCorrelation() {
+        return interInputsKeyCorrelation;
     }
 
-    public void setExistInterInputsKeyCorrelation(boolean existInterInputsKeyCorrelation) {
-        this.existInterInputsKeyCorrelation = existInterInputsKeyCorrelation;
+    public boolean existIntraInputKeyCorrelation() {
+        return intraInputKeyCorrelation;
     }
 
-    public void setExistIntraInputKeyCorrelation(boolean existIntraInputKeyCorrelation) {
-        this.existIntraInputKeyCorrelation = existIntraInputKeyCorrelation;
+    public void setIntraInputKeyCorrelation(boolean intraInputKeyCorrelation) {
+        checkState(interInputsKeyCorrelation, "InterInputsKeyCorrelation must be true");

Review Comment:
   Why does it have such an assumption?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java:
##########
@@ -67,9 +67,19 @@ public class JobEdge implements java.io.Serializable {
 
     private final int typeNumber;
 
-    private final boolean existInterInputsKeyCorrelation;
+    /**
+     * There are relationships between multiple inputs, if the data corresponding to a specific join
+     * key from one input is split, the corresponding join key data from the other inputs must be
+     * duplicated (meaning that it must be sent to the downstream nodes where the split data is
+     * sent).
+     */
+    private final boolean interInputsKeyCorrelation;
 
-    private final boolean existIntraInputKeyCorrelation;
+    /**
+     * For this edge the data corresponding to a specific join key must be sent to the same
+     * downstream subtask.
+     */
+    private final boolean intraInputKeyCorrelation;

Review Comment:
   Join is not the key point of these two correlations; the runtime components do not even understand the concept of a SQL join.
   The key point should be whether records with the same key are correlated and must be sent to the same downstream task to be processed together.
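   As an illustration of that framing, a hypothetical sketch (`JobEdgeSketch` is an illustrative class, not the actual `JobEdge`) documenting the two flags in terms of keyed records rather than SQL joins:

```java
// Hypothetical sketch: the same two flags documented in terms of records that
// share a key, without referring to SQL joins, which the runtime does not
// know about.
public final class JobEdgeSketch {

    /**
     * Whether records with the same key across different inputs are correlated
     * and must be processed together: if the data for a key from one input is
     * split across downstream tasks, the data for that key from the other
     * inputs must be duplicated to every task that received part of the split.
     */
    private final boolean interInputsKeyCorrelation;

    /**
     * Whether records with the same key within this input are correlated and
     * must be sent to the same downstream subtask to be processed together.
     */
    private final boolean intraInputKeyCorrelation;

    public JobEdgeSketch(boolean interInputsKeyCorrelation, boolean intraInputKeyCorrelation) {
        this.interInputsKeyCorrelation = interInputsKeyCorrelation;
        this.intraInputKeyCorrelation = intraInputKeyCorrelation;
    }

    public boolean isInterInputsKeyCorrelation() {
        return interInputsKeyCorrelation;
    }

    public boolean isIntraInputKeyCorrelation() {
        return intraInputKeyCorrelation;
    }
}
```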



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org