Thanks for making the community aware of these performance improvements,
Zhilong. I like them and I am looking forward to a faster Flink :-)

Cheers,
Till

On Tue, Feb 2, 2021 at 11:00 AM Zhilong Hong <cypres...@outlook.com> wrote:

> Hello, everyone:
>
> I would like to start the discussion about FLINK-21110: Optimize Scheduler
> Performance for Large-Scale Jobs [1].
>
> According to the results of the scheduler benchmarks we implemented in
> FLINK-20612 [2], the bottlenecks in deploying and running a large-scale job
> in Flink lie mainly in the following procedures:
>
> Procedure                                            Time complexity
> ---------------------------------------------------  ---------------
> Initializing ExecutionGraph                          O(N^2)
> Building DefaultExecutionTopology                    O(N^2)
> Initializing PipelinedRegionSchedulingStrategy       O(N^2)
> Scheduling downstream tasks when a task finishes     O(N^2)
> Calculating tasks to restart when a failover occurs  O(N^2)
> Releasing result partitions                          O(N^2)
>
> These procedures are all related to the complexity of the topology in the
> ExecutionGraph. Between two vertices connected with all-to-all edges,
> all the upstream IntermediateResultPartitions are connected to all
> downstream ExecutionVertices. The computational complexity of building and
> traversing all these edges is O(N^2).
>
> As for memory usage, we currently use ExecutionEdges to store the
> connection information. For the all-to-all distribution type, there are
> O(N^2) ExecutionEdges. We tested a simple job with only two vertices, both
> with a parallelism of 10k and connected with all-to-all edges. It takes
> 4.175 GiB (estimated via MXBean) to store the resulting 100M
> ExecutionEdges.
>
> Most large-scale jobs have more than two vertices with large
> parallelisms, so deploying them costs a lot of time and memory.
>
> As we can see, for two JobVertices connected with the all-to-all
> distribution type, all IntermediateResultPartitions produced by the
> upstream ExecutionVertices are isomorphic, which means that the downstream
> ExecutionVertices they connect to are exactly the same. The downstream
> ExecutionVertices belonging to the same JobVertex are also isomorphic, as
> the upstream IntermediateResultPartitions they connect to are the same, too.
>
> Since every JobEdge has exactly one distribution type, we can divide the
> vertices and result partitions into groups according to the distribution
> type of the JobEdge.
>
> For the all-to-all distribution type, since all downstream vertices are
> isomorphic, they belong to a single group, and all the upstream result
> partitions are connected to this group. Likewise, all the upstream result
> partitions belong to a single group, and all the downstream vertices
> are connected to this group. Previously, iterating over the downstream
> vertices of all result partitions meant looping over the N vertices once
> per partition, N times in total, which leads to a complexity of O(N^2).
> Now, since all upstream result partitions are connected to one downstream
> group, we only need to loop over that group once, with a complexity of O(N).
>
> For the pointwise distribution type, because each result partition is
> connected to different downstream vertices, the partitions belong to
> different groups. Likewise, the vertices belong to different groups. Since
> each result partition group is connected to one vertex group in a pointwise
> manner, the computational complexity of looping over them is still O(N).
>
> After we group the result partitions and vertices, ExecutionEdge is no
> longer needed. For the test job we mentioned above, the optimization can
> effectively reduce the memory usage from 4.175 GiB to 12.076 MiB (estimated
> via MXBean) in our POC. The time cost is reduced from 62.090 seconds to
> 8.551 seconds (with 10k parallelism).
>
> The detailed design doc with illustrations is located at [3]. Please find
> more details in the links below.
>
> Looking forward to your feedback.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21110
> [2] https://issues.apache.org/jira/browse/FLINK-20612
> [3]
> https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
>
>
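
The all-to-all grouping idea in the quoted message can be illustrated with a
small, self-contained sketch. It is not Flink code: the class GroupingSketch,
the ConsumerGroup type, and both counting methods are hypothetical names made
up for this illustration. It only compares how many steps a traversal takes
when every partition walks its own consumer list versus when all partitions
share one group of consumers.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class GroupingSketch {

    /** Hypothetical group of isomorphic downstream vertices, stored once and shared. */
    static final class ConsumerGroup {
        final int[] vertexIds;

        ConsumerGroup(int[] vertexIds) {
            this.vertexIds = vertexIds;
        }
    }

    /** Per-edge model: each partition walks its own full list of consumers. */
    static long visitsPerEdge(int partitions, int vertices) {
        long visits = 0;
        for (int p = 0; p < partitions; p++) {
            for (int v = 0; v < vertices; v++) {
                visits++; // one step per (partition, vertex) edge
            }
        }
        return visits;
    }

    /** Grouped model: all partitions point at the same consumer group. */
    static long visitsGrouped(int partitions, int vertices) {
        int[] ids = new int[vertices];
        for (int v = 0; v < vertices; v++) {
            ids[v] = v;
        }
        ConsumerGroup shared = new ConsumerGroup(ids);

        // Every upstream partition holds a reference to the one shared group.
        ConsumerGroup[] consumerOf = new ConsumerGroup[partitions];
        Arrays.fill(consumerOf, shared);

        // Identity-based set, so a group is expanded only the first time it is seen.
        Set<ConsumerGroup> seen =
                Collections.newSetFromMap(new IdentityHashMap<ConsumerGroup, Boolean>());
        long visits = 0;
        for (ConsumerGroup group : consumerOf) {
            visits++; // one step per partition
            if (seen.add(group)) {
                visits += group.vertexIds.length; // walk the shared group once
            }
        }
        return visits;
    }

    public static void main(String[] args) {
        int n = 1_000;
        System.out.println("per-edge visits: " + visitsPerEdge(n, n));
        System.out.println("grouped visits:  " + visitsGrouped(n, n));
    }
}
```

In this toy model the per-edge traversal takes N * N steps, while the grouped
traversal takes N + N steps, which matches the O(N^2) versus O(N) argument
above. How the actual groups are represented is described in the design doc
[3], not in this sketch.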
