Thanks for making the community aware of these performance improvements, Zhilong. I like them and I am looking forward to a faster Flink :-)
Cheers,
Till

On Tue, Feb 2, 2021 at 11:00 AM Zhilong Hong <cypres...@outlook.com> wrote:

> Hello, everyone:
>
> I would like to start the discussion about FLINK-21110: Optimize Scheduler
> Performance for Large-Scale Jobs [1].
>
> According to the results of the scheduler benchmarks we implemented in
> FLINK-20612 [2], the bottlenecks of deploying and running a large-scale job
> in Flink mainly lie in the following procedures:
>
> Procedure                                             Time complexity
> Initializing ExecutionGraph                           O(N^2)
> Building DefaultExecutionTopology                     O(N^2)
> Initializing PipelinedRegionSchedulingStrategy        O(N^2)
> Scheduling downstream tasks when a task finishes      O(N^2)
> Calculating tasks to restart when a failover occurs   O(N^2)
> Releasing result partitions                           O(N^2)
>
> These procedures are all related to the complexity of the topology in the
> ExecutionGraph. Between two vertices connected with all-to-all edges, all
> the upstream IntermediateResultPartitions are connected to all downstream
> ExecutionVertices. The computational complexity of building and traversing
> all these edges is O(N^2).
>
> As for memory usage, we currently use ExecutionEdges to store the
> information of the connections. For the all-to-all distribution type, there
> are O(N^2) ExecutionEdges. We tested a simple job with only two vertices,
> each with a parallelism of 10k, connected with all-to-all edges. It takes
> 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges.
>
> In most large-scale jobs, there will be more than two vertices with large
> parallelism, and they would cost a lot of time and memory to deploy the
> job.
>
> As we can see, for two JobVertices connected with the all-to-all
> distribution type, all IntermediateResultPartitions produced by the
> upstream ExecutionVertices are isomorphic, which means that the downstream
> ExecutionVertices they connect to are exactly the same. The downstream
> ExecutionVertices belonging to the same JobVertex are also isomorphic, as
> the upstream ResultPartitions they connect to are the same, too.
>
> Since every JobEdge has exactly one distribution type, we can divide the
> vertices and result partitions into groups according to the distribution
> type of the JobEdge.
>
> For the all-to-all distribution type, since all downstream vertices are
> isomorphic, they belong to a single group, and all the upstream result
> partitions are connected to this group. Vice versa, all the upstream result
> partitions also belong to a single group, and all the downstream vertices
> are connected to this group. Previously, when we wanted to iterate over all
> the downstream vertices, we needed to loop over them N times, which leads
> to a complexity of O(N^2). Now, since all upstream result partitions are
> connected to one downstream group, we just need to loop over them once,
> with a complexity of O(N).
>
> For the pointwise distribution type, because each result partition is
> connected to different downstream vertices, the partitions belong to
> different groups. Vice versa, all the vertices belong to different groups.
> Since each result partition group is connected to exactly one vertex group,
> the computational complexity of looping over them is still O(N).
>
> After we group the result partitions and vertices, ExecutionEdge is no
> longer needed. For the test job mentioned above, this optimization
> effectively reduces the memory usage from 4.175 GiB to 12.076 MiB
> (estimated via MXBean) in our POC.
> The time cost is reduced from 62.090 seconds to 8.551 seconds (with 10k
> parallelism).
>
> The detailed design doc with illustrations is located at [3]. Please find
> more details in the links below.
>
> Looking forward to your feedback.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21110
> [2] https://issues.apache.org/jira/browse/FLINK-20612
> [3] https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
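
To make the grouping idea in the quoted proposal more concrete, here is a minimal, self-contained Java sketch contrasting the old edge-based model with a group-based one for an all-to-all edge. All class and field names in it (GroupingSketch, ConsumerGroup, GroupBasedPartition) are invented for illustration and are not the actual classes introduced by FLINK-21110.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch only: the names below are invented for this example
    // and are not the classes used in Flink's actual scheduler.
    public class GroupingSketch {

        // Group-based model: all downstream vertices of an all-to-all edge are
        // isomorphic, so they are stored once in a shared group, and every
        // upstream result partition only keeps a reference to that group.
        static final class ConsumerGroup {
            final List<Integer> vertexIds = new ArrayList<>();
        }

        static final class GroupBasedPartition {
            ConsumerGroup consumers; // shared by all partitions of the same result
        }

        public static void main(String[] args) {
            final int parallelism = 10_000;

            // Edge-based model (the old approach): every partition stores a
            // reference to every consumer vertex, i.e. N * N = 100M connections
            // for N = 10k. We only compute the count here instead of building it.
            long edgeCount = (long) parallelism * parallelism;
            System.out.println("Edge-based connections: " + edgeCount);

            // Group-based model: one shared group with N entries, referenced by
            // N partitions, so the stored state grows as O(N) instead of O(N^2).
            ConsumerGroup group = new ConsumerGroup();
            for (int i = 0; i < parallelism; i++) {
                group.vertexIds.add(i);
            }

            List<GroupBasedPartition> partitions = new ArrayList<>(parallelism);
            for (int i = 0; i < parallelism; i++) {
                GroupBasedPartition partition = new GroupBasedPartition();
                partition.consumers = group; // O(1) per partition
                partitions.add(partition);
            }

            System.out.println("Group-based stored connections: "
                    + (partitions.size() + group.vertexIds.size()));
        }
    }

The point of the sketch is that every upstream partition shares a single reference to the same consumer group, so the stored state and the cost of iterating over downstream consumers grow linearly with the parallelism instead of quadratically, which matches the O(N^2) to O(N) reduction described in the proposal.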