[ https://issues.apache.org/jira/browse/FLINK-33123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhanghao Chen updated FLINK-33123:
----------------------------------
Attachment: image-2023-09-20-15-09-22-733.png

Wrong dynamic replacement of partitioner from FORWARD to REBALANCE for autoscaler and adaptive scheduler
---------------------------------------------------------------------------------------------------------

Key: FLINK-33123
URL: https://issues.apache.org/jira/browse/FLINK-33123
Project: Flink
Issue Type: Bug
Components: Autoscaler, Runtime / Coordination
Affects Versions: 1.17.0, 1.18.0
Reporter: Zhanghao Chen
Priority: Critical
Attachments: image-2023-09-20-15-09-22-733.png

*Background*
https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is wrong when the parallelism of a vertex with a FORWARD edge is changed. Both the autoscaler and the adaptive scheduler can change vertex parallelism dynamically, so both are affected. The fix dynamically replaces the partitioner from FORWARD to REBALANCE at task deployment in {{StreamTask}}:
{code:java}
 private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
-        Environment environment, NonChainedOutput streamOutput) {
+        Environment environment, NonChainedOutput streamOutput, int outputIndex) {
     if (streamOutput.getPartitioner() instanceof ForwardPartitioner
-            && streamOutput.getConsumerParallelism()
+            && environment.getWriter(outputIndex).getNumberOfSubpartitions()
                     != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
         LOG.debug(
                 "Replacing forward partitioner with rebalance for {}",
                 environment.getTaskInfo().getTaskNameWithSubtasks());
         streamOutput.setPartitioner(new RebalancePartitioner<>());
     }
 }
{code}

*Problem*
Unfortunately, the fix is still buggy in two respects:
# The connections between upstream and downstream tasks are determined by the distribution type of the partitioner when generating the execution graph on the
JM side. When the edge is FORWARD, the distribution type is POINTWISE, and Flink distributes the subpartitions evenly across the downstream tasks, so each upstream task is connected to only a subset of them. To change the edge to REBALANCE, the distribution type must also be changed to ALL_TO_ALL so that every upstream task is connected to every downstream task. However, the fix did not change the distribution type, so the network connections are set up incorrectly: a {{RebalancePartitioner}} installed on the task side can only round-robin over the downstream tasks that the task is already connected to.
# The FORWARD partitioner is replaced if {{environment.getWriter(outputIndex).getNumberOfSubpartitions()}} does not equal the task parallelism. However, the number of subpartitions here is the number of downstream tasks connected to this particular task. That number is also determined by the distribution type of the partitioner when the execution graph is generated on the JM side, and it does not tell whether the parallelism was changed. For example, when ceil(downstream task parallelism / upstream task parallelism) = upstream task parallelism, the number of subpartitions equals the task parallelism, so the partitioner is kept even though the two vertices may have different parallelism. Conversely, for a normal job with a FORWARD edge and no autoscaling action, the partitioner is changed to REBALANCE internally whenever the parallelism is greater than 1, because the number of subpartitions is always 1 in this case.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
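To illustrate the first problem, here is a minimal Java sketch. It rests on stated assumptions: {{ConnectionModel}}, {{pointwiseTargets}} and {{allToAllTargets}} are hypothetical names, not Flink APIs. It also assumes that POINTWISE splits the downstream tasks into contiguous, near-equal ranges. That is a simplification, not Flink's actual execution-graph code. With an upstream parallelism of 2 and a downstream parallelism of 4, upstream subtask 0 is wired only to downstream subtasks 0 and 1. Swapping its partitioner for REBALANCE therefore cannot make its records reach subtasks 2 and 3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model (not Flink's code): which downstream subtasks an
// upstream subtask is wired to, depending on the JM-side distribution type.
public class ConnectionModel {

    // Integer ceiling division for a >= 0 and b > 0.
    public static int ceilDiv(int a, int b) {
        return (a + b - 1) / b;
    }

    // POINTWISE (assumed): downstream subtasks are split into contiguous, near-equal ranges.
    public static List<Integer> pointwiseTargets(int subtask, int upstream, int downstream) {
        List<Integer> targets = new ArrayList<>();
        if (downstream <= upstream) {
            // Each upstream subtask feeds exactly one downstream subtask
            // (one-to-one when the parallelisms are equal).
            targets.add(subtask * downstream / upstream);
        } else {
            // Each upstream subtask feeds a contiguous range of downstream subtasks.
            int start = ceilDiv(subtask * downstream, upstream);
            int end = ceilDiv((subtask + 1) * downstream, upstream);
            for (int t = start; t < end; t++) {
                targets.add(t);
            }
        }
        return targets;
    }

    // ALL_TO_ALL: every upstream subtask is wired to every downstream subtask.
    public static List<Integer> allToAllTargets(int downstream) {
        List<Integer> targets = new ArrayList<>();
        for (int t = 0; t < downstream; t++) {
            targets.add(t);
        }
        return targets;
    }

    public static void main(String[] args) {
        int upstream = 2;
        int downstream = 4;
        for (int s = 0; s < upstream; s++) {
            // In this model the writer's subpartitions are exactly these targets, so a
            // task-side rebalance can only round-robin over the POINTWISE list.
            System.out.println("upstream " + s
                    + ": POINTWISE -> " + pointwiseTargets(s, upstream, downstream)
                    + ", ALL_TO_ALL -> " + allToAllTargets(downstream));
        }
    }
}
```

Running it prints {{upstream 0: POINTWISE -> [0, 1], ALL_TO_ALL -> [0, 1, 2, 3]}} and {{upstream 1: POINTWISE -> [2, 3], ALL_TO_ALL -> [0, 1, 2, 3]}}.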
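The second problem can be sketched in the same hedged way. {{ForwardCheckModel}} and its methods are hypothetical. The modelled subpartition count reuses the assumed contiguous-range view of POINTWISE, and only subtask 0 is evaluated for brevity. {{replacedBySubpartitionCheck}} mirrors the {{!=}} comparison in the quoted fix. {{parallelismDiffers}} shows whether the two vertices really have different parallelism.

```java
// Hypothetical model (not Flink's code) of the task-side check in the quoted fix.
// The POINTWISE subpartition count is assumed to be the size of the contiguous
// downstream range that an upstream subtask is wired to.
public class ForwardCheckModel {

    // Integer ceiling division for a >= 0 and b > 0.
    public static int ceilDiv(int a, int b) {
        return (a + b - 1) / b;
    }

    // Modelled number of subpartitions of one upstream subtask under POINTWISE.
    public static int pointwiseSubpartitions(int subtask, int upstream, int downstream) {
        if (downstream <= upstream) {
            return 1; // each upstream subtask is wired to exactly one downstream subtask
        }
        return ceilDiv((subtask + 1) * downstream, upstream)
                - ceilDiv(subtask * downstream, upstream);
    }

    // Mirrors the fixed condition: replace FORWARD when #subpartitions != own parallelism.
    public static boolean replacedBySubpartitionCheck(int subtask, int upstream, int downstream) {
        return pointwiseSubpartitions(subtask, upstream, downstream) != upstream;
    }

    // For comparison: whether the two vertices' parallelisms actually differ.
    public static boolean parallelismDiffers(int upstream, int downstream) {
        return upstream != downstream;
    }

    public static void main(String[] args) {
        int[][] cases = {{1, 1}, {4, 4}, {2, 4}, {1, 2}};
        for (int[] c : cases) {
            int up = c[0];
            int down = c[1];
            System.out.printf(
                    "up=%d down=%d subpartitions=%d replaced=%b parallelismDiffers=%b%n",
                    up, down, pointwiseSubpartitions(0, up, down),
                    replacedBySubpartitionCheck(0, up, down), parallelismDiffers(up, down));
        }
    }
}
```

Consider equal parallelism 4: the modelled check replaces FORWARD even though nothing changed. Now consider upstream 2 and downstream 4, where ceil(4 / 2) = 2: the check keeps FORWARD even though the parallelisms differ.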