[ https://issues.apache.org/jira/browse/FLINK-22077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhilong Hong updated FLINK-22077:
---------------------------------
    Comment: was deleted

(was: cc [~trohrmann] I personally think it's necessary to add this bug-fix to release 1.13.
)

> Wrong way to calculate cross-region ConsumedPartitionGroups in PipelinedRegionSchedulingStrategy
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22077
>                 URL: https://issues.apache.org/jira/browse/FLINK-22077
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0
>            Reporter: Zhilong Hong
>            Priority: Critical
>             Fix For: 1.13.0
>
>
> h3. Introduction
> The calculation of cross-region ConsumedPartitionGroups that FLINK-21330
> added to {{PipelinedRegionSchedulingStrategy}} is wrong. It slows down
> {{onExecutionStateChange}}, raising its complexity from O(N) to O(N^2), and
> it does not implement the intended cross-region semantics.
> h3. Details
> In {{PipelinedRegionSchedulingStrategy#startScheduling}}, as expected, we
> need to schedule all regions with no external blocking edges, i.e., source
> regions. To decrease the complexity, we choose to schedule all the regions
> that have no external BLOCKING ConsumedPartitionGroups.
> However, in the case illustrated in FLINK-22017, region 2 has a
> ConsumedPartitionGroup that contains both internal and external blocking
> IntermediateResultPartitions. If we choose one partition to represent the
> entire ConsumedPartitionGroup, we may pick the internal one and so treat the
> entire group as internal. Region 2 will then be scheduled.
> As region 1 is not finished, region 2 cannot transition to running. A
> deadlock may happen if there are not enough resources for both regions.
> To make it right, we introduced cross-region ConsumedPartitionGroups in
> FLINK-21330. The ConsumedPartitionGroups with both internal and external
> blocking IntermediateResultPartitions will be recorded.
> When we call {{startScheduling}}, these ConsumedPartitionGroups will be
> treated as external ones, and region 2 will not be scheduled.
> But we have to admit that the implementation of cross-region groups is
> wrong. Every ConsumedPartitionGroup that has multiple producer regions is
> treated as a cross-region group. This is not the logic described above, so
> the semantics are wrong. In addition, all ALL-TO-ALL BLOCKING
> ConsumedPartitionGroups are treated as cross-region groups, since their
> producers are in different regions. (Each producer has its own region.) This
> raises the complexity from O(N) to O(N^2) for ALL-TO-ALL BLOCKING edges.
> h3. Solution
> To correctly calculate the cross-region ConsumedPartitionGroups, we can
> first calculate the producer regions of every ConsumedPartitionGroup, and
> then iterate over all the regions and the ConsumedPartitionGroups they
> consume. If a ConsumedPartitionGroup has two or more producer regions, and
> those producer regions include the current region, it is a cross-region
> ConsumedPartitionGroup. This matches the intended semantics and makes sure
> that ALL-TO-ALL BLOCKING ConsumedPartitionGroups are not treated as
> cross-region ones. This fix also decreases the complexity from O(N^2) to
> O(N).
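
The rule proposed in the Solution section can be sketched roughly as follows. This is only an illustration under simplifying assumptions, not the actual Flink patch: regions, partitions and ConsumedPartitionGroups are represented by plain string ids, and the class name {{CrossRegionGroups}}, the method {{findCrossRegionGroups}} and its three map parameters are hypothetical names invented for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Flink: all ids are plain strings.
final class CrossRegionGroups {

    /**
     * @param producerRegionOfPartition partition id -> id of the region producing it
     * @param partitionsOfGroup         group id -> ids of the partitions in that group
     * @param groupsConsumedByRegion    region id -> ids of the groups it consumes
     * @return ids of the groups that are cross-region under the proposed rule
     */
    static Set<String> findCrossRegionGroups(
            Map<String, String> producerRegionOfPartition,
            Map<String, List<String>> partitionsOfGroup,
            Map<String, List<String>> groupsConsumedByRegion) {

        // Step 1: compute the set of producer regions once per group.
        Map<String, Set<String>> producerRegionsOfGroup = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : partitionsOfGroup.entrySet()) {
            Set<String> producerRegions = new HashSet<>();
            for (String partition : entry.getValue()) {
                producerRegions.add(producerRegionOfPartition.get(partition));
            }
            producerRegionsOfGroup.put(entry.getKey(), producerRegions);
        }

        // Step 2: iterate over every region and the groups it consumes.
        Set<String> crossRegionGroups = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : groupsConsumedByRegion.entrySet()) {
            String region = entry.getKey();
            for (String group : entry.getValue()) {
                Set<String> producerRegions =
                        producerRegionsOfGroup.getOrDefault(group, Collections.emptySet());
                // Cross-region: two or more producer regions, one of which is
                // the consuming region itself (internal and external producers).
                if (producerRegions.size() >= 2 && producerRegions.contains(region)) {
                    crossRegionGroups.add(group);
                }
            }
        }
        return crossRegionGroups;
    }
}
```

With this check, a group consumed by region 2 whose partitions are produced by region 1 and by region 2 itself is reported as cross-region. An ALL-TO-ALL BLOCKING group is not, because none of its producer regions is the consuming region.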