[ https://issues.apache.org/jira/browse/FLINK-22077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhilong Hong updated FLINK-22077:
---------------------------------
    Comment: was deleted

(was: cc [~trohrmann] I personally think it's necessary to add this bug-fix to 
release 1.13.
 )

> Wrong way to calculate cross-region ConsumedPartitionGroups in 
> PipelinedRegionSchedulingStrategy
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22077
>                 URL: https://issues.apache.org/jira/browse/FLINK-22077
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0
>            Reporter: Zhilong Hong
>            Priority: Critical
>             Fix For: 1.13.0
>
>
> h3. Introduction
> In FLINK-21330 we implemented an incorrect way to calculate cross-region
> ConsumedPartitionGroups in {{PipelinedRegionSchedulingStrategy}}. It slows
> down {{onExecutionStateChange}}, increasing its complexity from O(N) to
> O(N^2). The semantics of "cross-region" are also completely wrong.
> h3. Details
> In {{PipelinedRegionSchedulingStrategy#startScheduling}}, we need to
> schedule all regions that have no external blocking edges, i.e., the source
> regions. To reduce the complexity, we schedule all regions that have no
> external BLOCKING ConsumedPartitionGroups.
> However, in the case illustrated in FLINK-22017, region 2 has a
> ConsumedPartitionGroup that contains both internal and external blocking
> IntermediateResultPartitions. If we pick a single partition to represent the
> entire ConsumedPartitionGroup, we may pick the internal one and thus treat
> the entire group as internal. Region 2 will then be scheduled.
> Since region 1 has not finished, region 2 cannot transition to running. A
> deadlock may occur if there are not enough resources for both regions.
> To fix this, we introduced cross-region ConsumedPartitionGroups in
> FLINK-21330. ConsumedPartitionGroups that contain both internal and external
> blocking IntermediateResultPartitions are recorded. When {{startScheduling}}
> is called, these ConsumedPartitionGroups are treated as external, so region
> 2 will not be scheduled.
> However, the implementation of cross-region detection is wrong. Every
> ConsumedPartitionGroup that has multiple producer regions is treated as a
> cross-region group, which is not the logic described above, so the semantics
> are wrong. Moreover, all ALL-TO-ALL BLOCKING ConsumedPartitionGroups are
> treated as cross-region groups, since their producers are in different
> regions (each producer has its own region). This raises the complexity for
> ALL-TO-ALL BLOCKING edges from O(N) to O(N^2).
> h3. Solution
> To calculate the cross-region ConsumedPartitionGroups correctly, we can
> first compute the producer regions of every ConsumedPartitionGroup, and then
> iterate over all regions and the ConsumedPartitionGroups each region
> consumes. If a ConsumedPartitionGroup has two or more producer regions and
> those regions include the current region, it is a cross-region
> ConsumedPartitionGroup. This matches the intended semantics and ensures that
> ALL-TO-ALL BLOCKING ConsumedPartitionGroups are not treated as cross-region
> groups. The fix also reduces the complexity from O(N^2) to O(N).
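
The start-scheduling check from the Details section, and the FLINK-22017 failure it describes, can be sketched as follows. This is a minimal sketch under a hypothetical, simplified model, not Flink's real classes: regions are integer IDs, each ConsumedPartitionGroup is a string ID, and each partition is represented only by the ID of its producer region. {{sourceRegions}} and the map names are illustrative. The topology (a group whose first partition is produced by region 2 and whose second is produced by region 1) is an assumed stand-in for the FLINK-22017 case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified model (not Flink's API). Each region maps to the IDs of its
// BLOCKING consumed groups; each group maps to the producer regions of its partitions.
public class StartSchedulingSketch {

    static List<Integer> sourceRegions(
            Map<Integer, List<String>> blockingGroupsByRegion,
            Map<String, List<Integer>> partitionProducersByGroup,
            Set<String> crossRegionGroups) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> e : blockingGroupsByRegion.entrySet()) {
            int region = e.getKey();
            boolean hasExternal = false;
            for (String group : e.getValue()) {
                // Shortcut: the first partition represents the whole group...
                boolean representativeIsExternal =
                        partitionProducersByGroup.get(group).get(0) != region;
                // ...unless the group was recorded as cross-region, which always counts as external.
                if (representativeIsExternal || crossRegionGroups.contains(group)) {
                    hasExternal = true;
                    break;
                }
            }
            if (!hasExternal) {
                result.add(region);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Region 2 consumes group "g": its first partition is internal (produced by region 2),
        // its second partition is produced by region 1.
        Map<Integer, List<String>> blocking = new TreeMap<>();
        blocking.put(1, List.of());
        blocking.put(2, List.of("g"));
        Map<String, List<Integer>> producers = Map.of("g", List.of(2, 1));
        System.out.println(sourceRegions(blocking, producers, Set.of()));    // [1, 2]
        System.out.println(sourceRegions(blocking, producers, Set.of("g"))); // [1]
    }
}
```

With only the representative shortcut, region 2 is wrongly treated as a source region; once "g" is recorded as cross-region, only region 1 is scheduled at start.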

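The faulty classification described in the Details section can be illustrated with the same kind of hypothetical model (illustrative names, not Flink's actual code). It looks only at how many distinct producer regions a group has, so the group behind an ALL-TO-ALL BLOCKING edge is always flagged, even though none of its consumer regions produces any of its partitions.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model (not Flink's API): a group is described only by
// the producer-region IDs of its partitions.
public class FaultyCrossRegionSketch {

    // Faulty rule: any group with two or more distinct producer regions is
    // flagged as cross-region, no matter which region consumes it.
    static Set<String> faultyCrossRegionGroups(Map<String, List<Integer>> partitionProducersByGroup) {
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, List<Integer>> e : partitionProducersByGroup.entrySet()) {
            if (new HashSet<>(e.getValue()).size() >= 2) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // ALL-TO-ALL BLOCKING edge: three producers, each in its own region (10, 11, 12).
        // The consumer regions are different from all of them.
        Map<String, List<Integer>> producers = Map.of("allToAll", List.of(10, 11, 12));
        System.out.println(faultyCrossRegionGroups(producers)); // [allToAll]
    }
}
```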

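The two-step calculation proposed in the Solution section can be sketched as follows. Again this is a minimal sketch under the same hypothetical model (integer region IDs, string group IDs, partitions reduced to their producer region), not Flink's implementation; {{crossRegionGroups}} and the map names are illustrative.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model: not Flink's real classes or method names.
public class CrossRegionSketch {

    static Set<String> crossRegionGroups(
            Map<Integer, List<String>> consumedGroupsByRegion,
            Map<String, List<Integer>> partitionProducersByGroup) {
        // Step 1: compute the distinct producer regions of every group once.
        Map<String, Set<Integer>> producerRegionsByGroup = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : partitionProducersByGroup.entrySet()) {
            producerRegionsByGroup.put(e.getKey(), new HashSet<>(e.getValue()));
        }
        // Step 2: visit every (region, consumed group) pair once.
        Set<String> result = new HashSet<>();
        for (Map.Entry<Integer, List<String>> e : consumedGroupsByRegion.entrySet()) {
            for (String group : e.getValue()) {
                Set<Integer> producers = producerRegionsByGroup.get(group);
                // Cross-region: several producer regions, one of which is the consuming region itself.
                if (producers.size() >= 2 && producers.contains(e.getKey())) {
                    result.add(group);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> consumed = new HashMap<>();
        consumed.put(2, List.of("mixed"));     // region 2 consumes a group fed by regions 1 and 2
        consumed.put(20, List.of("allToAll")); // ALL-TO-ALL consumers, each in its own region
        consumed.put(21, List.of("allToAll"));
        Map<String, List<Integer>> producers = Map.of(
                "mixed", List.of(2, 1),
                "allToAll", List.of(10, 11, 12)); // each producer is its own region
        System.out.println(crossRegionGroups(consumed, producers)); // [mixed]
    }
}
```

Step 1 touches each partition once and step 2 touches each (region, consumed group) pair once with constant-time hash lookups, so the sketch is linear in those counts. The shared ALL-TO-ALL group is not flagged because none of its consumer regions is also one of its producer regions.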

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
