Zhilong Hong created FLINK-24295:
------------------------------------

             Summary: Too many requestPartitionState may jam the JobManager during task deployment
                 Key: FLINK-24295
                 URL: https://issues.apache.org/jira/browse/FLINK-24295
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.14.0
            Reporter: Zhilong Hong
After the phase 2 optimization we've done in FLINK-21110, task deployment has become much faster. However, we found that during task deployment there may be so many {{requestPartitionState}} RPC calls from TaskManagers that they jam the JobManager.

Why would there be so many {{requestPartitionState}} RPC calls? After the optimization, the JobManager can submit tasks to TaskManagers quickly. If the JobManager calls {{submitTask}} faster than the TaskManagers can process the submissions, some TaskManagers may deploy their tasks earlier than others. When a downstream task is deployed, it tries to request partitions from its upstream tasks, which may be located on a remote TaskManager. If an upstream task is not deployed yet, the downstream task requests the partition state from the JobManager. In the worst case, the computation and memory complexity is O(N^2).

In our test with a streaming job that has two vertices with a parallelism of 8,000 connected by an all-to-all edge, there will be 32,000,000 {{requestPartitionState}} RPC calls on the JobManager in the worst case. Each RPC call requires 1 KiB of heap memory on the JobManager, so the overall footprint of {{requestPartitionState}} is about 32 GiB, which is a heavy burden for the GC. In our test, the heap size of the JobManager is 8 GiB. During task deployment the JobManager runs into more full GCs and gets stuck, since it is busy with full GCs and has no time to process the incoming RPC calls. The log is attached below.

The worst part is that nothing is logged for this RPC call. When users find that the JobManager becomes slow or gets stuck, they won't be able to find out why.

Why did this case rarely happen before? Before the optimization, it took a long time to calculate the TaskDeploymentDescriptors and send them to the TaskManagers, so in most cases the JobManager called {{submitTask}} more slowly than the TaskManagers could process the submissions. Since tasks are deployed in topological order, the upstream tasks were deployed before the downstream tasks, and this case rarely happened.

In my opinion, the solution to this issue needs more discussion. According to the discussion in the pull request ([https://github.com/apache/flink/pull/6680]), it's not safe to simply remove this RPC call, because we cannot guarantee that an upstream task failure will always fail the downstream consumers.
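To make the scale of the problem concrete, here is a minimal back-of-the-envelope sketch (plain Java, not Flink code; the class name is made up, and the ~1 KiB-per-call and 32,000,000-call figures are simply taken from the observations above) that shows how the O(N^2) channel count translates into the quoted heap footprint:

{code:java}
/**
 * Back-of-the-envelope estimate of the worst-case cost of the
 * requestPartitionState fallback for a single all-to-all edge.
 *
 * Illustrative only: the 32,000,000-call and ~1 KiB-per-call figures come
 * from the test described above, they are not measured by this snippet.
 */
public class RequestPartitionStateCostEstimate {

    public static void main(String[] args) {
        long upstreamParallelism = 8_000L;
        long downstreamParallelism = 8_000L;

        // Every downstream subtask has one input channel per upstream subtask,
        // so the number of channels that can fall back to requestPartitionState
        // grows as O(N^2) with the parallelism.
        long channels = upstreamParallelism * downstreamParallelism;

        // Figures observed in the test above: ~32,000,000 RPC calls, each
        // holding roughly 1 KiB on the JobManager heap.
        long observedCalls = 32_000_000L;
        long bytesPerCall = 1024L;
        double heapGiB = observedCalls * bytesPerCall / (1024.0 * 1024 * 1024);

        System.out.printf("all-to-all input channels: %,d%n", channels);
        System.out.printf("observed RPC calls:        %,d%n", observedCalls);
        // Roughly the ~32 GiB quoted above, against an 8 GiB JobManager heap.
        System.out.printf("approx. heap footprint:    %.1f GiB%n", heapGiB);
    }
}
{code}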