[ https://issues.apache.org/jira/browse/FLINK-33123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhanghao Chen updated FLINK-33123:
----------------------------------
    Summary: Wrong dynamic replacement of partitioner from FORWARD to REBLANCE for autoscaler and adaptive scheduler  (was: Wrong dynamic replacement of partitioner from FORWARD to REBLANCE for autoscaler and adaptive scheduler and )

> Wrong dynamic replacement of partitioner from FORWARD to REBLANCE for autoscaler and adaptive scheduler
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33123
>                 URL: https://issues.apache.org/jira/browse/FLINK-33123
>             Project: Flink
>          Issue Type: Bug
>          Components: Autoscaler, Runtime / Coordination
>    Affects Versions: 1.17.0, 1.18.0
>            Reporter: Zhanghao Chen
>            Priority: Critical
>         Attachments: image-2023-09-20-15-09-22-733.png, image-2023-09-20-15-14-04-679.png
>
>
> *Background*
> https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is
> wrong when the parallelism is changed for a vertex with a FORWARD edge. This
> affects both the autoscaler and the adaptive scheduler, where one can change
> the vertex parallelism dynamically. The fix dynamically replaces the
> partitioner from FORWARD to REBALANCE on task deployment in {{StreamTask}}:
>
> !image-2023-09-20-15-09-22-733.png|width=560,height=221!
>
> *Problem*
> Unfortunately, the fix is still buggy in two respects:
> # The connections between upstream and downstream tasks are determined by
> the distribution type of the partitioner when the execution graph is
> generated on the JM side. When the edge is FORWARD, the distribution type is
> POINTWISE, and Flink tries to distribute subpartitions evenly across all
> downstream tasks. To change the partitioner to REBALANCE, the distribution
> type must also be changed to ALL_TO_ALL so that every upstream task is
> connected to every downstream task. However, the fix did not change the
> distribution type, so the network connections are set up incorrectly.
> # The FORWARD partitioner is replaced only if
> environment.getWriter(outputIndex).getNumberOfSubpartitions() differs from
> the task parallelism. However, the number of subpartitions here equals the
> number of downstream tasks connected to this particular task, which is also
> determined by the distribution type of the partitioner when the execution
> graph is generated on the JM side. Under POINTWISE distribution, each
> upstream task gets either floor or ceil of (downstream task parallelism /
> upstream task parallelism) subpartitions, so whenever one of these values
> equals the upstream task parallelism, some task has number of
> subpartitions = task parallelism. For example, for a topology A (parallelism
> 2) -> B (parallelism 5), one A task has 2 subpartitions and the other has 3.
> The task with 2 subpartitions sees a subpartition count equal to the task
> parallelism 2 and skips partitioner replacement. As a result, that task
> sends data to only one downstream task, because the FORWARD partitioner
> always sends data to the first subpartition. In fact, for a normal job with
> a FORWARD edge and no autoscaling action, the partitioner is also changed to
> REBALANCE internally, because the number of subpartitions always equals 1 in
> this case, which differs from the task parallelism whenever the parallelism
> is greater than 1.
>
> !image-2023-09-20-15-14-04-679.png|width=892,height=301!
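
The arithmetic behind the A (parallelism 2) -> B (parallelism 5) example can be checked with a small model. This is a minimal sketch, not Flink code: the function names are hypothetical, the even split is a simplification of the POINTWISE wiring described above, and which upstream subtask receives the extra subpartition is an assumption. The check itself is modelled as "keep FORWARD when the subpartition count equals the task parallelism", which is how the issue describes the skipped replacement.

```python
from typing import List


def pointwise_subpartition_counts(upstream: int, downstream: int) -> List[int]:
    """Subpartitions per upstream subtask under an even POINTWISE split.

    Simplified model for upstream <= downstream. Assumption: the first
    (downstream % upstream) subtasks receive one extra subpartition.
    """
    if upstream <= 0 or downstream < upstream:
        raise ValueError("model only covers 0 < upstream <= downstream")
    base, extra = divmod(downstream, upstream)
    return [base + 1 if i < extra else base for i in range(upstream)]


def all_to_all_subpartition_counts(upstream: int, downstream: int) -> List[int]:
    """With ALL_TO_ALL, every upstream subtask has one subpartition per
    downstream subtask, which is what REBALANCE needs."""
    return [downstream] * upstream


def subtasks_keeping_forward(upstream: int, downstream: int) -> List[int]:
    """Indices of upstream subtasks whose subpartition count equals the task
    parallelism, so the check described in the issue leaves FORWARD in place."""
    counts = pointwise_subpartition_counts(upstream, downstream)
    return [i for i, n in enumerate(counts) if n == upstream]
```

Under these assumptions, `pointwise_subpartition_counts(2, 5)` yields one subtask with 3 subpartitions and one with 2, and `subtasks_keeping_forward(2, 5)` reports the subtask with 2 subpartitions as the one that keeps FORWARD. For an unscaled FORWARD edge such as `subtasks_keeping_forward(4, 4)`, every subtask has a single subpartition, so none keeps FORWARD.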