zentol commented on a change in pull request #15348: URL: https://github.com/apache/flink/pull/15348#discussion_r603887025
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexAttemptNumberStore.java ########## @@ -15,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.executiongraph; +package org.apache.flink.runtime.scheduler; +import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore; Review comment: did this move happen by accident? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultVertexParallelismStoreTest.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore; +import org.apache.flink.runtime.scheduler.VertexParallelismInformation; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; + Review comment: ```suggestion /** Tests for the {@link DefaultVertexParallelismStore}. */ ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultVertexAttemptNumberStore.java ########## @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.MutableVertexAttemptNumberStore; Review comment: related to the presumed accidental move ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java ########## @@ -50,7 +55,16 @@ public JobGraphJobInformation(JobGraph jobGraph) { @Override public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) { - return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId)); + return new JobVertexInformation( + jobGraph.findVertexByID(jobVertexId), + vertexParallelismStore + .getParallelismInfo(jobVertexId) + .orElseThrow( Review comment: Is there actually any instances where an empty optional is handled without throwing an exception? If we don't have a use-case for that, then we could just fail right away in the parallelism store. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java ########## @@ -251,9 +282,8 @@ public void testExecutionGraphGenerationWithAvailableResources() throws Exceptio final int numAvailableSlots = 1; - final OneShotLatch submitTaskLatch = new OneShotLatch(); - final TaskManagerGateway taskManagerGateway = - createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch); + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(numAvailableSlots); Review comment: Are we buffering more requests because reactive mode might scale things up? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerComputeReactiveModeVertexParallelismTest.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.VertexParallelismInformation; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collections; +import java.util.Optional; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; +import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; + +/** Test vertex parallelism configuration for the {@link AdaptiveScheduler} in Reactive mode. */ +@RunWith(Parameterized.class) +public class AdaptiveSchedulerComputeReactiveModeVertexParallelismTest extends TestLogger { + @Parameterized.Parameters( + name = + "parallelism = {0}, maxParallelism = {1}, expected max = {2}, rescale to = {3}, can rescale = {4}") + public static Object[][] data() { + return new Object[][] { + // default minimum and rescale to higher + {1, JobVertex.MAX_PARALLELISM_DEFAULT, 128, 129, true}, + // test round up part 1 and rescale to lower + {171, JobVertex.MAX_PARALLELISM_DEFAULT, 256, 42, false}, + // test round up part 2 and rescale to equal + {172, JobVertex.MAX_PARALLELISM_DEFAULT, 512, 512, true}, + // test round up limit and rescale to equal + { + UPPER_BOUND_MAX_PARALLELISM, + JobVertex.MAX_PARALLELISM_DEFAULT, + UPPER_BOUND_MAX_PARALLELISM, + UPPER_BOUND_MAX_PARALLELISM, + true + }, + // test configured / takes precedence computed default and rescale to lower + {4, UPPER_BOUND_MAX_PARALLELISM, UPPER_BOUND_MAX_PARALLELISM, 3, false}, + // test override takes precedence test configured 2 and rescale to higher + {4, 7, 7, UPPER_BOUND_MAX_PARALLELISM, true}, + }; + } + + @Parameterized.Parameter(0) + public int parallelism; + + @Parameterized.Parameter(1) + public int maxParallelism; + + @Parameterized.Parameter(2) + public int expectedMaxParallelism; + + @Parameterized.Parameter(3) + public int maxToScaleTo; + + @Parameterized.Parameter(4) + public boolean expectedCanRescaleTo; + + @Test + public void testCreateStore() { Review comment: ```suggestion public void testCreateStoreWithoutAdjustedParallelism() { ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import java.util.Optional; +import java.util.function.Function; + +/** A {@link VertexParallelismInformation} implementation that provides common validation. */ +public class DefaultVertexParallelismInfo implements VertexParallelismInformation { + private final int parallelism; + private int maxParallelism; + private final Function<Integer, Optional<String>> rescaleMaxValidator; + + /** + * Create {@link VertexParallelismInformation} with max parallelism rescaling validation for a + * vertex. + * + * @param parallelism the vertex's parallelism + * @param maxParallelism the vertex's max parallelism + * @param rescaleMaxValidator the validation function to provide an error message if a max + * parallelism rescale is not allowed + */ + public DefaultVertexParallelismInfo( + int parallelism, + int maxParallelism, + Function<Integer, Optional<String>> rescaleMaxValidator) { + this.parallelism = checkParallelism(parallelism); + this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism); + this.rescaleMaxValidator = Preconditions.checkNotNull(rescaleMaxValidator); + } + + private static int normalizeAndCheckMaxParallelism(int maxParallelism) { + if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { + maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; + } + + checkBounds("max parallelism", maxParallelism); + return maxParallelism; + } + + private static int checkParallelism(int parallelism) { + checkBounds("parallelism", parallelism); + return parallelism; + } + + private static void checkBounds(String name, int parallelism) { + Preconditions.checkArgument( + parallelism > 0 + && parallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + "Setting %s is not in valid bounds (1..%s), found: %s", + name, + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + parallelism); + } Review comment: ```suggestion return parallelism; } ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java ########## @@ -302,23 +312,25 @@ public void testCannotConnectMissingId() throws Exception { v1.setParallelism(7); v1.setInvokableClass(AbstractInvokable.class); - List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1)); + // attach the second part of the graph Review comment: needs to be moved down ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import java.util.Optional; +import java.util.function.Function; + +/** A {@link VertexParallelismInformation} implementation that provides common validation. */ +public class DefaultVertexParallelismInfo implements VertexParallelismInformation { + private final int parallelism; + private int maxParallelism; + private final Function<Integer, Optional<String>> rescaleMaxValidator; + + /** + * Create {@link VertexParallelismInformation} with max parallelism rescaling validation for a + * vertex. + * + * @param parallelism the vertex's parallelism + * @param maxParallelism the vertex's max parallelism + * @param rescaleMaxValidator the validation function to provide an error message if a max + * parallelism rescale is not allowed + */ + public DefaultVertexParallelismInfo( + int parallelism, + int maxParallelism, + Function<Integer, Optional<String>> rescaleMaxValidator) { + this.parallelism = checkParallelism(parallelism); + this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism); + this.rescaleMaxValidator = Preconditions.checkNotNull(rescaleMaxValidator); + } + + private static int normalizeAndCheckMaxParallelism(int maxParallelism) { + if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { + maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; + } + + checkBounds("max parallelism", maxParallelism); + return maxParallelism; + } + + private static int checkParallelism(int parallelism) { + checkBounds("parallelism", parallelism); + return parallelism; + } + + private static void checkBounds(String name, int parallelism) { Review comment: ```suggestion private static int checkBounds(String name, int parallelism) { ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java ########## @@ -237,16 +245,6 @@ public void testAttachViaIds() throws Exception { IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED); - List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); - - ExecutionGraph eg = createDefaultExecutionGraph(); - try { - eg.attachJobGraph(ordered); - } catch (JobException e) { - e.printStackTrace(); - fail("Job failed with exception: " + e.getMessage()); - } - // attach the second part of the graph Review comment: this needs to be moved down ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultVertexParallelismInfoTest.java ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; + Review comment: ```suggestion /** Tests for the {@link DefaultVertexParallelismInfo}. */ ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java ########## @@ -270,6 +287,89 @@ private static void assertPreconditions(JobGraph jobGraph) throws RuntimeExcepti } } + /** + * Creates the parallelism store for a set of vertices, optionally with a flag to leave the + * vertex parallelism unchanged. + * + * <p>We need to set parallelism to the max possible value when requesting resources, but when + * executing the graph we should respect what we are actually given. + * + * @param vertices The vertices to store parallelism information for + * @param adjustParallelisms Whether to adjust the parallelisms + * @return The parallelism store. + */ + @VisibleForTesting + static VertexParallelismStore computeReactiveModeVertexParallelismStore( + Iterable<JobVertex> vertices, boolean adjustParallelisms) { + DefaultVertexParallelismStore store = new DefaultVertexParallelismStore(); + + for (JobVertex vertex : vertices) { + // if no max parallelism was configured by the user, we calculate and set a default + final int parallelism; + final int maxParallelism; + if (adjustParallelisms) { + maxParallelism = + vertex.getMaxParallelism() == JobVertex.MAX_PARALLELISM_DEFAULT + ? KeyGroupRangeAssignment.computeDefaultMaxParallelism( + vertex.getParallelism()) + : vertex.getMaxParallelism(); + parallelism = maxParallelism; + } else { + parallelism = vertex.getParallelism(); + maxParallelism = vertex.getMaxParallelism(); + } + + VertexParallelismInformation parallelismInfo = + new DefaultVertexParallelismInfo( + parallelism, + maxParallelism, + // Allow rescaling if the new desired max parallelism + // is not less than what was declared here during scheduling. + // This prevents the situation where more resources are requested + // based on the computed default, when actually fewer are necessary. + (newMax) -> + newMax >= maxParallelism + ? Optional.empty() + : Optional.of( + "Cannot lower max parallelism in Reactive mode.")); + store.setParallelismInfo(vertex.getID(), parallelismInfo); + } + + return store; + } + + /** + * Creates the parallelism store that should be used to build the {@link ExecutionGraph}, which + * will respect the vertex parallelism of the passed {@link JobGraph} in all modes. + * + * @param jobGraph The job graph for execution. + * @param executionMode The mode of scheduler execution. + * @return The parallelism store. + */ + private static VertexParallelismStore computeVertexParallelismStore( + JobGraph jobGraph, SchedulerExecutionMode executionMode) { + if (executionMode == SchedulerExecutionMode.REACTIVE) { + return computeReactiveModeVertexParallelismStore(jobGraph.getVertices(), true); + } + return SchedulerBase.computeVertexParallelismStore(jobGraph); + } + + /** + * Creates the parallelism store that should be used to build the {@link ExecutionGraph}, which + * will respect the vertex parallelism of the passed {@link JobGraph} in all modes. Review comment: from the javadocs the difference between these 2 methods does not become clear ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import java.util.Optional; +import java.util.function.Function; + +/** A {@link VertexParallelismInformation} implementation that provides common validation. */ +public class DefaultVertexParallelismInfo implements VertexParallelismInformation { + private final int parallelism; + private int maxParallelism; + private final Function<Integer, Optional<String>> rescaleMaxValidator; + + /** + * Create {@link VertexParallelismInformation} with max parallelism rescaling validation for a + * vertex. + * + * @param parallelism the vertex's parallelism + * @param maxParallelism the vertex's max parallelism + * @param rescaleMaxValidator the validation function to provide an error message if a max + * parallelism rescale is not allowed + */ + public DefaultVertexParallelismInfo( + int parallelism, + int maxParallelism, + Function<Integer, Optional<String>> rescaleMaxValidator) { + this.parallelism = checkParallelism(parallelism); + this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism); + this.rescaleMaxValidator = Preconditions.checkNotNull(rescaleMaxValidator); + } + + private static int normalizeAndCheckMaxParallelism(int maxParallelism) { + if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { + maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; + } + + checkBounds("max parallelism", maxParallelism); + return maxParallelism; + } + + private static int checkParallelism(int parallelism) { + checkBounds("parallelism", parallelism); + return parallelism; Review comment: ```suggestion return checkBounds("parallelism", parallelism); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java ########## @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - -import org.junit.Assert; -import org.junit.Test; - -public class ExecutionJobVertexTest { - - private static final int NOT_CONFIGURED = -1; - - @Test - public void testMaxParallelismDefaulting() throws Exception { - - // default minimum - ExecutionJobVertex executionJobVertex = createExecutionJobVertex(1, NOT_CONFIGURED); - Assert.assertEquals(128, executionJobVertex.getMaxParallelism()); - - // test round up part 1 - executionJobVertex = createExecutionJobVertex(171, NOT_CONFIGURED); - Assert.assertEquals(256, executionJobVertex.getMaxParallelism()); - - // test round up part 2 - executionJobVertex = createExecutionJobVertex(172, NOT_CONFIGURED); - Assert.assertEquals(512, executionJobVertex.getMaxParallelism()); - - // test round up limit - executionJobVertex = createExecutionJobVertex(1 << 15, NOT_CONFIGURED); - Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism()); - - // test upper bound - try { - executionJobVertex = createExecutionJobVertex(1 + (1 << 15), NOT_CONFIGURED); - executionJobVertex.getMaxParallelism(); - Assert.fail(); - } catch (IllegalArgumentException ignore) { - } - - // parallelism must be smaller than the max parallelism - try { - createExecutionJobVertex(172, 4); - Assert.fail( - "We should not be able to create an ExecutionJobVertex which " - + "has a smaller max parallelism than parallelism."); - } catch (JobException ignored) { - // expected - } - - // test configured / trumps computed default - executionJobVertex = createExecutionJobVertex(4, 1 << 15); - Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism()); - - // test upper bound configured - try { - executionJobVertex = createExecutionJobVertex(4, 1 + (1 << 15)); - Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); - } catch (IllegalArgumentException ignore) { - } - - // test lower bound configured - try { - executionJobVertex = createExecutionJobVertex(4, 0); - Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); - } catch (IllegalArgumentException ignore) { - } - - // test override trumps test configured 2 - executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED); - executionJobVertex.setMaxParallelism(7); - Assert.assertEquals(7, executionJobVertex.getMaxParallelism()); - - // test lower bound with derived value - executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED); - try { - executionJobVertex.setMaxParallelism(0); - Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); - } catch (IllegalArgumentException ignore) { - } - - // test upper bound with derived value - executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED); - try { - executionJobVertex.setMaxParallelism(1 + (1 << 15)); - Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); - } catch (IllegalArgumentException ignore) { - } - - // test complain on setting derived value in presence of a configured value - executionJobVertex = createExecutionJobVertex(4, 16); - try { - executionJobVertex.setMaxParallelism(7); Review comment: are we really still covering all these cases? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import java.util.Optional; +import java.util.function.Function; + +/** A {@link VertexParallelismInformation} implementation that provides common validation. */ +public class DefaultVertexParallelismInfo implements VertexParallelismInformation { + private final int parallelism; + private int maxParallelism; + private final Function<Integer, Optional<String>> rescaleMaxValidator; + + /** + * Create {@link VertexParallelismInformation} with max parallelism rescaling validation for a + * vertex. + * + * @param parallelism the vertex's parallelism + * @param maxParallelism the vertex's max parallelism + * @param rescaleMaxValidator the validation function to provide an error message if a max + * parallelism rescale is not allowed + */ + public DefaultVertexParallelismInfo( + int parallelism, + int maxParallelism, + Function<Integer, Optional<String>> rescaleMaxValidator) { + this.parallelism = checkParallelism(parallelism); + this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism); + this.rescaleMaxValidator = Preconditions.checkNotNull(rescaleMaxValidator); + } + + private static int normalizeAndCheckMaxParallelism(int maxParallelism) { + if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { + maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; + } + + checkBounds("max parallelism", maxParallelism); + return maxParallelism; Review comment: ```suggestion return checkBounds("max parallelism", maxParallelism); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java ########## @@ -119,6 +123,21 @@ public static final TestExecutorResource<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE = new TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor); + @Parameterized.Parameters(name = "execution mode = {0}") + public static Object[] data() { + return new Object[] {SchedulerExecutionMode.REACTIVE, null}; Review comment: I do wonder whether we really need to run all tests with reactive mode, or have one targeted tests to test that the right VertexParallelismStore is computed ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java ########## @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - -import org.junit.Assert; -import org.junit.Test; - -public class ExecutionJobVertexTest { - - private static final int NOT_CONFIGURED = -1; - - @Test - public void testMaxParallelismDefaulting() throws Exception { - - // default minimum - ExecutionJobVertex executionJobVertex = createExecutionJobVertex(1, NOT_CONFIGURED); - Assert.assertEquals(128, executionJobVertex.getMaxParallelism()); - - // test round up part 1 - executionJobVertex = createExecutionJobVertex(171, NOT_CONFIGURED); - Assert.assertEquals(256, executionJobVertex.getMaxParallelism()); - - // test round up part 2 - executionJobVertex = createExecutionJobVertex(172, NOT_CONFIGURED); - Assert.assertEquals(512, executionJobVertex.getMaxParallelism()); - - // test round up limit - executionJobVertex = createExecutionJobVertex(1 << 15, NOT_CONFIGURED); - Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism()); - - // test upper bound - try { - executionJobVertex = createExecutionJobVertex(1 + (1 << 15), NOT_CONFIGURED); - executionJobVertex.getMaxParallelism(); - Assert.fail(); - } catch (IllegalArgumentException ignore) { - } - - // parallelism must be smaller than the max parallelism - try { - createExecutionJobVertex(172, 4); Review comment: We could keep this test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org