[ https://issues.apache.org/jira/browse/FLINK-21110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-21110:
-----------------------------------
    Labels: pull-request-available  (was: )

> Optimize Scheduler Performance for Large-Scale Jobs
> ---------------------------------------------------
>
>                 Key: FLINK-21110
>                 URL: https://issues.apache.org/jira/browse/FLINK-21110
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Zhilong Hong
>            Assignee: Zhilong Hong
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.0
>
>         Attachments: Illustration of Group.jpg
>
>
> According to the results of the scheduler benchmarks we implemented in FLINK-20612, the bottleneck of deploying and running a large-scale job in Flink lies mainly in the following procedures:
> |Procedure|Time complexity|
> |Initializing ExecutionGraph|O(N^2)|
> |Building DefaultExecutionTopology|O(N^2)|
> |Initializing PipelinedRegionSchedulingStrategy|O(N^2)|
> |Scheduling downstream tasks when a task finishes|O(N^2)|
> |Calculating tasks to restart when a failover occurs|O(N^2)|
> |Releasing result partitions|O(N^3)|
> These procedures are all related to the complexity of the topology in the ExecutionGraph. Between two vertices connected with all-to-all edges, all the upstream IntermediateResultPartitions are connected to all the downstream ExecutionVertices. The computational complexity of building and traversing all these edges is O(N^2).
> As for memory usage, we currently use ExecutionEdges to store the connection information. For the all-to-all distribution type, there are O(N^2) ExecutionEdges. We tested a simple job with only two vertices, each with a parallelism of 10k, connected with all-to-all edges. It takes 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges. Most large-scale jobs contain more than two vertices with large parallelisms, so they cost even more time and memory to deploy.
> As we can see, for two JobVertices connected with the all-to-all distribution type, all the IntermediateResultPartitions produced by the upstream ExecutionVertices are isomorphic: the downstream ExecutionVertices they connect to are exactly the same. The downstream ExecutionVertices belonging to the same JobVertex are also isomorphic, since the upstream ResultPartitions they connect to are the same.
> Since every JobEdge has exactly one distribution type, we can divide the vertices and result partitions into groups according to the distribution type of the JobEdge.
> For the all-to-all distribution type, since all downstream vertices are isomorphic, they belong to a single group, and all the upstream result partitions are connected to this group. Vice versa, all the upstream result partitions also belong to a single group, and all the downstream vertices are connected to this group. Previously, when we wanted to iterate over all the downstream vertices, we had to loop over them N times, which leads to a complexity of O(N^2). Now that all upstream result partitions are connected to one downstream group, we only need to loop over the group once, with a complexity of O(N).
> For the pointwise distribution type, because each result partition is connected to different downstream vertices, the partitions belong to different groups. Vice versa, the vertices also belong to different groups. Since one result partition group is connected to one vertex group pointwise, the computational complexity of looping over them is still O(N).
> !Illustration of Group.jpg|height=249!
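> The grouping idea can be sketched roughly as follows. This is a minimal, self-contained illustration in plain Java: the ConsumerVertexGroup type and the plain integer IDs are simplified stand-ins introduced only for this sketch, not the actual Flink classes or APIs.
> {code:java}
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> // Simplified stand-in for a group of consumer vertices that share the same
> // connection pattern. A symmetric group of consumed partitions would be
> // built the same way on the vertex side.
> public class GroupingSketch {
>
>     static class ConsumerVertexGroup {
>         final List<Integer> vertexIds;
>
>         ConsumerVertexGroup(List<Integer> vertexIds) {
>             this.vertexIds = vertexIds;
>         }
>     }
>
>     // All-to-all: every upstream partition references the SAME group object,
>     // so building the connections costs O(N + M) time and memory instead of
>     // creating numPartitions * numVertices ExecutionEdges.
>     static List<ConsumerVertexGroup> connectAllToAll(int numPartitions, int numVertices) {
>         List<Integer> downstream = new ArrayList<>();
>         for (int v = 0; v < numVertices; v++) {
>             downstream.add(v);
>         }
>         ConsumerVertexGroup shared = new ConsumerVertexGroup(downstream);
>
>         List<ConsumerVertexGroup> perPartition = new ArrayList<>(numPartitions);
>         for (int p = 0; p < numPartitions; p++) {
>             perPartition.add(shared); // a reference to the shared group, not a copy
>         }
>         return perPartition;
>     }
>
>     // Pointwise (simplified here to a 1:1 mapping): each partition gets its
>     // own single-element group, so building all groups is still O(N).
>     static List<ConsumerVertexGroup> connectPointwise(int numPartitions) {
>         List<ConsumerVertexGroup> groups = new ArrayList<>(numPartitions);
>         for (int p = 0; p < numPartitions; p++) {
>             groups.add(new ConsumerVertexGroup(Collections.singletonList(p)));
>         }
>         return groups;
>     }
>
>     public static void main(String[] args) {
>         // 10k x 10k all-to-all: one shared group of 10k vertex IDs instead of
>         // 100M ExecutionEdges.
>         List<ConsumerVertexGroup> allToAll = connectAllToAll(10_000, 10_000);
>
>         // "Visit all consumers of one finished partition" now means iterating
>         // the shared group once: O(N) instead of O(N^2).
>         int visited = 0;
>         for (int vertexId : allToAll.get(0).vertexIds) {
>             visited++; // placeholder for scheduling vertexId
>         }
>         System.out.println("consumers visited once: " + visited);
>
>         // Pointwise: 10k single-element groups, still O(N) overall.
>         System.out.println("pointwise groups: " + connectPointwise(10_000).size());
>     }
> }
> {code}
> The key point of the sketch is that all upstream partitions share one group object instead of each holding its own list of N consumers, which is where both the O(N^2) traversal cost and the ExecutionEdge memory footprint go away.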
> After we group the result partitions and vertices, ExecutionEdge is no longer needed. For the test job mentioned above, the optimization effectively reduces the memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in our POC. The time cost is reduced from 62.090 seconds to 8.551 seconds (with 10k parallelism).
>
> The detailed design doc: https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)