[ https://issues.apache.org/jira/browse/FLINK-26330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502097#comment-17502097 ]
Lijie Wang commented on FLINK-26330:
------------------------------------

I have discussed this with [~zhuzh] offline. We believe that the uneven distribution of subpartitions described above can have serious consequences (in extreme cases, some tasks need to process twice as much data as others), so it needs to be solved. The ultimate solution is automatic load balancing, as described in FLIP-187, but there is a lot of work to be done to implement it; we expect to deliver it in 1.16.

Until then, we will provide a temporary solution: to let downstream tasks consume subpartitions evenly, we make the number of subpartitions a multiple of the number of downstream tasks, so that each downstream task consumes the same number of subpartitions. For simplicity, we require the user-configured {{ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM}} to be 2^N, and we adjust the parallelism calculated from {{ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK}} to the closest 2^M (M < N). As a result, the actual average amount of data processed per task will be (0.75~1.5) * {{ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK}} (see the sketch at the end of this message). We will describe this behavior in the documentation. WDYT [~nsemmler]?

> Test Adaptive Batch Scheduler manually
> --------------------------------------
>
>                 Key: FLINK-26330
>                 URL: https://issues.apache.org/jira/browse/FLINK-26330
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Lijie Wang
>            Assignee: Niklas Semmler
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.15.0
>
>
> Documentation: [https://github.com/apache/flink/pull/18757]
>
> Run DataStream / SQL batch jobs with the Adaptive Batch Scheduler and verify:
> 1. Whether the automatically decided parallelism is correct
> 2. Whether the job result is correct
>
> *For example:*
> {code:java}
> final Configuration configuration = new Configuration();
> configuration.set(
>         JobManagerOptions.SCHEDULER,
>         JobManagerOptions.SchedulerType.AdaptiveBatch);
> configuration.setInteger(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
> configuration.set(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK,
>         MemorySize.parse("8kb"));
> configuration.setInteger("parallelism.default", -1);
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(configuration);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.fromSequence(0, 1000).setParallelism(1)
>         .keyBy(num -> num % 10)
>         .sum(0)
>         .addSink(new PrintSinkFunction<>());
> env.execute();
> {code}
> You can run the above job and check:
> 1. The parallelism of "Keyed Aggregation -> Sink: Unnamed" should be 3, and the JobManager log should contain the following line:
> {code:java}
> Parallelism of JobVertex: Keyed Aggregation -> Sink: Unnamed (20ba6b65f97481d5570070de90e4e791) is decided to be 3.
> {code}
> 2. The job result should be:
> {code:java}
> 50500
> 49600
> 49700
> 49800
> 49900
> 50000
> 50100
> 50200
> 50300
> 50400
> {code}
> You can change the amount of data produced by the source and the config options of the adaptive batch scheduler according to your wishes.
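For reference, here is a minimal sketch of the power-of-two rounding described in the comment above. The class and method names ({{PowerOfTwoParallelism}}, {{decideParallelism}}) and the exact rounding rule are illustrative assumptions, not the scheduler's actual implementation. Picking the unique 2^M in the interval (2p/3, 4p/3] is one way to realize the (0.75~1.5) bound, since that interval spans exactly a factor of two and therefore contains exactly one power of two.

{code:java}
// Illustrative sketch only; not the actual scheduler implementation.
public class PowerOfTwoParallelism {

    /**
     * Rounds the parallelism p implied by the configured average data volume
     * per task to the unique power of two in (2p/3, 4p/3], capped by the
     * (power-of-two) max parallelism, with a floor of 1. Without the cap and
     * floor, each task then processes between 0.75x and 1.5x the configured
     * average data volume.
     */
    static int decideParallelism(long totalBytes, long avgBytesPerTask, int maxParallelism) {
        double p = totalBytes / (double) avgBytesPerTask;
        // Find the smallest power of two strictly greater than 2p/3.
        int powerOfTwo = 1;
        while (3L * powerOfTwo <= 2 * p) {
            powerOfTwo <<= 1;
        }
        // Capping at max parallelism may push the per-task volume above 1.5x.
        return Math.min(powerOfTwo, maxParallelism);
    }

    public static void main(String[] args) {
        // 100 MB total with an 8 MB average per task implies p = 12.5,
        // which rounds to 16; each task then processes 6.25 MB (0.78 * 8 MB).
        System.out.println(decideParallelism(100L << 20, 8L << 20, 128)); // prints 16
    }
}
{code}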