zentol commented on a change in pull request #15348:
URL: https://github.com/apache/flink/pull/15348#discussion_r603887025



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexAttemptNumberStore.java
##########
@@ -15,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.executiongraph;
+package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;

Review comment:
       did this move happen by accident?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultVertexParallelismStoreTest.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Optional;
+

Review comment:
       ```suggestion
   
   /** Tests for the {@link DefaultVertexParallelismStore}. */
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultVertexAttemptNumberStore.java
##########
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.MutableVertexAttemptNumberStore;

Review comment:
       related to the presumed accidental move

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
##########
@@ -50,7 +55,16 @@ public JobGraphJobInformation(JobGraph jobGraph) {
 
     @Override
     public JobInformation.VertexInformation getVertexInformation(JobVertexID 
jobVertexId) {
-        return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId));
+        return new JobVertexInformation(
+                jobGraph.findVertexByID(jobVertexId),
+                vertexParallelismStore
+                        .getParallelismInfo(jobVertexId)
+                        .orElseThrow(

Review comment:
       Are there actually any instances where an empty optional is handled 
without throwing an exception? If we don't have a use-case for that, then we 
could just fail right away in the parallelism store.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -251,9 +282,8 @@ public void 
testExecutionGraphGenerationWithAvailableResources() throws Exceptio
 
         final int numAvailableSlots = 1;
 
-        final OneShotLatch submitTaskLatch = new OneShotLatch();
-        final TaskManagerGateway taskManagerGateway =
-                
createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch);
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);

Review comment:
       Are we buffering more requests because reactive mode might scale things 
up?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerComputeReactiveModeVertexParallelismTest.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+
+/** Test vertex parallelism configuration for the {@link AdaptiveScheduler} in 
Reactive mode. */
+@RunWith(Parameterized.class)
+public class AdaptiveSchedulerComputeReactiveModeVertexParallelismTest extends 
TestLogger {
+    @Parameterized.Parameters(
+            name =
+                    "parallelism = {0}, maxParallelism = {1}, expected max = 
{2}, rescale to = {3}, can rescale = {4}")
+    public static Object[][] data() {
+        return new Object[][] {
+            // default minimum and rescale to higher
+            {1, JobVertex.MAX_PARALLELISM_DEFAULT, 128, 129, true},
+            // test round up part 1 and rescale to lower
+            {171, JobVertex.MAX_PARALLELISM_DEFAULT, 256, 42, false},
+            // test round up part 2 and rescale to equal
+            {172, JobVertex.MAX_PARALLELISM_DEFAULT, 512, 512, true},
+            // test round up limit and rescale to equal
+            {
+                UPPER_BOUND_MAX_PARALLELISM,
+                JobVertex.MAX_PARALLELISM_DEFAULT,
+                UPPER_BOUND_MAX_PARALLELISM,
+                UPPER_BOUND_MAX_PARALLELISM,
+                true
+            },
+            // test configured / takes precedence computed default and rescale 
to lower
+            {4, UPPER_BOUND_MAX_PARALLELISM, UPPER_BOUND_MAX_PARALLELISM, 3, 
false},
+            // test override takes precedence test configured 2 and rescale to 
higher
+            {4, 7, 7, UPPER_BOUND_MAX_PARALLELISM, true},
+        };
+    }
+
+    @Parameterized.Parameter(0)
+    public int parallelism;
+
+    @Parameterized.Parameter(1)
+    public int maxParallelism;
+
+    @Parameterized.Parameter(2)
+    public int expectedMaxParallelism;
+
+    @Parameterized.Parameter(3)
+    public int maxToScaleTo;
+
+    @Parameterized.Parameter(4)
+    public boolean expectedCanRescaleTo;
+
+    @Test
+    public void testCreateStore() {

Review comment:
       ```suggestion
       public void testCreateStoreWithoutAdjustedParallelism() {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** A {@link VertexParallelismInformation} implementation that provides common 
validation. */
+public class DefaultVertexParallelismInfo implements 
VertexParallelismInformation {
+    private final int parallelism;
+    private int maxParallelism;
+    private final Function<Integer, Optional<String>> rescaleMaxValidator;
+
+    /**
+     * Create {@link VertexParallelismInformation} with max parallelism 
rescaling validation for a
+     * vertex.
+     *
+     * @param parallelism the vertex's parallelism
+     * @param maxParallelism the vertex's max parallelism
+     * @param rescaleMaxValidator the validation function to provide an error 
message if a max
+     *     parallelism rescale is not allowed
+     */
+    public DefaultVertexParallelismInfo(
+            int parallelism,
+            int maxParallelism,
+            Function<Integer, Optional<String>> rescaleMaxValidator) {
+        this.parallelism = checkParallelism(parallelism);
+        this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism);
+        this.rescaleMaxValidator = 
Preconditions.checkNotNull(rescaleMaxValidator);
+    }
+
+    private static int normalizeAndCheckMaxParallelism(int maxParallelism) {
+        if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+            maxParallelism = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+        }
+
+        checkBounds("max parallelism", maxParallelism);
+        return maxParallelism;
+    }
+
+    private static int checkParallelism(int parallelism) {
+        checkBounds("parallelism", parallelism);
+        return parallelism;
+    }
+
+    private static void checkBounds(String name, int parallelism) {
+        Preconditions.checkArgument(
+                parallelism > 0
+                        && parallelism <= 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+                "Setting %s is not in valid bounds (1..%s), found: %s",
+                name,
+                KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+                parallelism);
+    }

Review comment:
       ```suggestion
       return parallelism;
       }
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
##########
@@ -302,23 +312,25 @@ public void testCannotConnectMissingId() throws Exception 
{
         v1.setParallelism(7);
         v1.setInvokableClass(AbstractInvokable.class);
 
-        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
+        // attach the second part of the graph

Review comment:
       needs to be moved down

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** A {@link VertexParallelismInformation} implementation that provides common 
validation. */
+public class DefaultVertexParallelismInfo implements 
VertexParallelismInformation {
+    private final int parallelism;
+    private int maxParallelism;
+    private final Function<Integer, Optional<String>> rescaleMaxValidator;
+
+    /**
+     * Create {@link VertexParallelismInformation} with max parallelism 
rescaling validation for a
+     * vertex.
+     *
+     * @param parallelism the vertex's parallelism
+     * @param maxParallelism the vertex's max parallelism
+     * @param rescaleMaxValidator the validation function to provide an error 
message if a max
+     *     parallelism rescale is not allowed
+     */
+    public DefaultVertexParallelismInfo(
+            int parallelism,
+            int maxParallelism,
+            Function<Integer, Optional<String>> rescaleMaxValidator) {
+        this.parallelism = checkParallelism(parallelism);
+        this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism);
+        this.rescaleMaxValidator = 
Preconditions.checkNotNull(rescaleMaxValidator);
+    }
+
+    private static int normalizeAndCheckMaxParallelism(int maxParallelism) {
+        if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+            maxParallelism = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+        }
+
+        checkBounds("max parallelism", maxParallelism);
+        return maxParallelism;
+    }
+
+    private static int checkParallelism(int parallelism) {
+        checkBounds("parallelism", parallelism);
+        return parallelism;
+    }
+
+    private static void checkBounds(String name, int parallelism) {

Review comment:
       ```suggestion
       private static int checkBounds(String name, int parallelism) {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
##########
@@ -237,16 +245,6 @@ public void testAttachViaIds() throws Exception {
         IntermediateDataSet v3result_2 =
                 v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 
-        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, 
v2, v3));
-
-        ExecutionGraph eg = createDefaultExecutionGraph();
-        try {
-            eg.attachJobGraph(ordered);
-        } catch (JobException e) {
-            e.printStackTrace();
-            fail("Job failed with exception: " + e.getMessage());
-        }
-
         // attach the second part of the graph

Review comment:
       this needs to be moved down

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultVertexParallelismInfoTest.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+

Review comment:
       ```suggestion
   
   /** Tests for the {@link DefaultVertexParallelismInfo}. */
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -270,6 +287,89 @@ private static void assertPreconditions(JobGraph jobGraph) 
throws RuntimeExcepti
         }
     }
 
+    /**
+     * Creates the parallelism store for a set of vertices, optionally with a 
flag to leave the
+     * vertex parallelism unchanged.
+     *
+     * <p>We need to set parallelism to the max possible value when requesting 
resources, but when
+     * executing the graph we should respect what we are actually given.
+     *
+     * @param vertices The vertices to store parallelism information for
+     * @param adjustParallelisms Whether to adjust the parallelisms
+     * @return The parallelism store.
+     */
+    @VisibleForTesting
+    static VertexParallelismStore computeReactiveModeVertexParallelismStore(
+            Iterable<JobVertex> vertices, boolean adjustParallelisms) {
+        DefaultVertexParallelismStore store = new 
DefaultVertexParallelismStore();
+
+        for (JobVertex vertex : vertices) {
+            // if no max parallelism was configured by the user, we calculate 
and set a default
+            final int parallelism;
+            final int maxParallelism;
+            if (adjustParallelisms) {
+                maxParallelism =
+                        vertex.getMaxParallelism() == 
JobVertex.MAX_PARALLELISM_DEFAULT
+                                ? 
KeyGroupRangeAssignment.computeDefaultMaxParallelism(
+                                        vertex.getParallelism())
+                                : vertex.getMaxParallelism();
+                parallelism = maxParallelism;
+            } else {
+                parallelism = vertex.getParallelism();
+                maxParallelism = vertex.getMaxParallelism();
+            }
+
+            VertexParallelismInformation parallelismInfo =
+                    new DefaultVertexParallelismInfo(
+                            parallelism,
+                            maxParallelism,
+                            // Allow rescaling if the new desired max 
parallelism
+                            // is not less than what was declared here during 
scheduling.
+                            // This prevents the situation where more 
resources are requested
+                            // based on the computed default, when actually 
fewer are necessary.
+                            (newMax) ->
+                                    newMax >= maxParallelism
+                                            ? Optional.empty()
+                                            : Optional.of(
+                                                    "Cannot lower max 
parallelism in Reactive mode."));
+            store.setParallelismInfo(vertex.getID(), parallelismInfo);
+        }
+
+        return store;
+    }
+
+    /**
+     * Creates the parallelism store that should be used to build the {@link 
ExecutionGraph}, which
+     * will respect the vertex parallelism of the passed {@link JobGraph} in 
all modes.
+     *
+     * @param jobGraph The job graph for execution.
+     * @param executionMode The mode of scheduler execution.
+     * @return The parallelism store.
+     */
+    private static VertexParallelismStore computeVertexParallelismStore(
+            JobGraph jobGraph, SchedulerExecutionMode executionMode) {
+        if (executionMode == SchedulerExecutionMode.REACTIVE) {
+            return 
computeReactiveModeVertexParallelismStore(jobGraph.getVertices(), true);
+        }
+        return SchedulerBase.computeVertexParallelismStore(jobGraph);
+    }
+
+    /**
+     * Creates the parallelism store that should be used to build the {@link 
ExecutionGraph}, which
+     * will respect the vertex parallelism of the passed {@link JobGraph} in 
all modes.

Review comment:
       The difference between these 2 methods does not become clear from the 
Javadocs.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** A {@link VertexParallelismInformation} implementation that provides common 
validation. */
+public class DefaultVertexParallelismInfo implements 
VertexParallelismInformation {
+    private final int parallelism;
+    private int maxParallelism;
+    private final Function<Integer, Optional<String>> rescaleMaxValidator;
+
+    /**
+     * Create {@link VertexParallelismInformation} with max parallelism 
rescaling validation for a
+     * vertex.
+     *
+     * @param parallelism the vertex's parallelism
+     * @param maxParallelism the vertex's max parallelism
+     * @param rescaleMaxValidator the validation function to provide an error 
message if a max
+     *     parallelism rescale is not allowed
+     */
+    public DefaultVertexParallelismInfo(
+            int parallelism,
+            int maxParallelism,
+            Function<Integer, Optional<String>> rescaleMaxValidator) {
+        this.parallelism = checkParallelism(parallelism);
+        this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism);
+        this.rescaleMaxValidator = 
Preconditions.checkNotNull(rescaleMaxValidator);
+    }
+
+    private static int normalizeAndCheckMaxParallelism(int maxParallelism) {
+        if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+            maxParallelism = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+        }
+
+        checkBounds("max parallelism", maxParallelism);
+        return maxParallelism;
+    }
+
+    private static int checkParallelism(int parallelism) {
+        checkBounds("parallelism", parallelism);
+        return parallelism;

Review comment:
       ```suggestion
           return checkBounds("parallelism", parallelism);
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
##########
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ExecutionJobVertexTest {
-
-    private static final int NOT_CONFIGURED = -1;
-
-    @Test
-    public void testMaxParallelismDefaulting() throws Exception {
-
-        // default minimum
-        ExecutionJobVertex executionJobVertex = createExecutionJobVertex(1, 
NOT_CONFIGURED);
-        Assert.assertEquals(128, executionJobVertex.getMaxParallelism());
-
-        // test round up part 1
-        executionJobVertex = createExecutionJobVertex(171, NOT_CONFIGURED);
-        Assert.assertEquals(256, executionJobVertex.getMaxParallelism());
-
-        // test round up part 2
-        executionJobVertex = createExecutionJobVertex(172, NOT_CONFIGURED);
-        Assert.assertEquals(512, executionJobVertex.getMaxParallelism());
-
-        // test round up limit
-        executionJobVertex = createExecutionJobVertex(1 << 15, NOT_CONFIGURED);
-        Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism());
-
-        // test upper bound
-        try {
-            executionJobVertex = createExecutionJobVertex(1 + (1 << 15), 
NOT_CONFIGURED);
-            executionJobVertex.getMaxParallelism();
-            Assert.fail();
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // parallelism must be smaller than the max parallelism
-        try {
-            createExecutionJobVertex(172, 4);
-            Assert.fail(
-                    "We should not be able to create an ExecutionJobVertex 
which "
-                            + "has a smaller max parallelism than 
parallelism.");
-        } catch (JobException ignored) {
-            // expected
-        }
-
-        // test configured / trumps computed default
-        executionJobVertex = createExecutionJobVertex(4, 1 << 15);
-        Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism());
-
-        // test upper bound configured
-        try {
-            executionJobVertex = createExecutionJobVertex(4, 1 + (1 << 15));
-            
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // test lower bound configured
-        try {
-            executionJobVertex = createExecutionJobVertex(4, 0);
-            
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // test override trumps test configured 2
-        executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
-        executionJobVertex.setMaxParallelism(7);
-        Assert.assertEquals(7, executionJobVertex.getMaxParallelism());
-
-        // test lower bound with derived value
-        executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
-        try {
-            executionJobVertex.setMaxParallelism(0);
-            
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // test upper bound with derived value
-        executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
-        try {
-            executionJobVertex.setMaxParallelism(1 + (1 << 15));
-            
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // test complain on setting derived value in presence of a configured 
value
-        executionJobVertex = createExecutionJobVertex(4, 16);
-        try {
-            executionJobVertex.setMaxParallelism(7);

Review comment:
       are we really still covering all these cases?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** A {@link VertexParallelismInformation} implementation that provides common 
validation. */
+public class DefaultVertexParallelismInfo implements 
VertexParallelismInformation {
+    private final int parallelism;
+    private int maxParallelism;
+    private final Function<Integer, Optional<String>> rescaleMaxValidator;
+
+    /**
+     * Create {@link VertexParallelismInformation} with max parallelism 
rescaling validation for a
+     * vertex.
+     *
+     * @param parallelism the vertex's parallelism
+     * @param maxParallelism the vertex's max parallelism
+     * @param rescaleMaxValidator the validation function to provide an error 
message if a max
+     *     parallelism rescale is not allowed
+     */
+    public DefaultVertexParallelismInfo(
+            int parallelism,
+            int maxParallelism,
+            Function<Integer, Optional<String>> rescaleMaxValidator) {
+        this.parallelism = checkParallelism(parallelism);
+        this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism);
+        this.rescaleMaxValidator = 
Preconditions.checkNotNull(rescaleMaxValidator);
+    }
+
+    private static int normalizeAndCheckMaxParallelism(int maxParallelism) {
+        if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+            maxParallelism = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+        }
+
+        checkBounds("max parallelism", maxParallelism);
+        return maxParallelism;

Review comment:
       ```suggestion
           return checkBounds("max parallelism", maxParallelism);
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -119,6 +123,21 @@
     public static final TestExecutorResource<ScheduledExecutorService> 
TEST_EXECUTOR_RESOURCE =
             new 
TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor);
 
+    @Parameterized.Parameters(name = "execution mode = {0}")
+    public static Object[] data() {
+        return new Object[] {SchedulerExecutionMode.REACTIVE, null};

Review comment:
       I do wonder whether we really need to run all tests with reactive mode, 
or whether we could instead have one targeted test to check that the right 
VertexParallelismStore is computed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
##########
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ExecutionJobVertexTest {
-
-    private static final int NOT_CONFIGURED = -1;
-
-    @Test
-    public void testMaxParallelismDefaulting() throws Exception {
-
-        // default minimum
-        ExecutionJobVertex executionJobVertex = createExecutionJobVertex(1, 
NOT_CONFIGURED);
-        Assert.assertEquals(128, executionJobVertex.getMaxParallelism());
-
-        // test round up part 1
-        executionJobVertex = createExecutionJobVertex(171, NOT_CONFIGURED);
-        Assert.assertEquals(256, executionJobVertex.getMaxParallelism());
-
-        // test round up part 2
-        executionJobVertex = createExecutionJobVertex(172, NOT_CONFIGURED);
-        Assert.assertEquals(512, executionJobVertex.getMaxParallelism());
-
-        // test round up limit
-        executionJobVertex = createExecutionJobVertex(1 << 15, NOT_CONFIGURED);
-        Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism());
-
-        // test upper bound
-        try {
-            executionJobVertex = createExecutionJobVertex(1 + (1 << 15), 
NOT_CONFIGURED);
-            executionJobVertex.getMaxParallelism();
-            Assert.fail();
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // parallelism must be smaller than the max parallelism
-        try {
-            createExecutionJobVertex(172, 4);

Review comment:
       We could keep this test




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to