Thanks to ZhiLong for improving the scheduler's performance.
I think many users would benefit from your work!
Best,
Guowei


On Wed, Feb 3, 2021 at 2:04 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Thanks for making the community aware of these performance improvements,
> Zhilong. I like them and I am looking forward to a faster Flink :-)
>
> Cheers,
> Till
>
> On Tue, Feb 2, 2021 at 11:00 AM Zhilong Hong <cypres...@outlook.com> wrote:
>
> > Hello, everyone:
> >
> > I would like to start the discussion about FLINK-21110: Optimize
> > Scheduler Performance for Large-Scale Jobs [1].
> >
> > According to the results of the scheduler benchmarks we implemented
> > in FLINK-20612 [2], the bottleneck of deploying and running a
> > large-scale job in Flink mainly lies in the following procedures:
> >
> > Procedure                                              Time complexity
> > ----------------------------------------------------------------------
> > Initializing ExecutionGraph                            O(N^2)
> > Building DefaultExecutionTopology                      O(N^2)
> > Initializing PipelinedRegionSchedulingStrategy         O(N^2)
> > Scheduling downstream tasks when a task finishes       O(N^2)
> > Calculating tasks to restart when a failover occurs    O(N^2)
> > Releasing result partitions                            O(N^2)
> >
> > These procedures are all related to the complexity of the topology in
> > the ExecutionGraph. Between two vertices connected by all-to-all
> > edges, every upstream IntermediateResultPartition is connected to
> > every downstream ExecutionVertex. The computation complexity of
> > building and traversing all these edges is O(N^2).
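> >
> > To make the quadratic growth concrete, the following is a minimal,
> > hypothetical sketch in plain Java (not the actual ExecutionGraph
> > code; the types are only illustrative) of what building the
> > all-to-all connections amounts to:
> >
> > import java.util.ArrayList;
> > import java.util.List;
> >
> > // Simplified illustration only: with an all-to-all edge and
> > // parallelism n on both sides, every upstream partition is linked to
> > // every downstream subtask, so both the build time and the number of
> > // stored connections grow as n * n.
> > class AllToAllEdgeSketch {
> >     static List<int[]> buildEdges(int n) {
> >         List<int[]> edges = new ArrayList<>();
> >         for (int partition = 0; partition < n; partition++) {
> >             for (int subtask = 0; subtask < n; subtask++) {
> >                 edges.add(new int[] {partition, subtask}); // one "edge" per pair
> >             }
> >         }
> >         return edges; // n = 10,000  =>  100,000,000 entries
> >     }
> > }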
> >
> > As for memory usage, we currently use ExecutionEdges to store the
> > connection information. For the all-to-all distribution type, there
> > are O(N^2) ExecutionEdges. We tested a simple job with only two
> > vertices, each with a parallelism of 10k, connected by all-to-all
> > edges. It takes 4.175 GiB (estimated via MXBean) to store the
> > resulting 10k x 10k = 100M ExecutionEdges.
> >
> > In most large-scale jobs there are more than two vertices with large
> > parallelisms, so deploying the job costs a lot of time and memory.
> >
> > As we can see, for two JobVertices connected with the all-to-all
> > distribution type, all IntermediateResultPartitions produced by the
> > upstream ExecutionVertices are isomorphic, which means that the
> > downstream ExecutionVertices they connect to are exactly the same.
> > The downstream ExecutionVertices belonging to the same JobVertex are
> > also isomorphic, since the upstream ResultPartitions they connect to
> > are the same as well.
> >
> > Since every JobEdge has exactly one distribution type, we can divide the
> > vertices and result partitions into groups according to the distribution
> > type of the JobEdge.
> >
> > For the all-to-all distribution type, since all downstream vertices
> > are isomorphic, they belong to a single group, and all the upstream
> > result partitions are connected to this group. Vice versa, all the
> > upstream result partitions also belong to a single group, and all the
> > downstream vertices are connected to this group. Previously, when we
> > wanted to iterate over all the downstream vertices, we had to loop
> > over them once for each of the N upstream result partitions, which
> > leads to a complexity of O(N^2). Now, since all upstream result
> > partitions are connected to one downstream group, we just need to
> > loop over that group once, with a complexity of O(N).
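> >
> > To illustrate the idea, here is a simplified sketch of the grouping
> > for the all-to-all case (plain Java; the class and field names are
> > only illustrative, not the actual implementation):
> >
> > import java.util.ArrayList;
> > import java.util.List;
> >
> > // Simplified sketch of the grouping idea; names are illustrative only.
> > class GroupingSketch {
> >     // For an all-to-all edge, all downstream subtasks share one group.
> >     static class ConsumerVertexGroup {
> >         final List<Integer> vertexIds = new ArrayList<>(); // N entries, stored once
> >     }
> >
> >     static class PartitionInfo {
> >         ConsumerVertexGroup consumers; // shared reference instead of N edges
> >     }
> >
> >     static void connectAllToAll(List<PartitionInfo> partitions, int numConsumers) {
> >         ConsumerVertexGroup group = new ConsumerVertexGroup();
> >         for (int i = 0; i < numConsumers; i++) {
> >             group.vertexIds.add(i);      // O(N) to build the single shared group
> >         }
> >         for (PartitionInfo partition : partitions) {
> >             partition.consumers = group; // every partition references the same group
> >         }
> >         // Any code that needs the consumers of this job edge can now
> >         // walk the single shared group, i.e. O(N), instead of N * N
> >         // individual edges.
> >     }
> > }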
> >
> > For the pointwise distribution type, since each result partition is
> > connected to different downstream vertices, the partitions belong to
> > different groups. Vice versa, the vertices also belong to different
> > groups. Since each result partition group is connected to exactly one
> > vertex group point-wise, the computation complexity of looping over
> > them is still O(N).
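> >
> > For completeness, the pointwise case in the same toy sketch could look
> > like the method below (again purely illustrative, and assuming equal
> > parallelism on both sides for simplicity): each partition gets its own
> > single-element group, so there are N groups in total and iterating all
> > of them is still O(N).
> >
> >     // Hypothetical pointwise counterpart, inside the same
> >     // GroupingSketch class as above: one single-element group per
> >     // partition.
> >     static void connectPointwise(List<PartitionInfo> partitions) {
> >         for (int i = 0; i < partitions.size(); i++) {
> >             ConsumerVertexGroup group = new ConsumerVertexGroup();
> >             group.vertexIds.add(i);              // exactly one consumer per partition
> >             partitions.get(i).consumers = group; // N groups, N entries in total
> >         }
> >     }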
> >
> > After we group the result partitions and vertices, ExecutionEdge is
> > no longer needed. For the test job mentioned above, this optimization
> > reduces the memory usage from 4.175 GiB to 12.076 MiB (estimated via
> > MXBean) in our PoC. The time cost is reduced from 62.090 seconds to
> > 8.551 seconds (with 10k parallelism).
> >
> > The detailed design doc with illustrations is located at [3]. Please find
> > more details in the links below.
> >
> > Looking forward to your feedback.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21110
> > [2] https://issues.apache.org/jira/browse/FLINK-20612
> > [3]
> > https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
> >
> >
>
