[ https://issues.apache.org/jira/browse/FLINK-26330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502097#comment-17502097 ]

Lijie Wang commented on FLINK-26330:
------------------------------------

I have discussed this with [~zhuzh] offline. We believe that the uneven 
distribution of subpartitions described above can have serious implications (in 
extreme cases, some tasks need to process twice as much data as others) and 
needs to be solved.
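
To make the skew concrete, here is a minimal sketch (not Flink's actual 
assignment code; the values are hypothetical) of an even-range split of P 
subpartitions over C consumer tasks. When P is not a multiple of C, one task 
can end up with twice the data of another:
{code:java}
public class SubpartitionSkewSketch {
    public static void main(String[] args) {
        int numSubpartitions = 3; // hypothetical: P = 3 subpartitions
        int numConsumers = 2;     // hypothetical: C = 2 downstream tasks
        for (int task = 0; task < numConsumers; task++) {
            // contiguous range [start, end) assigned to this consumer task
            int start = task * numSubpartitions / numConsumers;
            int end = (task + 1) * numSubpartitions / numConsumers;
            System.out.printf(
                    "task %d consumes subpartitions [%d, %d) -> %d subpartition(s)%n",
                    task, start, end, end - start);
        }
    }
}
{code}
With P = 3 and C = 2, one task consumes two subpartitions while the other 
consumes one, i.e. roughly twice the data if subpartitions are of similar size.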

The ultimate solution should be automatic load balancing, as described in 
FLIP-187, but there is a lot of work to be done to implement it. We expect to 
deliver it in 1.16.

We will provide a temporary solution: in order to make the subpartitions evenly 
consumed by downstream tasks, we can make the number of subpartitions a 
multiple of the number of downstream tasks. In this way, the number of 
subpartitions consumed by each downstream task is the same. For simplicity, we 
require the user-configured {{ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM}} to be 
a power of two (2^N), and then adjust the parallelism calculated according to 
{{ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK}} to the closest power of 
two 2^M (M < N). The actual average amount of data processed per task will then 
be (0.75~1.5) * {{ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK}}. A rough 
sketch of this adjustment follows.
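
The sketch below is a simplified illustration, not the final implementation 
(class and method names are made up). It rounds the suggested parallelism to 
the closest power of two, which is what bounds the per-task data volume to the 
(0.75~1.5)x range mentioned above:
{code:java}
public class PowerOfTwoParallelismSketch {

    /**
     * Rounds the parallelism suggested by the average-data-volume calculation
     * to the closest power of two, capped by the (power-of-two) max parallelism.
     */
    static int adjustToClosestPowerOfTwo(int suggested, int maxParallelism) {
        suggested = Math.max(suggested, 1);
        int lower = Integer.highestOneBit(suggested); // largest 2^M <= suggested
        int upper = lower << 1;                       // next power of two
        int closest = (suggested - lower) < (upper - suggested) ? lower : upper;
        return Math.min(closest, maxParallelism);
    }

    public static void main(String[] args) {
        // e.g. max parallelism 16 (2^4): a suggested parallelism of 6 is
        // adjusted to 8, so each task processes 6/8 = 0.75x the configured
        // average data volume; a suggested parallelism of 5 is adjusted to 4,
        // i.e. 1.25x the configured average.
        System.out.println(adjustToClosestPowerOfTwo(6, 16)); // prints 8
        System.out.println(adjustToClosestPowerOfTwo(5, 16)); // prints 4
    }
}
{code}
Because rounding switches to the next power of two at the midpoint 1.5 * 2^M, 
the per-task volume stays within roughly 0.75x to 1.5x of the configured 
average.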

We will also describe this behavior in the documentation. WDYT [~nsemmler]?

> Test Adaptive Batch Scheduler manually
> --------------------------------------
>
>                 Key: FLINK-26330
>                 URL: https://issues.apache.org/jira/browse/FLINK-26330
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Lijie Wang
>            Assignee: Niklas Semmler
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.15.0
>
>
> Documentation: [https://github.com/apache/flink/pull/18757]
> Run DataStream / SQL batch jobs with the Adaptive Batch Scheduler and verify:
> 1. Whether the automatically decided parallelism is correct
> 2. Whether the job result is correct
>  
> *For example:*
> {code:java}
> // Enable the adaptive batch scheduler and cap the decided parallelism at 4.
> final Configuration configuration = new Configuration();
> configuration.set(
>         JobManagerOptions.SCHEDULER,
>         JobManagerOptions.SchedulerType.AdaptiveBatch);
> configuration.setInteger(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
> configuration.set(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK,
>         MemorySize.parse("8kb"));
> // Leave the default parallelism unset (-1) so the scheduler can decide it.
> configuration.setInteger("parallelism.default", -1);
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(configuration);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> // Sum the sequence grouped by key (num % 10) and print the results.
> env.fromSequence(0, 1000).setParallelism(1)
>         .keyBy(num -> num % 10)
>         .sum(0)
>         .addSink(new PrintSinkFunction<>());
> env.execute(); {code}
> You can run the above job and check:
>  
> 1. The parallelism of "Keyed Aggregation -> Sink: Unnamed" should be 3, and 
> the JobManager log should contain the following line:
> {code:java}
> Parallelism of JobVertex: Keyed Aggregation -> Sink: Unnamed (20ba6b65f97481d5570070de90e4e791) is decided to be 3. {code}
> 2. The job result should be:
> {code:java}
> 50500
> 49600
> 49700
> 49800
> 49900
> 50000
> 50100
> 50200
> 50300
> 50400 {code}
>  
> You can change the amount of data produced by the source and the config 
> options of the adaptive batch scheduler as you wish; a hypothetical variation 
> is sketched below.
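>
> For example (a hypothetical variation, not part of the original test; the 
> decided parallelism and the sums will differ from the values above), 
> producing more data while keeping the same per-task data volume should lead 
> the scheduler to decide a higher parallelism, up to the configured maximum 
> of 4:
> {code:java}
> env.fromSequence(0, 100_000).setParallelism(1)
>         .keyBy(num -> num % 10)
>         .sum(0)
>         .addSink(new PrintSinkFunction<>());
> env.execute(); {code}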



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
