[ https://issues.apache.org/jira/browse/FLINK-33123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767162#comment-17767162 ]
Rui Fan edited comment on FLINK-33123 at 9/20/23 3:30 PM:
----------------------------------------------------------

Hi [~Zhanghao Chen], thanks for your report!

Your analysis is correct, and I also found this bug last month. I didn't report it because I don't know how to fix it properly, and a FORWARD partitioner between 2 tasks isn't common (most operators are chained directly).

First of all, I prefer changing it from FORWARD to REBALANCE; I gave the reason in the [code review|https://github.com/apache/flink/pull/21443#discussion_r1042919428] of FLINK-30213.

Why don't I know how to fix it properly? As you said, we should change the distribution type to ALL_TO_ALL in the JobGraph. There are some issues here:
 * issue1: The JobGraph is generated only once for the Adaptive Scheduler, and it isn't changed even if the parallelism changes (only the ExecutionGraph is updated).
 * If issue1 is solved, issue2 is how to handle the change back from REBALANCE to FORWARD.
 ** Assume a job has taskA and taskB, both with parallelism 3, and the user uses the FORWARD partitioner.
 ** Time1: the parallelism of taskA is changed to 2, so we should do 2 things:
 *** Replace the FORWARD partitioner with the REBALANCE partitioner in StreamTask.
 *** Change the distribution type to ALL_TO_ALL in the JobGraph.
 ** Time2: the parallelism of taskB is changed to 2, so we should do 2 things:
 *** Use the FORWARD partitioner.
 *** Use the POINTWISE distribution type.
 ** This case is fine.
 ** However, assume a job has taskA and taskB, both with parallelism 3, and the user uses the *REBALANCE* partitioner.
 *** In this case, the user chose the REBALANCE partitioner even though the FORWARD partitioner could be used here.
 *** For this case, we should keep the REBALANCE partitioner and ALL_TO_ALL at both time1 and time2.
 ** So at time2 we need to decide whether to switch to the FORWARD partitioner and the POINTWISE distribution type.

I'm not sure whether these 2 issues are clear.
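To make issue2 more concrete, here is a minimal sketch of one possible rule: remember the partitioner the user originally chose, and derive the effective partitioner and distribution type from it after every rescale. Everything in it (the class {{ForwardEdgeSketch}}, the two enums, and {{resolve}}) is a hypothetical stand-in written for illustration, not Flink's actual API or a proposed patch.

```java
public class ForwardEdgeSketch {
    // Hypothetical stand-ins for the partitioner and distribution type of one edge.
    public enum Partitioner { FORWARD, REBALANCE }
    public enum DistributionPattern { POINTWISE, ALL_TO_ALL }

    /** Effective settings of one edge after a parallelism change. */
    public static final class EdgeConfig {
        public final Partitioner partitioner;
        public final DistributionPattern pattern;

        EdgeConfig(Partitioner partitioner, DistributionPattern pattern) {
            this.partitioner = partitioner;
            this.pattern = pattern;
        }

        @Override
        public String toString() {
            return partitioner + "/" + pattern;
        }
    }

    /**
     * Assumed rule: FORWARD/POINTWISE is only used when the user asked for FORWARD
     * and both sides currently have the same parallelism; otherwise the edge is
     * REBALANCE/ALL_TO_ALL. A user-chosen REBALANCE is therefore never downgraded.
     */
    public static EdgeConfig resolve(Partitioner userChoice, int upstream, int downstream) {
        if (userChoice == Partitioner.FORWARD && upstream == downstream) {
            return new EdgeConfig(Partitioner.FORWARD, DistributionPattern.POINTWISE);
        }
        return new EdgeConfig(Partitioner.REBALANCE, DistributionPattern.ALL_TO_ALL);
    }

    public static void main(String[] args) {
        // User chose FORWARD: time1 (taskA 3 -> 2), then time2 (taskB 3 -> 2).
        System.out.println(resolve(Partitioner.FORWARD, 2, 3));   // REBALANCE/ALL_TO_ALL
        System.out.println(resolve(Partitioner.FORWARD, 2, 2));   // FORWARD/POINTWISE
        // User chose REBALANCE: stays REBALANCE at time1 and time2.
        System.out.println(resolve(Partitioner.REBALANCE, 2, 3)); // REBALANCE/ALL_TO_ALL
        System.out.println(resolve(Partitioner.REBALANCE, 2, 2)); // REBALANCE/ALL_TO_ALL
    }
}
```

The point of the sketch is that time2 cannot be decided from the current partitioner alone; the original user choice has to be kept somewhere, which is exactly what is hard while the JobGraph is generated only once.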
Please let me know if I'm wrong, thanks~

> Wrong dynamic replacement of partitioner from FORWARD to REBALANCE for autoscaler and adaptive scheduler
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-33123
> URL: https://issues.apache.org/jira/browse/FLINK-33123
> Project: Flink
> Issue Type: Bug
> Components: Autoscaler, Runtime / Coordination
> Affects Versions: 1.17.0, 1.18.0
> Reporter: Zhanghao Chen
> Priority: Critical
> Attachments: image-2023-09-20-15-09-22-733.png, image-2023-09-20-15-14-04-679.png
>
>
> *Background*
> https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is wrong when the parallelism is changed for a vertex with a FORWARD edge. This situation arises with both the autoscaler and the adaptive scheduler, where one can change the vertex parallelism dynamically. A fix was applied to dynamically replace the partitioner from FORWARD to REBALANCE on task deployment in {{{}StreamTask{}}}:
>
> !image-2023-09-20-15-09-22-733.png|width=560,height=221!
> *Problem*
> Unfortunately, the fix is still buggy in two aspects:
> # The connections between upstream and downstream tasks are determined by the distribution type of the partitioner when the execution graph is generated on the JM side. When the edge is FORWARD, the distribution type is POINTWISE, and Flink tries to distribute subpartitions evenly across all downstream tasks. To change the edge to REBALANCE, the distribution type has to be changed to ALL_TO_ALL so that all-to-all connections are made between upstream and downstream tasks. However, the fix did not change the distribution type, so the network connections are set up incorrectly.
> # The FORWARD partitioner is replaced only if environment.getWriter(outputIndex).getNumberOfSubpartitions() does not equal the task parallelism. However, the number of subpartitions here equals the number of downstream tasks of this particular task, which is also determined by the distribution type of the partitioner when the execution graph is generated on the JM side. When ceil(downstream task parallelism / upstream task parallelism) = upstream task parallelism, the number of subpartitions equals the task parallelism. For example, for a topology A (parallelism 2) -> B (parallelism 5), one A task has 2 subpartitions and the other A task has 3 subpartitions; hence one task has a number of subpartitions equal to the task parallelism 2 and skips partitioner replacement. As a result, that task sends data to only one downstream task, as the FORWARD partitioner always sends data to the first subpartition. In fact, for a normal job with a FORWARD edge and no autoscaling action, you will find that the partitioner is changed to REBALANCE internally, as the number of subpartitions always equals 1 in this case.
> !image-2023-09-20-15-14-04-679.png|width=892,height=301!
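The arithmetic in the second problem can be reproduced with a small sketch. It assumes a simple even split for a POINTWISE edge (the remainder goes to the first upstream tasks), which may differ in detail from how Flink assigns consumers, and it models the check as "skip replacement when the subpartition count equals the task parallelism", as described in the A (2) -> B (5) example. The class and method names are hypothetical.

```java
import java.util.Arrays;

public class PointwiseSubpartitionSketch {
    /**
     * Assumed even split for a POINTWISE edge: each upstream task gets
     * floor(downstream / upstream) subpartitions, the first (downstream % upstream)
     * tasks get one more, and every upstream task has at least one subpartition.
     */
    public static int[] subpartitionsPerUpstreamTask(int upstream, int downstream) {
        int[] counts = new int[upstream];
        for (int i = 0; i < upstream; i++) {
            int share = downstream / upstream + (i < downstream % upstream ? 1 : 0);
            counts[i] = Math.max(1, share);
        }
        return counts;
    }

    /** Replacement is skipped when the subpartition count equals the task parallelism. */
    public static boolean forwardIsReplaced(int numberOfSubpartitions, int taskParallelism) {
        return numberOfSubpartitions != taskParallelism;
    }

    public static void main(String[] args) {
        int upstream = 2;
        int downstream = 5;
        int[] counts = subpartitionsPerUpstreamTask(upstream, downstream);
        System.out.println(Arrays.toString(counts)); // [3, 2]
        for (int i = 0; i < counts.length; i++) {
            System.out.println("A[" + i + "] subpartitions=" + counts[i]
                    + " replaced=" + forwardIsReplaced(counts[i], upstream));
        }
    }
}
```

Under these assumptions the A task with 3 subpartitions gets REBALANCE, while the A task with 2 subpartitions keeps FORWARD and writes only to its first subpartition. For an unscaled 3 -> 3 job every task has 1 subpartition, so the check replaces FORWARD on all of them.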