Subject: [DISCUSS] Review request: FLINK-38770 - Fix parallelism overrides ignored in Application Mode
Hi all,

I'd like to ask for a review of my PR for FLINK-38770:
https://github.com/apache/flink/pull/27334

The PR has been open for a few months now and CI is green. I'd really
appreciate it if someone could take a look.

Thanks.

On Wed, Dec 3, 2025 at 4:19 PM Barak (Jira) <[email protected]> wrote:

> Barak created FLINK-38770:
> -----------------------------
>
> Summary: pipeline.jobvertex-parallelism-overrides ignored for
>          StreamGraph in Application Mode
> Key: FLINK-38770
> URL: https://issues.apache.org/jira/browse/FLINK-38770
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 2.1.1, 2.2.0, 2.0.1, 2.1.0, 2.0.0
> Reporter: Barak
> Fix For: 2.1.2, 2.0.2, 2.2.0
>
>
> h2. Problem
>
> In Flink 2.0+, when running in Application Mode, the
> {{pipeline.jobvertex-parallelism-overrides}} configuration is completely
> ignored. This breaks the Flink Kubernetes Operator's autoscaler
> functionality, which relies on this configuration to adjust job parallelism
> during rescaling operations.
>
> h2. Background
>
> FLINK-36446 introduced the ability to submit {{StreamGraph}} directly
> instead of converting to {{JobGraph}} on the client side. This is the
> default behavior in Application Mode for Flink 2.0+.
>
> However, the parallelism override logic in
> {{Dispatcher.internalSubmitJob()}} only handles {{JobGraph}}:
>
> {code:java}
> private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
>     if (executionPlan instanceof JobGraph) {
>         applyParallelismOverrides((JobGraph) executionPlan); // StreamGraph bypasses this!
>     }
>     // ...
> }
> {code}
>
> When {{StreamGraph}} is submitted, it bypasses this check entirely. The
> {{StreamGraph}} → {{JobGraph}} conversion happens later in the scheduler
> factories, *after* the override logic is skipped.
>
> h2. Impact
>
> * *Flink Kubernetes Operator autoscaler is broken* for Flink 2.0+ in
>   Application Mode
> * Any use case relying on {{pipeline.jobvertex-parallelism-overrides}} in
>   Application Mode fails silently
> * Jobs run with original parallelism instead of the overridden values
>
> h2. Steps to Reproduce
>
> # Create a simple Flink streaming job
> # Deploy using Flink Kubernetes Operator in Application Mode with Flink 2.0+
> # Trigger autoscaler rescaling (or manually set
>   {{pipeline.jobvertex-parallelism-overrides}})
> # Observe that job parallelism does not change
>
> h2. Root Cause Analysis
>
> h3. Session Mode (JobGraph submitted) - WORKING
>
> {noformat}
> Client: StreamGraph → JobGraph conversion
>     ↓
> Dispatcher.internalSubmitJob(JobGraph)
>     ↓
> ✅ applyParallelismOverrides(JobGraph) - APPLIED
>     ↓
> SchedulerFactory receives modified JobGraph
> {noformat}
>
> h3. Application Mode (StreamGraph submitted) - BROKEN
>
> {noformat}
> Application Driver: Creates StreamGraph directly
>     ↓
> Dispatcher.internalSubmitJob(StreamGraph)
>     ↓
> ❌ if (instanceof JobGraph) → FALSE, SKIPPED!
>     ↓
> SchedulerFactory.createInstance(StreamGraph)
>     ↓
> StreamGraph → JobGraph conversion happens HERE
>     ↓
> ❌ Overrides never applied
> {noformat}
>
> h3. Affected Code Paths
>
> The {{StreamGraph}} → {{JobGraph}} conversion happens in multiple
> scheduler factories:
>
> ||Scheduler Factory||File||Conversion Location||
> |DefaultSchedulerFactory|DefaultSchedulerFactory.java:92|Direct conversion|
> |AdaptiveSchedulerFactory|AdaptiveSchedulerFactory.java:96|Direct conversion|
> |AdaptiveBatchSchedulerFactory|AdaptiveExecutionHandlerFactory.java:65|When batch recovery enabled|
> |AdaptiveBatchScheduler (adaptive)|DefaultAdaptiveExecutionHandler|Incremental via AdaptiveGraphManager|
>
> h2. Proposed Solution
>
> h3. Approach
>
> # *Create a utility class* {{ParallelismOverrideUtil}} with the override
>   logic
> # *Remove* the override logic from {{Dispatcher.internalSubmitJob()}}
> # *Call the utility* in all scheduler factories after {{StreamGraph}} →
>   {{JobGraph}} conversion
> # *Handle incremental conversion* for {{DefaultAdaptiveExecutionHandler}}
>
> h3. New Utility Class
>
> Create {{flink-runtime/.../scheduler/ParallelismOverrideUtil.java}}:
>
> {code:java}
> public class ParallelismOverrideUtil {
>     private static final Logger LOG =
>             LoggerFactory.getLogger(ParallelismOverrideUtil.class);
>
>     /**
>      * Applies parallelism overrides from configuration to the JobGraph.
>      * This method MUST be called after converting StreamGraph to JobGraph
>      * in all SchedulerNGFactory implementations.
>      */
>     public static void applyParallelismOverrides(
>             JobGraph jobGraph, Configuration configuration) {
>         Map<String, String> overrides = new HashMap<>();
>         overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
>         overrides.putAll(
>                 jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
>
>         for (JobVertex vertex : jobGraph.getVertices()) {
>             String override = overrides.get(vertex.getID().toHexString());
>             if (override != null) {
>                 int currentParallelism = vertex.getParallelism();
>                 int overrideParallelism = Integer.parseInt(override);
>                 LOG.info(
>                         "Applying parallelism override for vertex {}: {} -> {}",
>                         vertex.getID(), currentParallelism, overrideParallelism);
>                 vertex.setParallelism(overrideParallelism);
>             }
>         }
>     }
> }
> {code}
>
> h3. Files to Modify
>
> # {{Dispatcher.java}} - Remove {{applyParallelismOverrides()}} method and
>   its call
> # {{DefaultSchedulerFactory.java}} - Add utility call after conversion
>   (line ~92)
> # {{AdaptiveSchedulerFactory.java}} - Add utility call after conversion
>   (line ~96)
> # {{AdaptiveExecutionHandlerFactory.java}} - Add utility call after
>   conversion (line ~65)
> # {{DefaultAdaptiveExecutionHandler}} or {{AdaptiveGraphManager}} - Handle
>   incremental conversion
>
> h2. Testing Plan
>
> h3. Existing Test Coverage
>
> There is one existing test in {{DispatcherTest.java}}:
> * {{testOverridingJobVertexParallelisms()}} - only tests {{JobGraph}}
>   submission
>
> *Missing coverage:* No tests exist for {{StreamGraph}} submission with
> parallelism overrides, which is why this regression was not caught.
>
> h3. Required: Integration Test (using MiniClusterExtension)
>
> Create new test class {{ParallelismOverridesITCase.java}} in
> {{flink-tests}} module.
>
> This is the *recommended approach* because it:
> * Tests the real flow: StreamGraph → Scheduler → JobGraph → Running job
> * Can verify actual parallelism via REST API
> * Uses existing test infrastructure ({{MiniClusterExtension}})
> * Is self-contained and doesn't require complex mocking
>
> {code:java}
> @ExtendWith(TestLoggerExtension.class)
> public class ParallelismOverridesITCase {
>
>     @RegisterExtension
>     private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
>             new MiniClusterExtension(
>                     new MiniClusterResourceConfiguration.Builder()
>                             .setConfiguration(createConfiguration())
>                             .setNumberSlotsPerTaskManager(4)
>                             .build());
>
>     private static Configuration createConfiguration() {
>         Configuration config = new Configuration();
>         // Parallelism overrides will be set per-test
>         return config;
>     }
>
>     @Test
>     void testParallelismOverridesWithStreamGraph(
>             @InjectClusterClient RestClusterClient<?> client,
>             @InjectMiniCluster MiniCluster miniCluster) throws Exception {
>
>         // 1. Create StreamGraph with initial parallelism = 1
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         DataStream<Integer> stream = env.fromElements(1, 2, 3)
>                 .map(x -> x);
>         stream.sinkTo(new DiscardingSink<>());
>         StreamGraph streamGraph = env.getStreamGraph();
>
>         // 2. Get vertex ID and configure override to parallelism = 4
>         String vertexId = getMapVertexId(streamGraph);
>         Configuration config = new Configuration();
>         config.set(PipelineOptions.PARALLELISM_OVERRIDES,
>                 Map.of(vertexId, "4"));
>
>         // 3. Submit StreamGraph with overrides
>         JobID jobId = miniCluster.submitJob(streamGraph).get().getJobID();
>
>         // 4. Verify actual parallelism is 4 (not 1)
>         JobDetailsInfo jobDetails = client.getJobDetails(jobId).join();
>         int actualParallelism = jobDetails.getJobVertexInfos().stream()
>                 .filter(v -> v.getName().contains("Map"))
>                 .findFirst()
>                 .map(JobDetailsInfo.JobVertexDetailsInfo::getParallelism)
>                 .orElseThrow();
>
>         assertEquals(4, actualParallelism,
>                 "Parallelism override should be applied to StreamGraph submission");
>     }
>
>     @Test
>     void testParallelismOverridesWithJobGraph(
>             @InjectClusterClient RestClusterClient<?> client) throws Exception {
>         // Ensure JobGraph submission still works (regression test for
>         // existing behavior)
>         // Similar structure but submit JobGraph instead of StreamGraph
>     }
> }
> {code}
>
> h3. Test Matrix
>
> ||Test Case||ExecutionPlan||Expected Behavior||
> |testParallelismOverridesWithStreamGraph|StreamGraph|Override applied (THIS IS THE BUG FIX)|
> |testParallelismOverridesWithJobGraph|JobGraph|Override applied (existing behavior)|
> |testOverridesFromJobConfigTakePrecedence|StreamGraph|Job config overrides cluster config|
> |testPartialOverrides|StreamGraph|Only specified vertices modified|
>
> h3. Optional: Unit Test for Utility Class
>
> Add unit tests for {{ParallelismOverrideUtil}} if created:
>
> {code:java}
> class ParallelismOverrideUtilTest {
>     @Test
>     void testApplyOverridesFromConfiguration() { }
>
>     @Test
>     void testApplyOverridesFromJobGraphConfig() { }
>
>     @Test
>     void testJobGraphConfigTakesPrecedence() { }
>
>     @Test
>     void testUnknownVertexIdIgnored() { }
>
>     @Test
>     void testNoOverridesWhenEmpty() { }
> }
> {code}
>
> h2. Acceptance Criteria
>
> * {{pipeline.jobvertex-parallelism-overrides}} works in Application Mode
>   with StreamGraph
> * All scheduler types (Default, Adaptive, AdaptiveBatch) apply overrides
>   correctly
> * Session Mode (JobGraph submission) continues to work
> * Overrides from both jobMasterConfiguration and jobConfiguration are
>   respected
> * Unit tests cover the utility method
> * Integration tests cover all scheduler/execution-plan combinations
>
> h2. Related
>
> * FLINK-36446 - Submit StreamGraph directly in Application Mode
>   (introduced the regression)
> * [Mailing list discussion|https://lists.apache.org/thread/qb06cghv2bo6p1xhzt6qzj6wf3cl29gz]
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.20.10#820010)
>

--
Barak Bar-On, Staff Engineer
email: [email protected]
web: www.forter.com
mobile: 054-571-3374
