barakbaron12 commented on code in PR #27334: URL: https://github.com/apache/flink/pull/27334#discussion_r3135906288
########## flink-tests/src/test/java/org/apache/flink/test/scheduling/ParallelismOverridesITCase.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.scheduling; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.InjectMiniCluster; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for parallelism overrides via {@code pipeline.jobvertex-parallelism-overrides} + * configuration. + * + * <p>These tests verify that the fix for FLINK-38770 works correctly across different scheduler + * types and submission modes. + */ +@ExtendWith(TestLoggerExtension.class) +public class ParallelismOverridesITCase { + + private static final int NUMBER_OF_SLOTS = 8; + + /** Used to capture the actual parallelism at runtime in Application Mode tests. */ + private static final AtomicInteger CAPTURED_PARALLELISM = new AtomicInteger(0); + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(createConfiguration()) + .setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS) + .build()); + + private static Configuration createConfiguration() { + Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Default); + return configuration; + } + + private RestClusterClient<?> restClusterClient; + private MiniCluster miniCluster; + + @BeforeEach + void beforeEach( + @InjectClusterClient RestClusterClient<?> restClusterClient, + @InjectMiniCluster MiniCluster miniCluster) { + this.restClusterClient = restClusterClient; + this.miniCluster = miniCluster; + CAPTURED_PARALLELISM.set(0); + } + + /** + * Tests parallelism overrides with the Default scheduler. This verifies that JobGraph + * submission (Session Mode scenario) correctly applies overrides. + */ + @Test + void testParallelismOverridesWithDefaultScheduler() throws Exception { + JobVertex vertex = new JobVertex("test-vertex"); + vertex.setParallelism(1); + vertex.setInvokableClass(BlockingNoOpInvokable.class); + + JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex); + + // Configure parallelism override to change parallelism from 1 to 4 + Map<String, String> overrides = new HashMap<>(); + overrides.put(vertex.getID().toHexString(), "4"); + jobGraph.getJobConfiguration().set(PipelineOptions.PARALLELISM_OVERRIDES, overrides); + + try { + restClusterClient.submitJob(jobGraph).join(); + JobID jobId = jobGraph.getJobID(); + + // Wait for job to be running with 4 tasks + waitForRunningTasks(restClusterClient, jobId, 4); + + // Verify actual parallelism is 4, not 1 + int actualParallelism = getVertexParallelism(restClusterClient, jobId, vertex.getID()); + assertThat(actualParallelism).as("Parallelism override should be applied").isEqualTo(4); + } finally { + restClusterClient.cancel(jobGraph.getJobID()).join(); + } + } + + /** + * Tests that job-level configuration takes precedence over cluster-level configuration for + * parallelism overrides. + */ + @Test + void testJobConfigurationTakesPrecedenceOverClusterConfiguration() throws Exception { Review Comment: Good catch — addressed in commit e1947768f57 by adding `testJobOverrideTakesPrecedenceOverClusterOverride`. The test configures a cluster-level override (parallelism 2) for a reserved vertex ID in `createConfiguration()`, then sets a job-level override (parallelism 4) for the same vertex, and asserts the job-level value wins. Thanks for the suggestion! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
