[ https://issues.apache.org/jira/browse/FLINK-38770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-38770:
-----------------------------------
    Labels: pull-request-available  (was: )

> pipeline.jobvertex-parallelism-overrides ignored for StreamGraph in Application Mode
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-38770
>                 URL: https://issues.apache.org/jira/browse/FLINK-38770
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 2.0.0, 2.1.0, 2.0.1, 2.2.0, 2.1.1
>            Reporter: Barak
>            Assignee: Barak
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.2.0, 2.1.2, 2.0.2
>
>
> h2. Problem
> In Flink 2.0+, when running in Application Mode, the 
> {{pipeline.jobvertex-parallelism-overrides}} configuration is completely 
> ignored. This breaks the Flink Kubernetes Operator's autoscaler 
> functionality, which relies on this configuration to adjust job parallelism 
> during rescaling operations.
> h2. Background
> FLINK-36446 introduced the ability to submit {{StreamGraph}} directly instead 
> of converting to {{JobGraph}} on the client side. This is the default 
> behavior in Application Mode for Flink 2.0+.
> However, the parallelism override logic in {{Dispatcher.internalSubmitJob()}} 
> only handles {{JobGraph}}:
> {code:java}
> private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
>     if (executionPlan instanceof JobGraph) {
>         applyParallelismOverrides((JobGraph) executionPlan); // StreamGraph bypasses this!
>     }
>     // ...
> }
> {code}
> When a {{StreamGraph}} is submitted, the {{instanceof}} check fails and the 
> overrides are silently skipped. The {{StreamGraph}} → {{JobGraph}} conversion 
> only happens later, inside the scheduler factories, so the {{JobGraph}} that is 
> actually scheduled never receives the overrides.
> h2. Impact
> * *Flink Kubernetes Operator autoscaler is broken* for Flink 2.0+ in 
> Application Mode
> * Any use case relying on {{pipeline.jobvertex-parallelism-overrides}} in 
> Application Mode fails silently
> * Jobs run with original parallelism instead of the overridden values
> h2. Steps to Reproduce
> # Create a simple Flink streaming job
> # Deploy using Flink Kubernetes Operator in Application Mode with Flink 2.0+
> # Trigger autoscaler rescaling (or manually set 
> {{pipeline.jobvertex-parallelism-overrides}})
> # Observe that job parallelism does not change
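> For step 3, the option value maps a job vertex ID (the 32-character hex form of a 
> {{JobVertexID}}, which is 16 bytes) to a parallelism; written as a string, Flink map 
> options use comma-separated {{key:value}} pairs. The sketch below is a plain-Java 
> pre-flight check for such a value before it is handed to the operator or a config 
> file. {{OverrideSpec}} is a hypothetical name, the vertex ID is a placeholder, 
> rejecting non-positive values is this sketch's own choice, and it ignores the 
> quoting rules that Flink's own option parser supports.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical pre-flight helper for "vertexId:parallelism,..." override strings. */
final class OverrideSpec {

    private OverrideSpec() {}

    /** A JobVertexID is 16 bytes, so its hex form is 32 hex characters. */
    static boolean isVertexId(String s) {
        return s.matches("[0-9a-fA-F]{32}");
    }

    /** Renders overrides in the comma-separated "key:value" form (no quoting). */
    static String render(Map<String, Integer> overrides) {
        StringJoiner joiner = new StringJoiner(",");
        overrides.forEach((id, p) -> joiner.add(id + ":" + p));
        return joiner.toString();
    }

    /** Parses the simple form, rejecting malformed IDs and non-positive parallelism. */
    static Map<String, Integer> parse(String value) {
        Map<String, Integer> result = new LinkedHashMap<>();
        if (value.isBlank()) {
            return result;
        }
        for (String entry : value.split(",")) {
            String[] kv = entry.trim().split(":", 2);
            if (kv.length != 2 || !isVertexId(kv[0])) {
                throw new IllegalArgumentException("Bad override entry: " + entry);
            }
            // NumberFormatException is itself an IllegalArgumentException.
            int parallelism = Integer.parseInt(kv[1].trim());
            if (parallelism <= 0) {
                throw new IllegalArgumentException("Parallelism must be positive: " + entry);
            }
            result.put(kv[0], parallelism);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> overrides = new LinkedHashMap<>();
        overrides.put("0123456789abcdef0123456789abcdef", 4); // placeholder vertex ID
        String value = render(overrides);
        System.out.println("pipeline.jobvertex-parallelism-overrides: " + value);
        System.out.println(parse(value));
    }
}
```

> The real vertex IDs come from the running job (for example the job's vertex list in 
> the REST API), not from operator names.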
> h2. Root Cause Analysis
> h3. Session Mode (JobGraph submitted) - WORKING
> {noformat}
> Client: StreamGraph → JobGraph conversion
>    ↓
> Dispatcher.internalSubmitJob(JobGraph)
>    ↓
> ✅ applyParallelismOverrides(JobGraph) - APPLIED
>    ↓
> SchedulerFactory receives modified JobGraph
> {noformat}
> h3. Application Mode (StreamGraph submitted) - BROKEN
> {noformat}
> Application Driver: Creates StreamGraph directly
>    ↓
> Dispatcher.internalSubmitJob(StreamGraph)
>    ↓
> ❌ if (instanceof JobGraph) → FALSE, SKIPPED!
>    ↓
> SchedulerFactory.createInstance(StreamGraph)
>    ↓
> StreamGraph → JobGraph conversion happens HERE
>    ↓
> ❌ Overrides never applied
> {noformat}
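> The two flows can be reduced to a toy model that shows why the ordering matters. 
> The types below are plain-Java stand-ins named after the Flink classes, not the real 
> ones, and they assume each stream node maps to one job vertex with the same ID (real 
> translation chains operators and derives vertex IDs by hashing). {{submitBroken}} 
> mirrors the current dispatcher check; {{submitFixed}} applies overrides after 
> conversion, as in the proposed solution.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code) of the two submission paths. */
final class OverrideFlowModel {

    interface ExecutionPlan {}

    /** Stand-in for JobGraph: vertex ID -> parallelism. */
    static final class JobGraph implements ExecutionPlan {
        final Map<String, Integer> parallelism = new HashMap<>();
    }

    /** Stand-in for StreamGraph: it has no job vertices until it is translated. */
    static final class StreamGraph implements ExecutionPlan {
        final Map<String, Integer> nodes = new HashMap<>();

        JobGraph toJobGraph() {
            JobGraph jobGraph = new JobGraph();
            jobGraph.parallelism.putAll(nodes); // assumption: node ID == vertex ID
            return jobGraph;
        }
    }

    /** Only vertices that exist in the JobGraph can be overridden. */
    static void applyOverrides(JobGraph jobGraph, Map<String, Integer> overrides) {
        jobGraph.parallelism.replaceAll((id, p) -> overrides.getOrDefault(id, p));
    }

    /** What the scheduler factory does: use a JobGraph as-is, translate a StreamGraph. */
    static JobGraph convert(ExecutionPlan plan) {
        return plan instanceof JobGraph ? (JobGraph) plan : ((StreamGraph) plan).toJobGraph();
    }

    /** Current behavior: overrides only in the dispatcher, and only for a JobGraph. */
    static JobGraph submitBroken(ExecutionPlan plan, Map<String, Integer> overrides) {
        if (plan instanceof JobGraph) {
            applyOverrides((JobGraph) plan, overrides);
        }
        return convert(plan); // a StreamGraph is translated here, after the check
    }

    /** Proposed behavior: overrides after conversion, whatever plan type was submitted. */
    static JobGraph submitFixed(ExecutionPlan plan, Map<String, Integer> overrides) {
        JobGraph jobGraph = convert(plan);
        applyOverrides(jobGraph, overrides);
        return jobGraph;
    }

    public static void main(String[] args) {
        Map<String, Integer> overrides = Map.of("map", 4);
        StreamGraph streamGraph = new StreamGraph();
        streamGraph.nodes.put("map", 1);
        System.out.println("broken: " + submitBroken(streamGraph, overrides).parallelism);
        System.out.println("fixed:  " + submitFixed(streamGraph, overrides).parallelism);
    }
}
```

> Running {{main}} prints {{map=1}} for the broken path and {{map=4}} for the fixed 
> one: the override only lands once it is applied to the {{JobGraph}} the scheduler 
> actually uses.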
> h3. Affected Code Paths
> The {{StreamGraph}} → {{JobGraph}} conversion happens in multiple scheduler 
> factories:
> ||Scheduler Factory||Conversion Site||Conversion Mode||
> |DefaultSchedulerFactory|DefaultSchedulerFactory.java:92|Direct conversion|
> |AdaptiveSchedulerFactory|AdaptiveSchedulerFactory.java:96|Direct conversion|
> |AdaptiveBatchSchedulerFactory|AdaptiveExecutionHandlerFactory.java:65|When batch recovery enabled|
> |AdaptiveBatchScheduler (adaptive)|DefaultAdaptiveExecutionHandler|Incremental via AdaptiveGraphManager|
> h2. Proposed Solution
> h3. Approach
> # *Create a utility class* {{ParallelismOverrideUtil}} with the override logic
> # *Remove* the override logic from {{Dispatcher.internalSubmitJob()}}
> # *Call the utility* in all scheduler factories after {{StreamGraph}} → 
> {{JobGraph}} conversion
> # *Handle incremental conversion* for {{DefaultAdaptiveExecutionHandler}}
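> Step 4 differs from the others because, per the affected-code-paths table, the 
> adaptive batch path converts incrementally via {{AdaptiveGraphManager}}, so a single 
> call right after "conversion" would only see the vertices created so far. A minimal 
> plain-Java sketch, assuming a hypothetical {{IncrementalGraph}} that materializes 
> vertices stage by stage and notifies a listener with each new batch, shows the 
> override hook running once per batch instead of once per job.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model (not Flink code) of a graph whose job vertices are created in stages. */
final class IncrementalOverrideModel {

    /** Hypothetical stand-in for a job vertex. */
    static final class Vertex {
        final String id;
        int parallelism;

        Vertex(String id, int parallelism) {
            this.id = id;
            this.parallelism = parallelism;
        }
    }

    /** Hypothetical incremental graph: each stage adds vertices and notifies listeners. */
    static final class IncrementalGraph {
        final Map<String, Vertex> vertices = new LinkedHashMap<>();
        private final List<Consumer<List<Vertex>>> listeners = new ArrayList<>();

        void onNewVertices(Consumer<List<Vertex>> listener) {
            listeners.add(listener);
        }

        void materializeStage(Vertex... created) {
            List<Vertex> batch = List.of(created);
            batch.forEach(v -> vertices.put(v.id, v));
            // Listeners see only the vertices created in this stage.
            listeners.forEach(l -> l.accept(batch));
        }
    }

    /** Applies overrides to one batch; IDs that never appear are simply never looked up. */
    static void applyOverrides(List<Vertex> batch, Map<String, Integer> overrides) {
        for (Vertex v : batch) {
            Integer override = overrides.get(v.id);
            if (override != null) {
                v.parallelism = override;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> overrides = Map.of("source", 2, "agg", 8);
        IncrementalGraph graph = new IncrementalGraph();
        graph.onNewVertices(batch -> applyOverrides(batch, overrides));

        graph.materializeStage(new Vertex("source", 1)); // first stage
        graph.materializeStage(new Vertex("agg", 1));    // created after "source" finishes
        graph.vertices.values().forEach(v -> System.out.println(v.id + " -> " + v.parallelism));
    }
}
```

> Hooking the override into vertex creation, rather than into submission, is what keeps 
> later stages (such as {{agg}} here) from silently keeping their original parallelism.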
> h3. New Utility Class
> Create {{flink-runtime/.../scheduler/ParallelismOverrideUtil.java}}:
> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.PipelineOptions;
> import org.apache.flink.runtime.jobgraph.JobGraph;
> import org.apache.flink.runtime.jobgraph.JobVertex;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.HashMap;
> import java.util.Map;
>
> public final class ParallelismOverrideUtil {
>     private static final Logger LOG = LoggerFactory.getLogger(ParallelismOverrideUtil.class);
>
>     private ParallelismOverrideUtil() {}
>
>     /**
>      * Applies parallelism overrides from configuration to the JobGraph.
>      * This method MUST be called after converting StreamGraph to JobGraph
>      * in all SchedulerNGFactory implementations.
>      */
>     public static void applyParallelismOverrides(JobGraph jobGraph, Configuration configuration) {
>         // Cluster-level overrides go in first; job-level overrides are added second so they win.
>         Map<String, String> overrides = new HashMap<>();
>         overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
>         overrides.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
>         for (JobVertex vertex : jobGraph.getVertices()) {
>             // Overrides are keyed by the hex string of the JobVertexID.
>             String override = overrides.get(vertex.getID().toHexString());
>             if (override != null) {
>                 int currentParallelism = vertex.getParallelism();
>                 int overrideParallelism = Integer.parseInt(override);
>                 LOG.info("Applying parallelism override for vertex {}: {} -> {}",
>                         vertex.getID(), currentParallelism, overrideParallelism);
>                 vertex.setParallelism(overrideParallelism);
>             }
>         }
>     }
> }
> {code}
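> The utility's precedence rule comes from the order of the two {{putAll}} calls: 
> cluster-level entries go in first and job-level entries second, so a job-level entry 
> for the same vertex ID wins. The plain-Java sketch below isolates that rule, modeling 
> vertex parallelism as a {{Map<String, Integer>}} (an assumption standing in for 
> {{JobGraph}}/{{JobVertex}}), so the precedence and unknown-ID cases from the test 
> plan can be checked without a cluster. {{OverrideMergeModel}} is a hypothetical name.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java model (not Flink code) of the merge-then-apply rule in ParallelismOverrideUtil. */
final class OverrideMergeModel {

    private OverrideMergeModel() {}

    /** Cluster entries first, job entries second: the second putAll lets the job entry win. */
    static Map<String, String> merge(Map<String, String> cluster, Map<String, String> job) {
        Map<String, String> merged = new HashMap<>(cluster);
        merged.putAll(job);
        return merged;
    }

    /** Returns a new vertex -> parallelism map; override IDs absent from the graph are ignored. */
    static Map<String, Integer> apply(Map<String, Integer> vertices, Map<String, String> overrides) {
        Map<String, Integer> result = new LinkedHashMap<>(vertices);
        result.replaceAll((id, current) -> {
            String override = overrides.get(id);
            return override == null ? current : Integer.parseInt(override);
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> vertices = new LinkedHashMap<>();
        vertices.put("v1", 1);
        vertices.put("v2", 1);
        Map<String, String> cluster = Map.of("v1", "2", "v2", "3", "unknown", "9");
        Map<String, String> job = Map.of("v1", "4");
        System.out.println(apply(vertices, merge(cluster, job))); // prints {v1=4, v2=3}
    }
}
```

> Here {{v1}} ends at 4 because the job-level value replaces the cluster-level 2, 
> {{v2}} keeps the cluster-level 3, and the entry for the unknown vertex is ignored.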
> h3. Files to Modify
> # {{Dispatcher.java}} - Remove {{applyParallelismOverrides()}} method and its 
> call
> # {{DefaultSchedulerFactory.java}} - Add utility call after conversion (line 
> ~92)
> # {{AdaptiveSchedulerFactory.java}} - Add utility call after conversion (line 
> ~96)
> # {{AdaptiveExecutionHandlerFactory.java}} - Add utility call after 
> conversion (line ~65)
> # {{DefaultAdaptiveExecutionHandler}} or {{AdaptiveGraphManager}} - Handle 
> incremental conversion
> h2. Testing Plan
> h3. Existing Test Coverage
> There is one existing test in {{DispatcherTest.java}}:
> * {{testOverridingJobVertexParallelisms()}} - only tests {{JobGraph}} 
> submission
> *Missing coverage:* No tests exist for {{StreamGraph}} submission with 
> parallelism overrides, which is why this regression was not caught.
> h3. Required: Integration Test (using MiniClusterExtension)
> Create new test class {{ParallelismOverridesITCase.java}} in {{flink-tests}} 
> module.
> This is the *recommended approach* because it:
> * Tests the real flow: StreamGraph → Scheduler → JobGraph → Running job
> * Can verify actual parallelism via REST API
> * Uses existing test infrastructure ({{MiniClusterExtension}})
> * Is self-contained and doesn't require complex mocking
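> {code:java}
> import org.apache.flink.api.common.JobID;
> import org.apache.flink.client.program.rest.RestClusterClient;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.PipelineOptions;
> import org.apache.flink.runtime.minicluster.MiniCluster;
> import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import org.apache.flink.test.junit5.InjectClusterClient;
> import org.apache.flink.test.junit5.InjectMiniCluster;
> import org.apache.flink.test.junit5.MiniClusterExtension;
> import org.apache.flink.util.TestLoggerExtension;
>
> import org.junit.jupiter.api.Test;
> import org.junit.jupiter.api.extension.ExtendWith;
> import org.junit.jupiter.api.extension.RegisterExtension;
>
> import java.util.Map;
>
> import static org.junit.jupiter.api.Assertions.assertEquals;
>
> @ExtendWith(TestLoggerExtension.class)
> public class ParallelismOverridesITCase {
>     @RegisterExtension
>     public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
>             new MiniClusterExtension(
>                     new MiniClusterResourceConfiguration.Builder()
>                             .setConfiguration(createConfiguration())
>                             .setNumberSlotsPerTaskManager(4)
>                             .build());
>
>     private static Configuration createConfiguration() {
>         Configuration config = new Configuration();
>         // Parallelism overrides are set per test, on the job configuration
>         return config;
>     }
>
>     @Test
>     void testParallelismOverridesWithStreamGraph(
>             @InjectClusterClient RestClusterClient<?> client,
>             @InjectMiniCluster MiniCluster miniCluster) throws Exception {
>         // 1. Create a StreamGraph with initial parallelism = 1; disabling chaining keeps
>         //    the map operator in its own job vertex, so only that vertex is overridden
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         DataStream<Integer> stream = env.fromElements(1, 2, 3).map(x -> x).disableChaining();
>         stream.sinkTo(new DiscardingSink<>());
>         StreamGraph streamGraph = env.getStreamGraph();
>
>         // 2. Get the map vertex ID and request parallelism = 4 via the job configuration,
>         //    so the override actually travels with the submitted StreamGraph
>         String vertexId = getMapVertexId(streamGraph);
>         streamGraph.getJobConfiguration()
>                 .set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(vertexId, "4"));
>
>         // 3. Submit the StreamGraph and wait for the bounded job to finish
>         JobID jobId = miniCluster.submitJob(streamGraph).get().getJobID();
>         miniCluster.requestJobResult(jobId).get();
>
>         // 4. Verify the actual parallelism is 4 (not 1)
>         JobDetailsInfo jobDetails = client.getJobDetails(jobId).join();
>         int actualParallelism = jobDetails.getJobVertexInfos().stream()
>                 .filter(v -> v.getName().contains("Map"))
>                 .findFirst()
>                 .map(JobDetailsInfo.JobVertexDetailsInfo::getParallelism)
>                 .orElseThrow();
>         assertEquals(4, actualParallelism,
>                 "Parallelism override should be applied to StreamGraph submission");
>     }
>
>     @Test
>     void testParallelismOverridesWithJobGraph(
>             @InjectClusterClient RestClusterClient<?> client) throws Exception {
>         // Ensure JobGraph submission still works (regression test for existing behavior).
>         // Similar structure, but submit a JobGraph instead of a StreamGraph.
>     }
>
>     /**
>      * Hypothetical helper, still to be written: JobVertexIDs are only assigned when the
>      * StreamGraph is translated to a JobGraph, so they must be derived the same way here.
>      */
>     private static String getMapVertexId(StreamGraph streamGraph) {
>         throw new UnsupportedOperationException("Not implemented in this sketch");
>     }
> }
> {code}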
> h3. Test Matrix
> ||Test Case||ExecutionPlan||Expected Behavior||
> |testParallelismOverridesWithStreamGraph|StreamGraph|Override applied (THIS IS THE BUG FIX)|
> |testParallelismOverridesWithJobGraph|JobGraph|Override applied (existing behavior)|
> |testOverridesFromJobConfigTakePrecedence|StreamGraph|Job config overrides cluster config|
> |testPartialOverrides|StreamGraph|Only specified vertices modified|
> h3. Optional: Unit Test for Utility Class
> Add unit tests for {{ParallelismOverrideUtil}} if created:
> {code:java}
> class ParallelismOverrideUtilTest {
>     @Test
>     void testApplyOverridesFromConfiguration() { }
>     @Test
>     void testApplyOverridesFromJobGraphConfig() { }
>     @Test
>     void testJobGraphConfigTakesPrecedence() { }
>     @Test
>     void testUnknownVertexIdIgnored() { }
>     @Test
>     void testNoOverridesWhenEmpty() { }
> }
> {code}
> h2. Acceptance Criteria
> * {{pipeline.jobvertex-parallelism-overrides}} works in Application Mode with 
> StreamGraph
> * All scheduler types (Default, Adaptive, AdaptiveBatch) apply overrides 
> correctly
> * Session Mode (JobGraph submission) continues to work
> * Overrides from both {{jobMasterConfiguration}} and {{jobConfiguration}} are 
> respected, with {{jobConfiguration}} entries taking precedence
> * Unit tests cover the utility method
> * Integration tests cover all scheduler/execution-plan combinations
> h2. Related
> * FLINK-36446 - Submit StreamGraph directly in Application Mode (introduced 
> the regression)
> * [Mailing list 
> discussion|https://lists.apache.org/thread/qb06cghv2bo6p1xhzt6qzj6wf3cl29gz]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
