[ 
https://issues.apache.org/jira/browse/FLINK-22077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhilong Hong updated FLINK-22077:
---------------------------------
    Description: 
h3. Introduction

FLINK-21330 introduced an incorrect way of calculating cross-region 
ConsumedPartitionGroups in {{PipelinedRegionSchedulingStrategy}}. It slows down 
{{onExecutionStateChange}}, increasing its complexity from O(N) to O(N^2), and 
the semantics of "cross-region" are also wrong.
h3. Details

In {{PipelinedRegionSchedulingStrategy#startScheduling}}, we need to schedule 
all regions with no external blocking edges, i.e., the source regions. To 
reduce the complexity, we schedule all regions that have no external BLOCKING 
ConsumedPartitionGroups.

However, in the case illustrated in FLINK-22017, region 2 has a 
ConsumedPartitionGroup that contains both internal and external blocking 
IntermediateResultPartitions. If one partition is chosen to represent the 
entire ConsumedPartitionGroup, the internal one may be picked, which makes the 
entire group look internal. Region 2 will then be scheduled.

As region 1 has not finished, region 2 cannot transition to running. A deadlock 
may occur if there are not enough resources for both regions.
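
The misclassification described above can be sketched as follows. This is only an illustration, not the actual Flink code: regions are modelled as integer ids and partitions as strings, and the class name, the method names and the assumption that the first partition acts as the representative are all hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink's API: regions are int ids, partitions are String ids.
class RepresentativeSketch {

    // Judges the whole group by one representative partition (assumed: the first one).
    static boolean externalByRepresentative(
            List<String> group, Map<String, Integer> producerRegion, int consumerRegion) {
        return producerRegion.get(group.get(0)) != consumerRegion;
    }

    // Inspects every partition: the group is external if any producer lies outside the consumer.
    static boolean hasExternalPartition(
            List<String> group, Map<String, Integer> producerRegion, int consumerRegion) {
        for (String partition : group) {
            if (producerRegion.get(partition) != consumerRegion) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Region 2 consumes one partition it produces itself and one produced by region 1.
        Map<String, Integer> producerRegion = Map.of("internal", 2, "external", 1);
        List<String> group = List.of("internal", "external");

        System.out.println(externalByRepresentative(group, producerRegion, 2)); // false
        System.out.println(hasExternalPartition(group, producerRegion, 2));     // true
    }
}
```

With the representative check, region 2 appears to have no external blocking input, so it would be scheduled together with the source regions.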

To fix this, we introduced cross-region ConsumedPartitionGroups in 
FLINK-21330. ConsumedPartitionGroups with both internal and external blocking 
IntermediateResultPartitions are recorded. When {{startScheduling}} is called, 
these ConsumedPartitionGroups are treated as external ones, and region 2 is not 
scheduled.

However, the implementation of cross-region groups is wrong. Every 
ConsumedPartitionGroup that has multiple producer regions is treated as a 
cross-region group, which is not the logic described above, so the semantics 
are wrong. In addition, all ALL-TO-ALL BLOCKING ConsumedPartitionGroups are 
treated as cross-region groups, since their producers are in different regions 
(each producer has its own region). This increases the complexity from O(N) to 
O(N^2) for ALL-TO-ALL BLOCKING edges.
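
To make the difference between the two rules concrete, here is a small sketch that contrasts them. It is an illustration under assumptions, not the code from FLINK-21330: regions are plain integer ids, and the class and method names are hypothetical.

```java
import java.util.Set;

// Hypothetical model, not Flink's API: a group is described only by its producer region ids.
class FlawedCriterionSketch {

    // Flawed rule: any group with several producer regions counts as cross-region.
    static boolean flawedIsCrossRegion(Set<Integer> producerRegions) {
        return producerRegions.size() >= 2;
    }

    // Intended meaning: seen from the consuming region, the group mixes
    // an internal producer (the consumer itself) with at least one external producer.
    static boolean intendedIsCrossRegion(Set<Integer> producerRegions, int consumerRegion) {
        return producerRegions.size() >= 2 && producerRegions.contains(consumerRegion);
    }

    public static void main(String[] args) {
        // ALL-TO-ALL BLOCKING edge: producers 10, 11, 12 each form their own region; consumer is 20.
        Set<Integer> allToAll = Set.of(10, 11, 12);
        System.out.println(flawedIsCrossRegion(allToAll));       // true
        System.out.println(intendedIsCrossRegion(allToAll, 20)); // false

        // Region 2 consumes a group produced partly by region 1 and partly by itself.
        Set<Integer> mixed = Set.of(1, 2);
        System.out.println(flawedIsCrossRegion(mixed));          // true
        System.out.println(intendedIsCrossRegion(mixed, 2));     // true
    }
}
```

Only the flawed rule marks the ALL-TO-ALL group as cross-region, which is how such groups end up on the more expensive path.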
h3. Solution

To calculate the cross-region ConsumedPartitionGroups correctly, we can first 
calculate the producer regions of every ConsumedPartitionGroup, and then 
iterate over all regions and their ConsumedPartitionGroups. If a 
ConsumedPartitionGroup has two or more producer regions, and those producer 
regions include the current region, it is a cross-region ConsumedPartitionGroup. 
This matches the intended semantics and ensures that ALL-TO-ALL BLOCKING 
ConsumedPartitionGroups are not treated as cross-region ones. The fix also 
reduces the complexity from O(N^2) back to O(N).
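
The two steps above can be sketched roughly as follows. This is a simplified model rather than the actual patch: regions are integer ids, partitions and groups are string ids, all partitions are assumed to be BLOCKING, and every class, method and map name is hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model, not Flink's API: regions are Integer ids, partitions and groups are Strings.
class CrossRegionSketch {

    // Step 1: for every group, collect the regions that produce its partitions.
    static Map<String, Set<Integer>> producerRegionsByGroup(
            Map<String, List<String>> partitionsByGroup,
            Map<String, Integer> producerRegionByPartition) {
        Map<String, Set<Integer>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : partitionsByGroup.entrySet()) {
            Set<Integer> regions = new HashSet<>();
            for (String partition : entry.getValue()) {
                regions.add(producerRegionByPartition.get(partition));
            }
            result.put(entry.getKey(), regions);
        }
        return result;
    }

    // Step 2: iterate over all regions and the groups they consume. A group is cross-region
    // if it has two or more producer regions and the consuming region is one of them.
    static Set<String> crossRegionGroups(
            Map<Integer, List<String>> consumedGroupsByRegion,
            Map<String, Set<Integer>> producerRegions) {
        Set<String> crossRegion = new HashSet<>();
        for (Map.Entry<Integer, List<String>> entry : consumedGroupsByRegion.entrySet()) {
            for (String group : entry.getValue()) {
                Set<Integer> producers = producerRegions.get(group);
                if (producers.size() >= 2 && producers.contains(entry.getKey())) {
                    crossRegion.add(group);
                }
            }
        }
        return crossRegion;
    }

    // Example topology: region 2 consumes "mixed" (produced by regions 1 and 2);
    // region 20 consumes "allToAll" (produced by regions 10 and 11).
    static Set<String> demo() {
        Map<String, Integer> producerRegionByPartition =
                Map.of("p1", 1, "p2", 2, "a0", 10, "a1", 11);
        Map<String, List<String>> partitionsByGroup =
                Map.of("mixed", List.of("p1", "p2"), "allToAll", List.of("a0", "a1"));
        Map<Integer, List<String>> consumedGroupsByRegion =
                Map.of(2, List.of("mixed"), 20, List.of("allToAll"));
        return crossRegionGroups(
                consumedGroupsByRegion,
                producerRegionsByGroup(partitionsByGroup, producerRegionByPartition));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [mixed]
    }
}
```

In this sketch every partition is visited once in step 1 and every (region, group) pair once in step 2, and the ALL-TO-ALL group is left out because its consumer is not among its producer regions.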



> Wrong way to calculate cross-region ConsumedPartitionGroups in 
> PipelinedRegionSchedulingStrategy
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22077
>                 URL: https://issues.apache.org/jira/browse/FLINK-22077
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0
>            Reporter: Zhilong Hong
>            Priority: Critical
>             Fix For: 1.13.0
>


