zentol commented on a change in pull request #15348:
URL: https://github.com/apache/flink/pull/15348#discussion_r602115950
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########

@@ -860,6 +902,7 @@ private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobG
                 TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
                 initializationTimestamp,
                 vertexAttemptNumberStore,
+                computeVertexParallelismStore(adjustedJobGraph),

Review comment:
   hmm... if we compute the maxParallelism here, couldn't this result in us using different maxParallelism values for different execution attempts, and thus, by extension, for savepoints/checkpoints? Because the adjustedJobGraph may have a different parallelism each time.

   I think this is a problem because it turns loading a savepoint/checkpoint from this job into a guessing game of "what max parallelism was set this time". In other words, on every scale-up the job may suddenly break if the maxParallelism ends up being higher than in the checkpoint we restore from.

   We can't base it on the original JobGraph, because there the parallelism is cranked to the max and we wouldn't be able to restore from a savepoint (and performance would tank significantly). We also can't base it on the first execution, because with HA, on JM failover we could again end up with a different maxParallelism...

   We may just have to force users to set the maxParallelism when using reactive mode.
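To see why a drifting maxParallelism breaks restores: keyed state is bucketed into key groups, and the key-group id of a key is a direct function of maxParallelism. The sketch below is a simplified, self-contained stand-in for Flink's `KeyGroupRangeAssignment` logic (the hash here is a hypothetical mixing step, not Flink's actual `MathUtils.murmurHash`, so the concrete group numbers differ from real Flink):

```java
// Simplified sketch of key-group assignment (hypothetical hash, not Flink's murmur).
public class KeyGroupSketch {

    // key -> key group: depends directly on maxParallelism
    static int assignToKeyGroup(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16); // stand-in mixing step
        return Math.floorMod(h, maxParallelism);
    }

    // key group -> operator subtask index, as in Flink's range assignment
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        String key = "user-42";
        // The same key generally lands in different key groups under different
        // maxParallelism values, which is why a checkpoint written with one
        // maxParallelism cannot simply be restored with another.
        System.out.println(assignToKeyGroup(key, 128));
        System.out.println(assignToKeyGroup(key, 256));
    }
}
```

This is exactly why the comment above argues the value must be pinned once, rather than recomputed from whatever parallelism the adjusted JobGraph happens to have per attempt.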
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########

@@ -270,6 +276,42 @@ private static void assertPreconditions(JobGraph jobGraph) throws RuntimeExcepti
         }
     }

+    @VisibleForTesting
+    static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> vertices) {
+        DefaultVertexParallelismStore store = new DefaultVertexParallelismStore();
+
+        for (JobVertex vertex : vertices) {
+            // if no max parallelism was configured by the user, we calculate and set a default
+            final int maxParallelism =
+                    vertex.getMaxParallelism() == JobVertex.MAX_PARALLELISM_DEFAULT
+                            ? KeyGroupRangeAssignment.computeDefaultMaxParallelism(
+                                    vertex.getParallelism())
+                            : vertex.getMaxParallelism();
+
+            VertexParallelismInformation parallelismInfo =
+                    new DefaultVertexParallelismInfo(
+                            maxParallelism,

Review comment:
   The parallelism must only be set to the maxParallelism if reactive mode is enabled; the AdaptiveScheduler can also be used without it. Without reactive mode, the AdaptiveScheduler _should_ need the same logic as the SchedulerBase.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
##########

@@ -213,7 +214,7 @@ private static CompletedCheckpointStorageLocation createSavepointWithOperatorSub
                 .thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorId)));

         if (parallelism != maxParallelism) {
-            when(vertex.isMaxParallelismConfigured()).thenReturn(true);
+            when(vertex.canRescaleMaxParallelism(anyInt())).thenReturn(true);

Review comment:
   Shouldn't this return false, given that the conditions in the StateAssignmentOperation/Checkpoints have been inverted?