Subject: [DISCUSS] Review request: FLINK-38770 - Fix parallelism overrides ignored in Application Mode

Hi all,

I'd like to ask for a review of my PR for FLINK-38770:
https://github.com/apache/flink/pull/27334

The PR has been open for a few months now and CI is green. I'd really
appreciate it if someone could take a look.

Thanks.

On Wed, Dec 3, 2025 at 4:19 PM Barak (Jira) <[email protected]> wrote:

> Barak created FLINK-38770:
> -----------------------------
>
>              Summary: pipeline.jobvertex-parallelism-overrides ignored for
> StreamGraph in Application Mode
>                  Key: FLINK-38770
>                  URL: https://issues.apache.org/jira/browse/FLINK-38770
>              Project: Flink
>           Issue Type: Bug
>           Components: Runtime / Coordination
>     Affects Versions: 2.1.1, 2.2.0, 2.0.1, 2.1.0, 2.0.0
>             Reporter: Barak
>              Fix For: 2.1.2, 2.0.2, 2.2.0
>
>
> h2. Problem
>
> In Flink 2.0+, when running in Application Mode, the
> {{pipeline.jobvertex-parallelism-overrides}} configuration is completely
> ignored. This breaks the Flink Kubernetes Operator's autoscaler
> functionality, which relies on this configuration to adjust job parallelism
> during rescaling operations.
>
> h2. Background
>
> FLINK-36446 introduced the ability to submit {{StreamGraph}} directly
> instead of converting to {{JobGraph}} on the client side. This is the
> default behavior in Application Mode for Flink 2.0+.
>
> However, the parallelism override logic in
> {{Dispatcher.internalSubmitJob()}} only handles {{JobGraph}}:
>
> {code:java}
> private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
>     if (executionPlan instanceof JobGraph) {
>         // StreamGraph bypasses this!
>         applyParallelismOverrides((JobGraph) executionPlan);
>     }
>     // ...
> }
> {code}
>
> When a {{StreamGraph}} is submitted, the {{instanceof}} check fails and the
> override call is skipped. The {{StreamGraph}} → {{JobGraph}} conversion
> happens later in the scheduler factories, so the resulting {{JobGraph}}
> never has the overrides applied.
>
> h2. Impact
>
> * *Flink Kubernetes Operator autoscaler is broken* for Flink 2.0+ in
> Application Mode
> * Any use case relying on {{pipeline.jobvertex-parallelism-overrides}} in
> Application Mode fails silently
> * Jobs run with original parallelism instead of the overridden values
>
> h2. Steps to Reproduce
>
> # Create a simple Flink streaming job
> # Deploy using Flink Kubernetes Operator in Application Mode with Flink
> 2.0+
> # Trigger autoscaler rescaling (or manually set
> {{pipeline.jobvertex-parallelism-overrides}})
> # Observe that job parallelism does not change
>
> h2. Root Cause Analysis
>
> h3. Session Mode (JobGraph submitted) - WORKING
>
> {noformat}
> Client: StreamGraph → JobGraph conversion
>    ↓
> Dispatcher.internalSubmitJob(JobGraph)
>    ↓
> ✅ applyParallelismOverrides(JobGraph) - APPLIED
>    ↓
> SchedulerFactory receives modified JobGraph
> {noformat}
>
> h3. Application Mode (StreamGraph submitted) - BROKEN
>
> {noformat}
> Application Driver: Creates StreamGraph directly
>    ↓
> Dispatcher.internalSubmitJob(StreamGraph)
>    ↓
> ❌ if (instanceof JobGraph) → FALSE, SKIPPED!
>    ↓
> SchedulerFactory.createInstance(StreamGraph)
>    ↓
> StreamGraph → JobGraph conversion happens HERE
>    ↓
> ❌ Overrides never applied
> {noformat}
>
> h3. Affected Code Paths
>
> The {{StreamGraph}} → {{JobGraph}} conversion happens in multiple
> scheduler factories:
>
> ||Scheduler Factory||File||Conversion Location||
> |DefaultSchedulerFactory|DefaultSchedulerFactory.java:92|Direct conversion|
> |AdaptiveSchedulerFactory|AdaptiveSchedulerFactory.java:96|Direct conversion|
> |AdaptiveBatchSchedulerFactory|AdaptiveExecutionHandlerFactory.java:65|When batch recovery enabled|
> |AdaptiveBatchScheduler (adaptive)|DefaultAdaptiveExecutionHandler|Incremental via AdaptiveGraphManager|
>
> h2. Proposed Solution
>
> h3. Approach
>
> # *Create a utility class* {{ParallelismOverrideUtil}} with the override
> logic
> # *Remove* the override logic from {{Dispatcher.internalSubmitJob()}}
> # *Call the utility* in all scheduler factories after {{StreamGraph}} →
> {{JobGraph}} conversion
> # *Handle incremental conversion* for {{DefaultAdaptiveExecutionHandler}}
>
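
Inline note for reviewers: the ordering change behind the approach above can be illustrated with a toy model. This is only a sketch under stated assumptions: Plan, ToyJobGraph, and ToyStreamGraph are hypothetical stand-ins rather than Flink classes, and a single integer stands in for the per-vertex override map.

```java
/** Toy model of the ordering problem. All types here are hypothetical stand-ins, not Flink classes. */
class OverridePlacementSketch {

    /** Stand-in for ExecutionPlan. */
    interface Plan {}

    /** Stand-in for JobGraph with a single vertex. */
    static final class ToyJobGraph implements Plan {
        int parallelism;

        ToyJobGraph(int parallelism) {
            this.parallelism = parallelism;
        }
    }

    /** Stand-in for StreamGraph; converting it yields a fresh ToyJobGraph. */
    static final class ToyStreamGraph implements Plan {
        final int parallelism;

        ToyStreamGraph(int parallelism) {
            this.parallelism = parallelism;
        }

        ToyJobGraph toJobGraph() {
            return new ToyJobGraph(parallelism);
        }
    }

    /** Converts a stream graph to a job graph; a job graph is passed through unchanged. */
    static ToyJobGraph convert(Plan plan) {
        return plan instanceof ToyStreamGraph
                ? ((ToyStreamGraph) plan).toJobGraph()
                : (ToyJobGraph) plan;
    }

    /** Mirrors the reported behavior: override at submission, conversion afterwards. */
    static int overrideAtSubmission(Plan plan, int override) {
        if (plan instanceof ToyJobGraph) {
            ((ToyJobGraph) plan).parallelism = override; // skipped for ToyStreamGraph
        }
        return convert(plan).parallelism;
    }

    /** Mirrors the proposed ordering: convert first, then override the result. */
    static int overrideAfterConversion(Plan plan, int override) {
        ToyJobGraph jobGraph = convert(plan);
        jobGraph.parallelism = override;
        return jobGraph.parallelism;
    }

    public static void main(String[] args) {
        System.out.println(overrideAtSubmission(new ToyStreamGraph(1), 4));
        System.out.println(overrideAfterConversion(new ToyStreamGraph(1), 4));
    }
}
```

In this model the first call keeps the original parallelism for a stream graph, while the second picks up the override. Both orderings behave the same when a job graph is submitted, which is why the Session Mode path is unaffected.
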
> h3. New Utility Class
>
> Create {{flink-runtime/.../scheduler/ParallelismOverrideUtil.java}}:
>
> {code:java}
> public class ParallelismOverrideUtil {
>     private static final Logger LOG =
>             LoggerFactory.getLogger(ParallelismOverrideUtil.class);
>
>     /**
>      * Applies parallelism overrides from configuration to the JobGraph.
>      * This method MUST be called after converting StreamGraph to JobGraph
>      * in all SchedulerNGFactory implementations.
>      */
>     public static void applyParallelismOverrides(
>             JobGraph jobGraph, Configuration configuration) {
>         Map<String, String> overrides = new HashMap<>();
>         // Cluster-level overrides go in first; job-level entries overwrite
>         // them when both define the same vertex ID.
>         overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
>         overrides.putAll(
>                 jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
>
>         for (JobVertex vertex : jobGraph.getVertices()) {
>             String override = overrides.get(vertex.getID().toHexString());
>             if (override != null) {
>                 int currentParallelism = vertex.getParallelism();
>                 int overrideParallelism = Integer.parseInt(override);
>                 LOG.info(
>                         "Applying parallelism override for vertex {}: {} -> {}",
>                         vertex.getID(),
>                         currentParallelism,
>                         overrideParallelism);
>                 vertex.setParallelism(overrideParallelism);
>             }
>         }
>     }
> }
> {code}
>
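
Inline note for reviewers: the precedence rule in the utility above (job-level overrides win over cluster-level ones, and unknown vertex IDs are ignored) can be checked in isolation. The following is a minimal sketch that assumes plain maps in place of Flink's Configuration and JobGraph; the class and method names are hypothetical and not part of the PR.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-alone sketch of the merge-then-apply logic; plain maps stand in for Flink types. */
class OverridePrecedenceSketch {

    /** Merges overrides so that job-level entries win over cluster-level ones. */
    static Map<String, String> merge(
            Map<String, String> clusterOverrides, Map<String, String> jobOverrides) {
        Map<String, String> merged = new HashMap<>(clusterOverrides);
        merged.putAll(jobOverrides); // the later putAll wins on key collisions
        return merged;
    }

    /** Returns a copy of the vertex map with overrides applied; unknown vertex IDs are ignored. */
    static Map<String, Integer> apply(
            Map<String, Integer> vertexParallelism, Map<String, String> overrides) {
        Map<String, Integer> result = new LinkedHashMap<>(vertexParallelism);
        for (String vertexId : vertexParallelism.keySet()) {
            String override = overrides.get(vertexId);
            if (override != null) {
                result.put(vertexId, Integer.parseInt(override));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> vertices = new LinkedHashMap<>();
        vertices.put("source", 1);
        vertices.put("map", 1);

        Map<String, String> cluster = Map.of("map", "2", "unknown", "8");
        Map<String, String> job = Map.of("map", "4");

        System.out.println(apply(vertices, merge(cluster, job)));
    }
}
```

Here the "map" vertex ends at parallelism 4 because the job-level value replaces the cluster-level 2, "source" stays at 1, and the "unknown" key has no effect. These are the same cases the test matrix lists as testOverridesFromJobConfigTakePrecedence and testPartialOverrides.
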
> h3. Files to Modify
>
> # {{Dispatcher.java}} - Remove {{applyParallelismOverrides()}} method and
> its call
> # {{DefaultSchedulerFactory.java}} - Add utility call after conversion
> (line ~92)
> # {{AdaptiveSchedulerFactory.java}} - Add utility call after conversion
> (line ~96)
> # {{AdaptiveExecutionHandlerFactory.java}} - Add utility call after
> conversion (line ~65)
> # {{DefaultAdaptiveExecutionHandler}} or {{AdaptiveGraphManager}} - Handle
> incremental conversion
>
> h2. Testing Plan
>
> h3. Existing Test Coverage
>
> There is one existing test in {{DispatcherTest.java}}:
> * {{testOverridingJobVertexParallelisms()}} - only tests {{JobGraph}}
> submission
>
> *Missing coverage:* No tests exist for {{StreamGraph}} submission with
> parallelism overrides, which is why this regression was not caught.
>
> h3. Required: Integration Test (using MiniClusterExtension)
>
> Create new test class {{ParallelismOverridesITCase.java}} in
> {{flink-tests}} module.
>
> This is the *recommended approach* because it:
> * Tests the real flow: StreamGraph → Scheduler → JobGraph → Running job
> * Can verify actual parallelism via REST API
> * Uses existing test infrastructure ({{MiniClusterExtension}})
> * Is self-contained and doesn't require complex mocking
>
> {code:java}
> @ExtendWith(TestLoggerExtension.class)
> public class ParallelismOverridesITCase {
>
>     @RegisterExtension
>     private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
>             new MiniClusterExtension(
>                     new MiniClusterResourceConfiguration.Builder()
>                             .setConfiguration(createConfiguration())
>                             .setNumberSlotsPerTaskManager(4)
>                             .build());
>
>     private static Configuration createConfiguration() {
>         Configuration config = new Configuration();
>         // Parallelism overrides will be set per-test
>         return config;
>     }
>
>     @Test
>     void testParallelismOverridesWithStreamGraph(
>             @InjectClusterClient RestClusterClient<?> client,
>             @InjectMiniCluster MiniCluster miniCluster) throws Exception {
>
>         // 1. Create StreamGraph with initial parallelism = 1
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         DataStream<Integer> stream = env.fromElements(1, 2, 3)
>                 .map(x -> x);
>         stream.sinkTo(new DiscardingSink<>());
>         StreamGraph streamGraph = env.getStreamGraph();
>
>         // 2. Get vertex ID and configure override to parallelism = 4
>         //    (getMapVertexId is a test helper still to be written)
>         String vertexId = getMapVertexId(streamGraph);
>         // Attach the override to the job configuration; a standalone
>         // Configuration object would never reach the cluster.
>         streamGraph.getJobConfiguration().set(
>                 PipelineOptions.PARALLELISM_OVERRIDES, Map.of(vertexId, "4"));
>
>         // 3. Submit StreamGraph with overrides
>         JobID jobId = miniCluster.submitJob(streamGraph).get().getJobID();
>
>         // 4. Verify actual parallelism is 4 (not 1)
>         JobDetailsInfo jobDetails = client.getJobDetails(jobId).join();
>         int actualParallelism = jobDetails.getJobVertexInfos().stream()
>                 .filter(v -> v.getName().contains("Map"))
>                 .findFirst()
>                 .map(JobDetailsInfo.JobVertexDetailsInfo::getParallelism)
>                 .orElseThrow();
>
>         assertEquals(
>                 4,
>                 actualParallelism,
>                 "Parallelism override should be applied to StreamGraph submission");
>     }
>
>     @Test
>     void testParallelismOverridesWithJobGraph(
>             @InjectClusterClient RestClusterClient<?> client) throws Exception {
>         // Ensure JobGraph submission still works (regression test for existing behavior).
>         // Similar structure, but submit JobGraph instead of StreamGraph.
>     }
> }
> {code}
>
> h3. Test Matrix
>
> ||Test Case||ExecutionPlan||Expected Behavior||
> |testParallelismOverridesWithStreamGraph|StreamGraph|Override applied (THIS IS THE BUG FIX)|
> |testParallelismOverridesWithJobGraph|JobGraph|Override applied (existing behavior)|
> |testOverridesFromJobConfigTakePrecedence|StreamGraph|Job config overrides cluster config|
> |testPartialOverrides|StreamGraph|Only specified vertices modified|
>
> h3. Optional: Unit Test for Utility Class
>
> Add unit tests for {{ParallelismOverrideUtil}} if created:
>
> {code:java}
> class ParallelismOverrideUtilTest {
>     @Test
>     void testApplyOverridesFromConfiguration() { }
>
>     @Test
>     void testApplyOverridesFromJobGraphConfig() { }
>
>     @Test
>     void testJobGraphConfigTakesPrecedence() { }
>
>     @Test
>     void testUnknownVertexIdIgnored() { }
>
>     @Test
>     void testNoOverridesWhenEmpty() { }
> }
> {code}
>
> h2. Acceptance Criteria
>
> * {{pipeline.jobvertex-parallelism-overrides}} works in Application Mode
> with StreamGraph
> * All scheduler types (Default, Adaptive, AdaptiveBatch) apply overrides
> correctly
> * Session Mode (JobGraph submission) continues to work
> * Overrides from both jobMasterConfiguration and jobConfiguration are
> respected
> * Unit tests cover the utility method
> * Integration tests cover all scheduler/execution-plan combinations
>
> h2. Related
>
> * FLINK-36446 - Submit StreamGraph directly in Application Mode
> (introduced the regression)
> * [Mailing list discussion|
> https://lists.apache.org/thread/qb06cghv2bo6p1xhzt6qzj6wf3cl29gz]
>


-- 
Barak Bar-On, Staff Engineer
email: [email protected]  web: www.forter.com
mobile: 054-571-3374
