Thanks to ZhiLong for improving the scheduler's performance. I think many
users would benefit from your work!

Best,
Guowei
On Wed, Feb 3, 2021 at 2:04 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Thanks for making the community aware of these performance improvements,
> Zhilong. I like them and I am looking forward to a faster Flink :-)
>
> Cheers,
> Till
>
> On Tue, Feb 2, 2021 at 11:00 AM Zhilong Hong <cypres...@outlook.com> wrote:
>
> > Hello, everyone:
> >
> > I would like to start a discussion about FLINK-21110: Optimize Scheduler
> > Performance for Large-Scale Jobs [1].
> >
> > According to the results of the scheduler benchmarks we implemented in
> > FLINK-20612 [2], the bottlenecks of deploying and running a large-scale
> > job in Flink lie mainly in the following procedures:
> >
> > Procedure                                              Time complexity
> > Initializing ExecutionGraph                            O(N^2)
> > Building DefaultExecutionTopology                      O(N^2)
> > Initializing PipelinedRegionSchedulingStrategy         O(N^2)
> > Scheduling downstream tasks when a task finishes       O(N^2)
> > Calculating tasks to restart when a failover occurs    O(N^2)
> > Releasing result partitions                            O(N^2)
> >
> > These procedures are all related to the complexity of the topology in the
> > ExecutionGraph. Between two vertices connected with all-to-all edges, all
> > the upstream IntermediateResultPartitions are connected to all downstream
> > ExecutionVertices, so the computational complexity of building and
> > traversing all these edges is O(N^2).
> >
> > As for memory usage, we currently use ExecutionEdges to store the
> > connection information. For the all-to-all distribution type there are
> > O(N^2) ExecutionEdges. We tested a simple job with only two vertices,
> > each with a parallelism of 10k, connected with all-to-all edges. It takes
> > 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges.
> >
> > Most large-scale jobs contain more than two vertices with large
> > parallelisms, so they cost a lot of time and memory to deploy.
> >
> > As we can see, for two JobVertices connected with the all-to-all
> > distribution type, all IntermediateResultPartitions produced by the
> > upstream ExecutionVertices are isomorphic: the downstream
> > ExecutionVertices they connect to are exactly the same. The downstream
> > ExecutionVertices belonging to the same JobVertex are also isomorphic,
> > since the upstream ResultPartitions they connect to are the same, too.
> >
> > Since every JobEdge has exactly one distribution type, we can divide the
> > vertices and result partitions into groups according to the distribution
> > type of the JobEdge.
> >
> > For the all-to-all distribution type, since all downstream vertices are
> > isomorphic, they belong to a single group, and all the upstream result
> > partitions are connected to this group. Vice versa, all the upstream
> > result partitions also belong to a single group, and all the downstream
> > vertices are connected to this group. Previously, when we wanted to
> > iterate over all the downstream vertices, we needed to loop over them N
> > times, which leads to a complexity of O(N^2). Now, since all upstream
> > result partitions are connected to one downstream group, we only need to
> > loop over the group once, with a complexity of O(N).
> >
> > For the pointwise distribution type, because each result partition is
> > connected to different downstream vertices, the partitions belong to
> > different groups, and, vice versa, the vertices belong to different
> > groups as well. Since each result partition group is connected to one
> > vertex group pointwise, the computational complexity of looping over
> > them is still O(N).
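To make the grouping idea above concrete, here is a minimal, self-contained
Java sketch. It is not the actual Flink implementation; the names
GroupingSketch, VertexGroup, and buildConsumerGroups are purely illustrative.
It shows how, for an all-to-all edge, every upstream partition can reference
one shared consumer group (O(N) objects in total), while a pointwise edge
with equal parallelism yields one single-element group per partition.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only -- not the actual Flink classes.
public class GroupingSketch {

    enum DistributionType { ALL_TO_ALL, POINTWISE }

    // A group of downstream ExecutionVertices (identified here by their
    // subtask index) that all consume the same upstream partitions.
    static final class VertexGroup {
        final List<Integer> vertexIndices;

        VertexGroup(List<Integer> vertexIndices) {
            this.vertexIndices = vertexIndices;
        }
    }

    // Returns, for each upstream result partition, the group of downstream
    // vertices that consume it.
    //
    // ALL_TO_ALL: every partition is consumed by the same vertices, so a
    // single shared group is built once and referenced N times: O(N)
    // objects instead of O(N^2) ExecutionEdges.
    //
    // POINTWISE (with equal parallelism): partition i is consumed only by
    // vertex i, so each partition gets its own single-element group.
    static List<VertexGroup> buildConsumerGroups(int parallelism, DistributionType type) {
        List<VertexGroup> consumerGroupPerPartition = new ArrayList<>(parallelism);
        if (type == DistributionType.ALL_TO_ALL) {
            List<Integer> allConsumers = new ArrayList<>(parallelism);
            for (int i = 0; i < parallelism; i++) {
                allConsumers.add(i);
            }
            VertexGroup shared = new VertexGroup(allConsumers);
            for (int i = 0; i < parallelism; i++) {
                consumerGroupPerPartition.add(shared); // same instance for every partition
            }
        } else {
            for (int i = 0; i < parallelism; i++) {
                consumerGroupPerPartition.add(new VertexGroup(Collections.singletonList(i)));
            }
        }
        return consumerGroupPerPartition;
    }

    public static void main(String[] args) {
        // With a parallelism of 10k, the all-to-all case stores one shared
        // 10k-element group instead of 100M individual edges.
        List<VertexGroup> groups = buildConsumerGroups(10_000, DistributionType.ALL_TO_ALL);
        Set<VertexGroup> distinct = new HashSet<>(groups); // identity-based: no equals() override
        System.out.println("partitions: " + groups.size());                               // 10000
        System.out.println("consumers per partition: " + groups.get(0).vertexIndices.size()); // 10000
        System.out.println("distinct groups: " + distinct.size());                        // 1
    }
}

Sharing one group instance across all partitions, instead of materializing
one object per (partition, vertex) pair, is what collapses both the build
time and the memory footprint from O(N^2) to O(N).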
> > After we group the result partitions and vertices, ExecutionEdge is no
> > longer needed. For the test job mentioned above, this optimization
> > effectively reduces the memory usage from 4.175 GiB to 12.076 MiB
> > (estimated via MXBean) in our POC. The time cost is reduced from 62.090
> > seconds to 8.551 seconds (with 10k parallelism).
> >
> > The detailed design doc with illustrations is located at [3]. Please
> > find more details in the links below.
> >
> > Looking forward to your feedback.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21110
> > [2] https://issues.apache.org/jira/browse/FLINK-20612
> > [3] https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
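As a rough sanity check on the scale of that reduction, the sketch below
only counts connection objects for the 10k x 10k all-to-all test job
described above. It is illustrative arithmetic, not a measurement: the
per-edge representation needs one entry per (partition, vertex) pair, while
the grouped representation needs roughly one group reference per partition
and per vertex plus the two shared groups.

// Back-of-the-envelope object counts for the 10k x 10k all-to-all test job.
public class ConnectionCount {
    public static void main(String[] args) {
        long parallelism = 10_000;

        // Per-edge representation: one ExecutionEdge per (partition, vertex) pair.
        long executionEdges = parallelism * parallelism; // 100,000,000

        // Grouped representation: each of the 10k partitions references one
        // shared consumer-vertex group, each of the 10k vertices references
        // one shared consumed-partition group, plus the two groups themselves.
        long groupReferences = 2 * parallelism + 2; // 20,002

        System.out.printf("ExecutionEdges: %,d vs. group references: %,d (about %.0fx fewer)%n",
                executionEdges, groupReferences, (double) executionEdges / groupReferences);
    }
}

These counts only show why the footprint drops by orders of magnitude; the
byte figures reported above additionally depend on per-object overhead and
on what the MXBean estimate includes.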