zentol commented on a change in pull request #15348:
URL: https://github.com/apache/flink/pull/15348#discussion_r602115950



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -860,6 +902,7 @@ private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobG
                 TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
                 initializationTimestamp,
                 vertexAttemptNumberStore,
+                computeVertexParallelismStore(adjustedJobGraph),

Review comment:
       Hmm... if we compute the maxParallelism here, couldn't we end up using different maxParallelism values for different execution attempts and thus, by extension, for different savepoints/checkpoints? The adjustedJobGraph may have a different parallelism each time.
   I think this is a problem because it turns loading a savepoint/checkpoint from this job into a guessing game of "what-max-parallelism-was-set-this-time".
   IOW, the job may suddenly break on any scale-up if the maxP ends up being higher than in the checkpoint we restore from.
   
   We can't base it on the original JobGraph, because there the parallelism is cranked to the max, so we wouldn't be able to restore from a savepoint (and it would tank performance significantly).
   We also can't base it on the first execution, because with HA we could again end up with a different maxParallelism after a JM failover...
   We may just have to force users to set the maxParallelism when using reactive mode.
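
   To make the concern concrete, here is a minimal standalone sketch of how a parallelism-derived default can drift between attempts. It is an assumption about what `KeyGroupRangeAssignment.computeDefaultMaxParallelism` does (round 1.5x the parallelism up to a power of two, clamped to [128, 32768]), re-implemented here rather than copied from Flink; the class name and the bound constants are hypothetical and only for illustration.

```java
// Hypothetical standalone sketch, NOT Flink code: mimics the assumed default
// maxParallelism derivation to show that it depends on the current parallelism.
final class MaxParallelismSketch {
    // Assumed bounds of the default (2^7 and 2^15).
    static final int LOWER_BOUND = 1 << 7;
    static final int UPPER_BOUND = 1 << 15;

    // Smallest power of two that is >= x (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // Assumed formula: roundUpToPowerOfTwo(1.5 * parallelism), clamped to the bounds.
    static int computeDefaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }

    public static void main(String[] args) {
        // Each value stands for the parallelism of one execution attempt
        // after the adjusted JobGraph was rescaled.
        for (int parallelism : new int[] {4, 84, 86, 200}) {
            System.out.println(
                    "parallelism=" + parallelism
                            + " -> default maxParallelism="
                            + computeDefaultMaxParallelism(parallelism));
        }
    }
}
```

   Under these assumptions, attempts at parallelism 4 and 84 both get 128, but scaling from 84 to 86 bumps the derived default to 256, which is exactly the kind of silent change between checkpoints described above.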

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -270,6 +276,42 @@ private static void assertPreconditions(JobGraph jobGraph) throws RuntimeExcepti
         }
     }
 
+    @VisibleForTesting
+    static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> vertices) {
+        DefaultVertexParallelismStore store = new DefaultVertexParallelismStore();
+
+        for (JobVertex vertex : vertices) {
+            // if no max parallelism was configured by the user, we calculate and set a default
+            final int maxParallelism =
+                    vertex.getMaxParallelism() == JobVertex.MAX_PARALLELISM_DEFAULT
+                            ? KeyGroupRangeAssignment.computeDefaultMaxParallelism(
+                                    vertex.getParallelism())
+                            : vertex.getMaxParallelism();
+
+            VertexParallelismInformation parallelismInfo =
+                    new DefaultVertexParallelismInfo(
+                            maxParallelism,

Review comment:
       The parallelism must only be set to the maxParallelism if reactive mode is enabled. The AdaptiveScheduler can also be used without it.
   Without reactive mode, the AdaptiveScheduler _should_ need the same logic as the SchedulerBase.
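
   Roughly what I have in mind, as a hedged sketch only: the class, method, and parameter names below are hypothetical and do not exist in the PR; they just illustrate choosing the parallelism based on whether reactive mode is on.

```java
// Hypothetical sketch, NOT the PR's code: pick the parallelism to store per vertex.
final class ParallelismSelectionSketch {
    // In reactive mode the vertex is scaled up to its maxParallelism;
    // otherwise the configured parallelism is kept, as a non-reactive scheduler would.
    static int effectiveParallelism(
            boolean reactiveModeEnabled, int configuredParallelism, int maxParallelism) {
        return reactiveModeEnabled ? maxParallelism : configuredParallelism;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(true, 4, 128));
        System.out.println(effectiveParallelism(false, 4, 128));
    }
}
```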

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
##########
@@ -213,7 +214,7 @@ private static CompletedCheckpointStorageLocation createSavepointWithOperatorSub
                 .thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorId)));
 
         if (parallelism != maxParallelism) {
-            when(vertex.isMaxParallelismConfigured()).thenReturn(true);
+            when(vertex.canRescaleMaxParallelism(anyInt())).thenReturn(true);

Review comment:
       Shouldn't this return false, given that the conditions in the StateAssignmentOperation/Checkpoints have been inverted?



