[
https://issues.apache.org/jira/browse/FLINK-38770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-38770:
-----------------------------------
Labels: pull-request-available (was: )
> pipeline.jobvertex-parallelism-overrides ignored for StreamGraph in
> Application Mode
> ------------------------------------------------------------------------------------
>
> Key: FLINK-38770
> URL: https://issues.apache.org/jira/browse/FLINK-38770
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 2.0.0, 2.1.0, 2.0.1, 2.2.0, 2.1.1
> Reporter: Barak
> Assignee: Barak
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.2.0, 2.1.2, 2.0.2
>
>
> h2. Problem
> In Flink 2.0+, when running in Application Mode, the
> {{pipeline.jobvertex-parallelism-overrides}} configuration is completely
> ignored. This breaks the Flink Kubernetes Operator's autoscaler
> functionality, which relies on this configuration to adjust job parallelism
> during rescaling operations.
> h2. Background
> FLINK-36446 introduced the ability to submit {{StreamGraph}} directly instead
> of converting to {{JobGraph}} on the client side. This is the default
> behavior in Application Mode for Flink 2.0+.
> However, the parallelism override logic in {{Dispatcher.internalSubmitJob()}}
> only handles {{JobGraph}}:
> {code:java}
> private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
>     if (executionPlan instanceof JobGraph) {
>         applyParallelismOverrides((JobGraph) executionPlan); // StreamGraph bypasses this!
>     }
>     // ...
> }
> {code}
> When a {{StreamGraph}} is submitted, it bypasses this check entirely. The
> {{StreamGraph}} → {{JobGraph}} conversion happens later, in the scheduler
> factories, so by the time a {{JobGraph}} exists the override step has already been skipped.
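> A minimal, self-contained model of this bypass is sketched below. The types ({{BypassModel}}, {{Plan}},
> {{JobGraphPlan}}, {{StreamGraphPlan}}) are hypothetical stand-ins, not Flink's {{ExecutionPlan}}
> hierarchy; the sketch only shows that an {{instanceof JobGraph}} gate silently skips any plan that is
> converted after the gate.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical toy model of the Dispatcher bypass; none of these types are Flink classes.
> final class BypassModel {
>     interface Plan {}
>
>     /** Already converted on the client: vertex id -> parallelism. */
>     static final class JobGraphPlan implements Plan {
>         final Map<String, Integer> parallelism = new HashMap<>();
>     }
>
>     /** Not yet converted: the "vertices" only exist once toJobGraph() runs. */
>     static final class StreamGraphPlan implements Plan {
>         final Map<String, Integer> operatorParallelism = new HashMap<>();
>
>         JobGraphPlan toJobGraph() {
>             JobGraphPlan jobGraph = new JobGraphPlan();
>             jobGraph.parallelism.putAll(operatorParallelism);
>             return jobGraph;
>         }
>     }
>
>     /** Mirrors the current gate in internalSubmitJob(): only JobGraph plans see overrides. */
>     static JobGraphPlan submit(Plan plan, Map<String, Integer> overrides) {
>         if (plan instanceof JobGraphPlan) {
>             JobGraphPlan jobGraph = (JobGraphPlan) plan;
>             // Replace the parallelism of vertices that exist; unknown ids are ignored.
>             overrides.forEach((id, p) -> jobGraph.parallelism.computeIfPresent(id, (k, old) -> p));
>             return jobGraph;
>         }
>         // StreamGraph plans skip the override step and are converted later, untouched.
>         return ((StreamGraphPlan) plan).toJobGraph();
>     }
> }
> {code}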
> h2. Impact
> * *Flink Kubernetes Operator autoscaler is broken* for Flink 2.0+ in Application Mode
> * Any use case relying on {{pipeline.jobvertex-parallelism-overrides}} in Application Mode fails silently
> * Jobs run with original parallelism instead of the overridden values
> h2. Steps to Reproduce
> # Create a simple Flink streaming job
> # Deploy using Flink Kubernetes Operator in Application Mode with Flink 2.0+
> # Trigger autoscaler rescaling (or manually set {{pipeline.jobvertex-parallelism-overrides}})
> # Observe that job parallelism does not change
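> For setting the option manually, the value is a map whose keys are hex {{JobVertexID}}s and whose values
> are parallelisms, written as comma-separated {{<vertexId>:<parallelism>}} pairs. The sketch below parses
> that format for illustration only; {{OverridesFormat}} is a hypothetical helper that ignores Flink's
> quoting/escaping rules for map values, and the vertex IDs in the usage note are made up.
> {code:java}
> import java.util.LinkedHashMap;
> import java.util.Map;
>
> // Hypothetical helper: simplified parser for "vertexIdHex:parallelism,vertexIdHex:parallelism".
> final class OverridesFormat {
>     static Map<String, Integer> parse(String value) {
>         Map<String, Integer> result = new LinkedHashMap<>(); // keeps the written order
>         if (value == null || value.isBlank()) {
>             return result;
>         }
>         for (String entry : value.split(",")) {
>             int sep = entry.lastIndexOf(':');
>             if (sep <= 0 || sep == entry.length() - 1) {
>                 throw new IllegalArgumentException("Malformed override entry: " + entry);
>             }
>             String vertexId = entry.substring(0, sep).trim();
>             // Like the override logic itself, a non-numeric value fails with NumberFormatException.
>             int parallelism = Integer.parseInt(entry.substring(sep + 1).trim());
>             result.put(vertexId, parallelism);
>         }
>         return result;
>     }
> }
> {code}
> For example, parsing {{0a448493b4782967b150582570326227:4,bc764cd8ddf7a0cff126f51c16239658:2}} yields two
> entries, mapping the first (made-up) vertex to 4 and the second to 2.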
> h2. Root Cause Analysis
> h3. Session Mode (JobGraph submitted) - WORKING
> {noformat}
> Client: StreamGraph → JobGraph conversion
> ↓
> Dispatcher.internalSubmitJob(JobGraph)
> ↓
> ✅ applyParallelismOverrides(JobGraph) - APPLIED
> ↓
> SchedulerFactory receives modified JobGraph
> {noformat}
> h3. Application Mode (StreamGraph submitted) - BROKEN
> {noformat}
> Application Driver: Creates StreamGraph directly
> ↓
> Dispatcher.internalSubmitJob(StreamGraph)
> ↓
> ❌ if (instanceof JobGraph) → FALSE, SKIPPED!
> ↓
> SchedulerFactory.createInstance(StreamGraph)
> ↓
> StreamGraph → JobGraph conversion happens HERE
> ↓
> ❌ Overrides never applied
> {noformat}
> h3. Affected Code Paths
> The {{StreamGraph}} → {{JobGraph}} conversion happens in multiple scheduler
> factories:
> ||Scheduler Factory||File||Conversion Mode||
> |DefaultSchedulerFactory|DefaultSchedulerFactory.java:92|Direct conversion|
> |AdaptiveSchedulerFactory|AdaptiveSchedulerFactory.java:96|Direct conversion|
> |AdaptiveBatchSchedulerFactory|AdaptiveExecutionHandlerFactory.java:65|When batch recovery is enabled|
> |AdaptiveBatchScheduler (adaptive)|DefaultAdaptiveExecutionHandler|Incremental via AdaptiveGraphManager|
> h2. Proposed Solution
> h3. Approach
> # *Create a utility class* {{ParallelismOverrideUtil}} with the override logic
> # *Remove* the override logic from {{Dispatcher.internalSubmitJob()}}
> # *Call the utility* in all scheduler factories after {{StreamGraph}} → {{JobGraph}} conversion
> # *Handle incremental conversion* for {{DefaultAdaptiveExecutionHandler}}
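> The core of the approach is to apply overrides at the point where every plan has become a {{JobGraph}}.
> A hypothetical, self-contained model follows ({{FixedFlowModel}} and its nested types are not Flink's
> scheduler API): the "factory" normalizes the plan first and applies overrides exactly once afterwards, so
> both submission paths end up with the same parallelism.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical toy model of the proposed flow; none of these types are Flink classes.
> final class FixedFlowModel {
>     interface Plan {}
>
>     static final class JobGraphPlan implements Plan {
>         final Map<String, Integer> parallelism = new HashMap<>();
>     }
>
>     static final class StreamGraphPlan implements Plan {
>         final Map<String, Integer> operatorParallelism = new HashMap<>();
>
>         JobGraphPlan toJobGraph() {
>             JobGraphPlan jobGraph = new JobGraphPlan();
>             jobGraph.parallelism.putAll(operatorParallelism);
>             return jobGraph;
>         }
>     }
>
>     /** Stand-in for ParallelismOverrideUtil: only vertices that exist in the graph change. */
>     static void applyOverrides(JobGraphPlan jobGraph, Map<String, Integer> overrides) {
>         overrides.forEach((id, p) -> jobGraph.parallelism.computeIfPresent(id, (k, old) -> p));
>     }
>
>     /** Stand-in for a scheduler factory: convert first (if needed), then apply overrides. */
>     static JobGraphPlan createScheduler(Plan plan, Map<String, Integer> overrides) {
>         JobGraphPlan jobGraph =
>                 plan instanceof StreamGraphPlan
>                         ? ((StreamGraphPlan) plan).toJobGraph()
>                         : (JobGraphPlan) plan;
>         applyOverrides(jobGraph, overrides);
>         return jobGraph;
>     }
> }
> {code}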
> h3. New Utility Class
> Create {{flink-runtime/.../scheduler/ParallelismOverrideUtil.java}}:
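> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.PipelineOptions;
> import org.apache.flink.runtime.jobgraph.JobGraph;
> import org.apache.flink.runtime.jobgraph.JobVertex;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.HashMap;
> import java.util.Map;
>
> public final class ParallelismOverrideUtil {
>     private static final Logger LOG = LoggerFactory.getLogger(ParallelismOverrideUtil.class);
>
>     private ParallelismOverrideUtil() {}
>
>     /**
>      * Applies parallelism overrides from configuration to the JobGraph.
>      * This method MUST be called after converting StreamGraph to JobGraph
>      * in all SchedulerNGFactory implementations.
>      */
>     public static void applyParallelismOverrides(JobGraph jobGraph, Configuration configuration) {
>         Map<String, String> overrides = new HashMap<>();
>         // Cluster-level overrides first ...
>         overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
>         // ... then job-level overrides, which replace cluster-level entries for the same vertex.
>         overrides.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
>         for (JobVertex vertex : jobGraph.getVertices()) {
>             // Keys are JobVertexIDs in hex form; keys that match no vertex are never looked up.
>             String override = overrides.get(vertex.getID().toHexString());
>             if (override != null) {
>                 int currentParallelism = vertex.getParallelism();
>                 int overrideParallelism = Integer.parseInt(override);
>                 LOG.info(
>                         "Applying parallelism override for vertex {}: {} -> {}",
>                         vertex.getID(),
>                         currentParallelism,
>                         overrideParallelism);
>                 vertex.setParallelism(overrideParallelism);
>             }
>         }
>     }
> }
> {code}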
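> The merge order in {{applyParallelismOverrides}} (cluster-level map first, then the job-level map via
> {{putAll}}) is what makes job-level entries win. The plain-Java model below reproduces that order and the
> per-vertex lookup so the expected semantics can be checked without Flink on the classpath;
> {{OverrideMergeModel}} and its string vertex IDs are hypothetical.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical model of the utility's semantics; it does not use Flink's JobGraph.
> final class OverrideMergeModel {
>     /** Same order as the utility: cluster-level first, job-level second, so job-level wins. */
>     static Map<String, String> merge(Map<String, String> clusterLevel, Map<String, String> jobLevel) {
>         Map<String, String> merged = new HashMap<>(clusterLevel);
>         merged.putAll(jobLevel);
>         return merged;
>     }
>
>     /** Iterates over the graph's vertices, so override keys without a vertex are ignored. */
>     static Map<String, Integer> apply(Map<String, Integer> vertices, Map<String, String> overrides) {
>         Map<String, Integer> result = new HashMap<>(vertices);
>         for (Map.Entry<String, Integer> vertex : vertices.entrySet()) {
>             String override = overrides.get(vertex.getKey());
>             if (override != null) {
>                 result.put(vertex.getKey(), Integer.parseInt(override));
>             }
>         }
>         return result;
>     }
> }
> {code}
> Merging cluster-level {{a=2, b=3}} with job-level {{a=8}} gives {{a=8, b=3}}. Applying that to vertices
> {{a=1, c=1}} yields {{a=8, c=1}}: the override for {{b}} is ignored because no such vertex exists, and
> {{c}} keeps its original parallelism.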
> h3. Files to Modify
> # {{Dispatcher.java}} - Remove {{applyParallelismOverrides()}} method and its call
> # {{DefaultSchedulerFactory.java}} - Add utility call after conversion (line ~92)
> # {{AdaptiveSchedulerFactory.java}} - Add utility call after conversion (line ~96)
> # {{AdaptiveExecutionHandlerFactory.java}} - Add utility call after conversion (line ~65)
> # {{DefaultAdaptiveExecutionHandler}} or {{AdaptiveGraphManager}} - Handle incremental conversion
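> For the last item, the adaptive path builds the {{JobGraph}} incrementally, so a single call after
> conversion is not enough: overrides have to be applied to each batch of newly created vertices. One
> possible shape is sketched below, assuming a hook that fires when vertices are created;
> {{IncrementalOverrideModel}}, {{Vertex}}, {{GraphManager}}, and {{onVerticesCreated}} are hypothetical
> names, not {{AdaptiveGraphManager}}'s API. It does not model how an override should interact with
> parallelism that the adaptive batch scheduler decides on its own.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
>
> // Hypothetical model of incremental override application; not Flink classes.
> final class IncrementalOverrideModel {
>     /** Stand-in for a job vertex that may be created late in the job's lifetime. */
>     static final class Vertex {
>         final String id;
>         int parallelism;
>
>         Vertex(String id, int parallelism) {
>             this.id = id;
>             this.parallelism = parallelism;
>         }
>     }
>
>     /** Stand-in for an incremental graph manager: vertices arrive in batches. */
>     static final class GraphManager {
>         private final Map<String, String> overrides;
>         private final List<Vertex> vertices = new ArrayList<>();
>
>         GraphManager(Map<String, String> overrides) {
>             this.overrides = overrides;
>         }
>
>         /** Applies overrides to the new batch only; earlier vertices are not revisited. */
>         void onVerticesCreated(List<Vertex> newVertices) {
>             for (Vertex v : newVertices) {
>                 String override = overrides.get(v.id);
>                 if (override != null) {
>                     v.parallelism = Integer.parseInt(override);
>                 }
>                 vertices.add(v);
>             }
>         }
>
>         List<Vertex> vertices() {
>             return vertices;
>         }
>     }
> }
> {code}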
> h2. Testing Plan
> h3. Existing Test Coverage
> There is one existing test in {{DispatcherTest.java}}:
> * {{testOverridingJobVertexParallelisms()}} - only tests {{JobGraph}} submission
> *Missing coverage:* No tests exist for {{StreamGraph}} submission with
> parallelism overrides, which is why this regression was not caught.
> h3. Required: Integration Test (using MiniClusterExtension)
> Create a new test class {{ParallelismOverridesITCase.java}} in the {{flink-tests}}
> module.
> This is the *recommended approach* because it:
> * Tests the real flow: StreamGraph → Scheduler → JobGraph → Running job
> * Can verify actual parallelism via REST API
> * Uses existing test infrastructure ({{MiniClusterExtension}})
> * Is self-contained and doesn't require complex mocking
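> {code:java}
> import org.apache.flink.api.common.JobID;
> import org.apache.flink.client.program.rest.RestClusterClient;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.PipelineOptions;
> import org.apache.flink.runtime.minicluster.MiniCluster;
> import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import org.apache.flink.test.junit5.InjectClusterClient;
> import org.apache.flink.test.junit5.InjectMiniCluster;
> import org.apache.flink.test.junit5.MiniClusterExtension;
> import org.apache.flink.util.TestLoggerExtension;
>
> import org.junit.jupiter.api.Test;
> import org.junit.jupiter.api.extension.ExtendWith;
> import org.junit.jupiter.api.extension.RegisterExtension;
>
> import java.util.Map;
>
> import static org.junit.jupiter.api.Assertions.assertEquals;
>
> @ExtendWith(TestLoggerExtension.class)
> public class ParallelismOverridesITCase {
>     @RegisterExtension
>     private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
>             new MiniClusterExtension(
>                     new MiniClusterResourceConfiguration.Builder()
>                             .setConfiguration(createConfiguration())
>                             .setNumberSlotsPerTaskManager(4)
>                             .build());
>
>     private static Configuration createConfiguration() {
>         // Parallelism overrides are set per test, on each job's configuration.
>         return new Configuration();
>     }
>
>     @Test
>     void testParallelismOverridesWithStreamGraph(
>             @InjectClusterClient RestClusterClient<?> client,
>             @InjectMiniCluster MiniCluster miniCluster)
>             throws Exception {
>         // 1. Create a StreamGraph with initial parallelism = 1. Chaining is disabled so that
>         //    Map becomes its own JobVertex and the override targets only that vertex.
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env.disableOperatorChaining();
>         DataStream<Integer> stream = env.fromElements(1, 2, 3).map(x -> x);
>         stream.sinkTo(new DiscardingSink<>());
>         StreamGraph streamGraph = env.getStreamGraph();
>
>         // 2. Resolve the Map vertex ID (hex JobVertexID) and request parallelism = 4 through the
>         //    job configuration. getMapVertexId(...) is a helper that still has to be written.
>         String vertexId = getMapVertexId(streamGraph);
>         streamGraph
>                 .getJobConfiguration()
>                 .set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(vertexId, "4"));
>
>         // 3. Submit the StreamGraph directly, as Application Mode does, and wait for the
>         //    bounded job to finish so that its details are complete.
>         JobID jobId = miniCluster.submitJob(streamGraph).get().getJobID();
>         miniCluster.requestJobResult(jobId).get();
>
>         // 4. Verify that the actual parallelism is 4 (not 1).
>         JobDetailsInfo jobDetails = client.getJobDetails(jobId).join();
>         int actualParallelism =
>                 jobDetails.getJobVertexInfos().stream()
>                         .filter(v -> v.getName().contains("Map"))
>                         .findFirst()
>                         .map(JobDetailsInfo.JobVertexDetailsInfo::getParallelism)
>                         .orElseThrow();
>         assertEquals(
>                 4,
>                 actualParallelism,
>                 "Parallelism override should be applied to StreamGraph submission");
>     }
>
>     @Test
>     void testParallelismOverridesWithJobGraph(
>             @InjectClusterClient RestClusterClient<?> client) throws Exception {
>         // Regression test for existing behavior: JobGraph submission must keep working.
>         // Same structure as above, but submit a JobGraph instead of a StreamGraph.
>     }
> }
> {code}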
> h3. Test Matrix
> ||Test Case||ExecutionPlan||Expected Behavior||
> |testParallelismOverridesWithStreamGraph|StreamGraph|Override applied (THIS IS THE BUG FIX)|
> |testParallelismOverridesWithJobGraph|JobGraph|Override applied (existing behavior)|
> |testOverridesFromJobConfigTakePrecedence|StreamGraph|Job config overrides cluster config|
> |testPartialOverrides|StreamGraph|Only specified vertices modified|
> h3. Optional: Unit Test for Utility Class
> Add unit tests for {{ParallelismOverrideUtil}} if created:
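> {code:java}
> import org.junit.jupiter.api.Test;
>
> class ParallelismOverrideUtilTest {
>     @Test
>     void testApplyOverridesFromConfiguration() {
>         // A cluster-level override for an existing vertex changes its parallelism.
>     }
>     @Test
>     void testApplyOverridesFromJobGraphConfig() {
>         // A job-level override (JobGraph job configuration) changes its parallelism.
>     }
>     @Test
>     void testJobGraphConfigTakesPrecedence() {
>         // Same vertex in both maps: the job-level value wins.
>     }
>     @Test
>     void testUnknownVertexIdIgnored() {
>         // An override key that matches no vertex leaves the graph unchanged.
>     }
>     @Test
>     void testNoOverridesWhenEmpty() {
>         // No overrides configured: every vertex keeps its parallelism.
>     }
> }
> {code}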
> h2. Acceptance Criteria
> * {{pipeline.jobvertex-parallelism-overrides}} works in Application Mode with StreamGraph
> * All scheduler types (Default, Adaptive, AdaptiveBatch) apply overrides correctly
> * Session Mode (JobGraph submission) continues to work
> * Overrides from both {{jobMasterConfiguration}} and {{jobConfiguration}} are respected
> * Unit tests cover the utility method
> * Integration tests cover all scheduler/execution-plan combinations
> h2. Related
> * FLINK-36446 - Submit StreamGraph directly in Application Mode (introduced the regression)
> * [Mailing list discussion|https://lists.apache.org/thread/qb06cghv2bo6p1xhzt6qzj6wf3cl29gz]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)