[ 
https://issues.apache.org/jira/browse/FLINK-33123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767162#comment-17767162
 ] 

Rui Fan edited comment on FLINK-33123 at 9/20/23 3:30 PM:
----------------------------------------------------------

Hi [~Zhanghao Chen] , thanks for your report!

Your analysis is correct; I also found this bug last month. I didn't report 
it because I don't know how to fix it properly, and a FORWARD partitioner 
between 2 tasks isn't common (most operators are chained directly).

First of all, I prefer changing it from FORWARD to REBALANCE; I explained the 
reason in the [code 
review|https://github.com/apache/flink/pull/21443#discussion_r1042919428] of 
FLINK-30213.

Why don't I know how to fix it properly?

As you said, we should change the distribution type to ALL_TO_ALL in the 
JobGraph. There are some issues with this:
 * issue1: The JobGraph is generated only once for the Adaptive Scheduler, and 
it isn't changed even if the parallelism changes (only the ExecutionGraph is 
updated).
 * issue2: Even if issue1 is solved, how do we handle switching back from 
REBALANCE to FORWARD?
 ** Assume a job has taskA and taskB, both with parallelism 3, and the user 
uses the FORWARD partitioner.
 ** Time1: the parallelism of taskA is changed to 2, so we should do 2 things:
 *** Replace the FORWARD partitioner with the REBALANCE partitioner in StreamTask.
 *** Change the distribution type to ALL_TO_ALL in the JobGraph.
 ** Time2: the parallelism of taskB is changed to 2, so we should do 2 things:
 *** Switch back to the FORWARD partitioner.
 *** Switch back to the POINTWISE distribution type.
 ** This case is fine.
 ** However, assume a job has taskA and taskB, both with parallelism 3, and 
the user uses the *REBALANCE* partitioner.
 *** In this case, the user chose the REBALANCE partitioner even though the 
FORWARD partitioner could be used here.
 *** For this case, we should still keep the REBALANCE partitioner and 
ALL_TO_ALL at both time1 and time2.
 ** So at time2 we need to decide whether to switch to the FORWARD partitioner 
and POINTWISE distribution type, i.e. we must distinguish these two cases.
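
To make the two cases above concrete, here is a minimal sketch of the decision 
at time1 and time2. It is only an illustration in Python, not Flink code: 
{{EdgeConfig}} and {{resolve_edge}} are hypothetical names, and it assumes the 
partitioner originally chosen by the user is still known when the parallelism 
changes.

```python
from dataclasses import dataclass

FORWARD, REBALANCE = "FORWARD", "REBALANCE"
POINTWISE, ALL_TO_ALL = "POINTWISE", "ALL_TO_ALL"


@dataclass(frozen=True)
class EdgeConfig:
    partitioner: str
    distribution: str


def resolve_edge(user_partitioner: str, upstream_p: int, downstream_p: int) -> EdgeConfig:
    """Hypothetical helper: derive the effective edge settings from the
    partitioner the user ORIGINALLY chose and the current parallelisms."""
    if user_partitioner not in (FORWARD, REBALANCE):
        raise ValueError("this sketch only models FORWARD and REBALANCE")
    # FORWARD is only kept while both sides have the same parallelism.
    if user_partitioner == FORWARD and upstream_p == downstream_p:
        return EdgeConfig(FORWARD, POINTWISE)
    # Either the user asked for REBALANCE, or FORWARD is currently not valid.
    return EdgeConfig(REBALANCE, ALL_TO_ALL)


# Case 1: the user chose FORWARD; taskA: 3 -> 2 (time1), then taskB: 3 -> 2 (time2).
time1 = resolve_edge(FORWARD, 2, 3)
time2 = resolve_edge(FORWARD, 2, 2)

# Case 2: the user chose REBALANCE; it must be kept at time1 and time2.
kept = resolve_edge(REBALANCE, 2, 2)
```

The point of the sketch is that time2 cannot be decided from the current 
partitioner alone: both cases are REBALANCE/ALL_TO_ALL after time1, and only 
the original user choice tells them apart.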

I'm not sure whether these 2 issues are clear.

Please let me know if I'm wrong, thanks~


> Wrong dynamic replacement of partitioner from FORWARD to REBALANCE for 
> autoscaler and adaptive scheduler
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33123
>                 URL: https://issues.apache.org/jira/browse/FLINK-33123
>             Project: Flink
>          Issue Type: Bug
>          Components: Autoscaler, Runtime / Coordination
>    Affects Versions: 1.17.0, 1.18.0
>            Reporter: Zhanghao Chen
>            Priority: Critical
>         Attachments: image-2023-09-20-15-09-22-733.png, 
> image-2023-09-20-15-14-04-679.png
>
>
> *Background*
> https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is 
> wrong when the parallelism is changed for a vertex with a FORWARD edge. This 
> affects both the autoscaler and the adaptive scheduler, where one can change 
> the vertex parallelism dynamically. A fix was applied to dynamically replace 
> the partitioner from FORWARD to REBALANCE on task deployment in 
> {{StreamTask}}: 
>  
> !image-2023-09-20-15-09-22-733.png|width=560,height=221!
> *Problem*
> Unfortunately, the fix is still buggy in two aspects:
>  # The connections between upstream and downstream tasks are determined by 
> the distribution type of the partitioner when the execution graph is 
> generated on the JM side. When the edge is FORWARD, the distribution type is 
> POINTWISE, and Flink will try to evenly distribute subpartitions to all 
> downstream tasks. If one wants to change it to REBALANCE, the distribution 
> type has to be changed to ALL_TO_ALL to make all-to-all connections between 
> upstream and downstream tasks. However, the fix did not change the 
> distribution type, so the network connections are set up incorrectly.
>  # The FORWARD partitioner is replaced unless 
> environment.getWriter(outputIndex).getNumberOfSubpartitions() equals the 
> task parallelism. However, the number of subpartitions here equals the 
> number of downstream tasks of this particular task, which is also determined 
> by the distribution type of the partitioner when the execution graph is 
> generated on the JM side. When floor or ceil(downstream task parallelism / 
> upstream task parallelism) = upstream task parallelism, some tasks will have 
> number of subpartitions = task parallelism. For example, for a topology A 
> (parallelism 2) -> B (parallelism 5), one A task has 2 subpartitions and the 
> other A task has 3 subpartitions, so one task has its number of 
> subpartitions equal to the task parallelism 2 and skips partitioner 
> replacement. As a result, that task sends data to only one downstream task, 
> as the FORWARD partitioner always sends data to the first subpartition. In 
> fact, for a normal job with a FORWARD edge and no autoscaling action, you 
> will find that the partitioner is changed to REBALANCE internally, as the 
> number of subpartitions always equals 1 in this case.
> !image-2023-09-20-15-14-04-679.png|width=892,height=301!
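
The subpartition arithmetic in the second point of the quoted description can 
be checked with a small model. This is a simplified Python sketch, not Flink's 
actual code: the function names are hypothetical, and the POINTWISE assignment 
rule used here (downstream subtask j reads from upstream subtask 
j * upstream / downstream) is an assumption that only aims to reproduce the 
subpartition counts per upstream subtask.

```python
from typing import List


def subpartitions_per_upstream(upstream_p: int, downstream_p: int) -> List[int]:
    """Simplified POINTWISE model: number of subpartitions (connected
    downstream subtasks) for each upstream subtask."""
    if upstream_p >= downstream_p:
        # Each upstream subtask feeds exactly one downstream subtask.
        return [1] * upstream_p
    counts = [0] * upstream_p
    for j in range(downstream_p):
        # Assumed rule: downstream subtask j reads from this upstream subtask.
        counts[j * upstream_p // downstream_p] += 1
    return counts


def skips_replacement(upstream_p: int, downstream_p: int) -> List[bool]:
    """True where the subpartition count equals the task parallelism, i.e.
    where the check described above leaves the FORWARD partitioner in place."""
    counts = subpartitions_per_upstream(upstream_p, downstream_p)
    return [n == upstream_p for n in counts]


# Topology from the description: A (parallelism 2) -> B (parallelism 5).
counts = subpartitions_per_upstream(2, 5)
skipped = skips_replacement(2, 5)
```

Under this model, one of the two A subtasks ends up with 2 subpartitions and 
keeps the FORWARD partitioner, while for an unscaled 3 -> 3 edge every subtask 
has 1 subpartition and none of them skips the replacement.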



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
