Zhilong Hong created FLINK-22077:
------------------------------------

             Summary: Wrong way to calculate cross-region 
ConsumedPartitionGroups in PipelinedRegionSchedulingStrategy
                 Key: FLINK-22077
                 URL: https://issues.apache.org/jira/browse/FLINK-22077
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.13.0
            Reporter: Zhilong Hong
             Fix For: 1.13.0


h3. Introduction

We implement a wrong way to calculate cross-region ConsumedPartitionGroups in 
{{PipelinedRegionSchedulingStrategy}}, it slows down the procedure of 
{{onExecutionStateChange}}, make the complexity from O(N) to O(N^2). Also the 
semantic of cross-region is totally wrong.
h3. Details

In {{PipelinedRegionSchedulingStrategy#startScheduling}}, as expected, we need 
to schedule all region with no external blocking edges, i.e., source regions. 
To decrease the complexity, we choose to schedule all the regions that has no 
external BLOCKING ConsumedPartitionGroups.

However, for the case illustrated in FLINK-22017, the region 2 has a 
ConsumedPartitionGroup, which has both internal and external blocking 
IntermediateResultPartitions. If we choose one to represent the entire 
ConsumedPartitionGroup, it may choose the internal one, and make the entire 
group internal. Region 2 will be scheduled.

As Region 1 is not finished, Region 2 cannot transition to running. A deadlock 
may happen if resource is not enough for both two regions.

To make it right, we introduced cross-region ConsumedPartitionGroups in 
FLINK-21330. The regions which has ConsumedPartitionGroups with both internal 
and external blocking IntermediateResultPartitions will be recorded. When we 
call {{startScheduling}}, these ConsumedPartitionGroups will be treated as 
external, and region 2 will not be scheduled.

But we have to admit that the implementation of cross-region is wrong. The 
ConsumedPartitionGroups that has multiple producer regions will be treated as 
cross-region groups. It is not the same logic as we mentioned above. The 
semantic is totally wrong. Also all the ALL-TO-ALL BLOCKING 
ConsumedPartitionGroups will be treated as cross-region groups, since their 
producers are in different regions. (Each producer has its own region.) This 
slows down the complexity from O(N) to O(N^2) for ALL-TO-ALL BLOCKING edges.
h3. Solution

To correctly calculate the cross-region ConsumedPartitionGroups, we can just 
calculate the producer regions for all ConsumedPartitionGroups, and then 
iterate all the regions and its ConsumedPartitionGroups. If the 
ConsumedPartitionGroup has two or more producer regions, and the regions 
contains current region, it is a cross-region ConsumedPartitionGroup. This 
meets the correct semantics, and make sure ALL-TO-ALL BLOCKING 
ConsumedPartitionGroups will not be treated as cross-region one. This fix will 
also decreases the complexity from O(N) to O(N^2). I prefer it's necessary to 
add this bug-fix to release 1.13.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to