[ https://issues.apache.org/jira/browse/FLINK-23005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhu Zhu reassigned FLINK-23005:
-------------------------------
Assignee: Zhilong Hong

Optimize the deployment of tasks
--------------------------------

                Key: FLINK-23005
                URL: https://issues.apache.org/jira/browse/FLINK-23005
            Project: Flink
         Issue Type: Sub-task
         Components: Runtime / Coordination
           Reporter: Zhilong Hong
           Assignee: Zhilong Hong
           Priority: Major
            Fix For: 1.14.0

h3. Introduction

The optimizations introduced in FLINK-21110 have so far improved the performance of job initialization, failover, and partition releasing. However, task deployment is still slow. For a job with two vertices, each with a parallelism of 8,000 and connected by an all-to-all edge, it takes 32.611s to deploy all the tasks and transition them to running. With a parallelism of 16,000, it may take more than 2 minutes.

Since the creation of TaskDeploymentDescriptors runs in the main thread of the JobManager, the JobManager cannot handle other Akka messages, such as heartbeats and task status updates, for more than two minutes.

All in all, there are currently two issues in the deployment of tasks for large-scale jobs:
# It takes a long time to deploy tasks, especially for all-to-all edges.
# A heartbeat timeout may occur during or after task deployment. For a streaming job, this causes a failover of the entire region. The job may then never transition to running, because another heartbeat timeout can occur while the new tasks are being deployed.

h3. Proposal

Task deployment involves the following steps:
# The JobManager creates a TaskDeploymentDescriptor for each task in the main thread.
# The TaskDeploymentDescriptor is serialized in the future executor.
# Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC calls.
# The TaskExecutors create a new task thread and execute it.

The optimization consists of two parts:

*1. Cache the compressed serialized value of ShuffleDescriptors*

The ShuffleDescriptors in a TaskDeploymentDescriptor describe the IntermediateResultPartitions that a task consumes. For downstream vertices connected by an all-to-all edge with parallelism _N_, we currently calculate _N_ ShuffleDescriptors _N_ times. However, these vertices all consume the same IntermediateResultPartitions and therefore share the same ShuffleDescriptors, so we don't need to calculate them for each downstream vertex individually; we can simply cache them. This decreases the overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to O(N).

Furthermore, since we don't need to serialize the same ShuffleDescriptors _N_ times, we can cache the serialized value of the ShuffleDescriptors instead of the original objects. To decrease the size of Akka messages and reduce the transmission of replicated data over the network, the serialized value can be compressed.

*2. Distribute the ShuffleDescriptors via the blob server*

For the ShuffleDescriptors of vertices with a parallelism of 8,000, the serialized value is more than 700 kilobytes; after compression, it is about 200 kilobytes. The overall size of 8,000 TaskDeploymentDescriptors is then more than 1.6 gigabytes. Since Akka cannot send the messages as fast as the TaskDeploymentDescriptors are created, these descriptors become a heavy burden for the garbage collector.

In a TaskDeploymentDescriptor, the JobInformation and TaskInformation are already distributed via the blob server if their sizes exceed a certain threshold (defined by {{blob.offload.minsize}}). TaskExecutors request the information from the blob server once they begin to process the TaskDeploymentDescriptor. This ensures that the JobManager doesn't need to keep all the copies in heap memory until all the TaskDeploymentDescriptors are sent.
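Taken together, the two parts above amount to: serialize and compress the ShuffleDescriptors once per consumed IntermediateResult, cache the resulting bytes for all _N_ downstream tasks, and offload them to the blob server when they exceed the threshold. The following is a minimal, self-contained sketch of that flow; all class and method names are hypothetical (a plain map stands in for the blob server, and GZIP stands in for the codec), not Flink's actual implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ShuffleDescriptorCacheSketch {

    /** Stand-in for the blob server: stores each payload once under a key. */
    static final class FakeBlobServer {
        private final Map<String, byte[]> store = new HashMap<>();
        private int nextKey = 0;

        String put(byte[] payload) {
            String key = "blob-" + nextKey++;
            store.put(key, payload);
            return key;
        }

        byte[] get(String key) {
            return store.get(key);
        }
    }

    /** What a task receives: either inline bytes or a blob key, never both. */
    static final class MaybeOffloaded {
        final byte[] inlineBytes; // non-null when shipped inline
        final String blobKey;     // non-null when offloaded

        MaybeOffloaded(byte[] inlineBytes, String blobKey) {
            this.inlineBytes = inlineBytes;
            this.blobKey = blobKey;
        }
    }

    private final Map<String, MaybeOffloaded> cache = new HashMap<>();
    private final FakeBlobServer blobServer;
    private final int offloadMinSize; // cf. blob.offload.minsize
    private int serializations = 0;   // counts how often we actually serialize

    ShuffleDescriptorCacheSketch(FakeBlobServer blobServer, int offloadMinSize) {
        this.blobServer = blobServer;
        this.offloadMinSize = offloadMinSize;
    }

    /**
     * Returns the cached entry for the given consumed result, computing it at
     * most once no matter how many downstream tasks ask for it.
     */
    MaybeOffloaded getOrCompute(String resultId, String descriptors) {
        return cache.computeIfAbsent(resultId, id -> {
            serializations++;
            byte[] compressed = compress(descriptors.getBytes(StandardCharsets.UTF_8));
            if (compressed.length >= offloadMinSize) {
                // One copy on the blob server; tasks only carry a small key.
                return new MaybeOffloaded(null, blobServer.put(compressed));
            }
            return new MaybeOffloaded(compressed, null); // small enough to ship inline
        });
    }

    int getSerializationCount() {
        return serializations;
    }

    static byte[] compress(byte[] raw) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(raw);
            gzip.finish(); // flush all compressed data, including the trailer
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
            byte[] buf = new byte[4096];
            for (int n; (n = gzip.read(buf)) > 0; ) {
                bos.write(buf, 0, n);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The key property is that the per-result work becomes O(1) for every downstream task after the first, which is what brings the overall TaskDeploymentDescriptor creation from O(N^2) to O(N).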
There will be only one copy on the blob server. Like the JobInformation, we can distribute the cached ShuffleDescriptors via the blob server if their overall size exceeds the threshold.

h3. Summary

In summary, the optimization of task deployment is to introduce a cache for the TaskDeploymentDescriptor: we cache the compressed serialized value of the ShuffleDescriptors, and if the size of that value exceeds a certain threshold, it is distributed via the blob server.

h3. Comparison

We implemented a POC and conducted an experiment to measure the effect of the optimization. We chose a streaming job for the experiment because no task starts running until all tasks are deployed, which avoids other disturbing factors. The job contains two vertices, a source and a sink, connected by an all-to-all edge.

The results below show the time interval between the timestamp of the first task transitioning to _deploying_ and the timestamp of the last task transitioning to _running_:

||Parallelism||Before||After||
|8000*8000|32.611s|6.480s|
|16000*16000|128.408s|19.051s|

--
This message was sent by Atlassian Jira
(v8.3.4#803005)