Zhilong Hong created FLINK-24295:
------------------------------------

             Summary: Too many requestPartitionState calls may jam the 
JobManager during task deployment
                 Key: FLINK-24295
                 URL: https://issues.apache.org/jira/browse/FLINK-24295
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.14.0
            Reporter: Zhilong Hong


After the phase 2 optimization we've done in FLINK-21110, task deployment has 
become much faster. However, we found that during task deployment there may be 
so many {{requestPartitionState}} RPC calls from TaskManagers that they jam the 
JobManager.

Why would there be so many {{requestPartitionState}} RPC calls? After the 
optimization, the JobManager can submit tasks to TaskManagers very quickly. If 
the JobManager calls {{submitTask}} faster than the TaskManagers can process 
those calls, some TaskManagers may deploy their tasks earlier than others.
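
The race can be illustrated with a minimal standalone sketch (plain Java, not Flink code): two TaskManagers are modeled as single-threaded executors, and the one that is still busy with earlier {{submitTask}} calls deploys its upstream task only after the idle one has already deployed the downstream task.

{code:java}
// Minimal standalone sketch (not Flink code): two "TaskManagers" modeled as
// single-threaded executors with different backlogs. The "JobManager" side
// submits the upstream task to the busy TM and the downstream task to the
// idle TM back to back; the downstream task is still deployed first.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class UnevenDeploymentSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService slowTm = Executors.newSingleThreadExecutor();
        ExecutorService fastTm = Executors.newSingleThreadExecutor();

        // The slow TM is still busy processing earlier submitTask calls.
        slowTm.submit(() -> sleep(500));

        // The JobManager "submits" both tasks almost at the same time.
        slowTm.submit(() -> System.out.println("upstream task deployed"));
        fastTm.submit(() -> System.out.println("downstream task deployed"));

        slowTm.shutdown();
        fastTm.shutdown();
        slowTm.awaitTermination(5, TimeUnit.SECONDS);
        fastTm.awaitTermination(5, TimeUnit.SECONDS);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}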

When a downstream task is deployed, it tries to request partitions from its 
upstream tasks, which may be located on remote TaskManagers. If an upstream 
task is not deployed yet, the downstream task requests the partition state from 
the JobManager. In the worst case, the computation and memory complexity is 
O(N^2).
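
A simplified sketch of this fallback path is shown below. The interfaces and names in it are hypothetical stand-ins rather than Flink's actual classes; it only illustrates that with an all-to-all edge of parallelism N, each of the N downstream subtasks may issue one {{requestPartitionState}} call per not-yet-deployed upstream partition, i.e. up to N * N calls in the worst case.

{code:java}
// Simplified sketch of the fallback path; the interfaces below are hypothetical
// stand-ins, not Flink's actual classes.
import java.util.List;
import java.util.concurrent.CompletableFuture;

interface JobManagerGatewaySketch {
    // One RPC per unknown partition; each pending call occupies heap on the JobManager.
    CompletableFuture<String> requestPartitionState(String resultPartitionId);
}

class DownstreamSubtaskSketch {
    private final JobManagerGatewaySketch jobManager;

    DownstreamSubtaskSketch(JobManagerGatewaySketch jobManager) {
        this.jobManager = jobManager;
    }

    void requestUpstreamPartitions(List<String> upstreamPartitionIds) {
        for (String partitionId : upstreamPartitionIds) {
            if (!partitionIsAvailable(partitionId)) {
                // Upstream producer not deployed yet: fall back to asking the JobManager.
                jobManager.requestPartitionState(partitionId)
                        .thenAccept(state -> retryOrFail(partitionId, state));
            }
        }
    }

    private boolean partitionIsAvailable(String partitionId) {
        // In the real runtime this is a (possibly remote) partition request that
        // fails when the producer has not been deployed yet.
        return false;
    }

    private void retryOrFail(String partitionId, String producerState) {
        // Retry the partition request or fail the task, depending on the reported state.
    }
}
{code}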

In our test with a streaming job that has two vertices with a parallelism of 
8,000 each, connected by an all-to-all edge, there will be 32,000,000 
{{requestPartitionState}} RPC calls at the JobManager in the worst case. Each 
RPC call requires about 1 KiB of JobManager heap memory, so the overall space 
cost of {{requestPartitionState}} is roughly 32 GiB, which is a heavy burden 
for the garbage collector.
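
A back-of-envelope check of these figures, taking the reported call count and the ~1 KiB per call as given:

{code:java}
// Back-of-envelope check of the figures above, using the reported numbers as given.
public class RpcMemoryEstimate {
    public static void main(String[] args) {
        long rpcCalls = 32_000_000L;   // worst-case requestPartitionState calls in the test
        long bytesPerCall = 1024L;     // ~1 KiB of JobManager heap per call
        double totalGiB = rpcCalls * bytesPerCall / (1024.0 * 1024.0 * 1024.0);
        System.out.printf("~%.1f GiB of heap for pending requestPartitionState calls%n", totalGiB);
        // Prints ~30.5 GiB, i.e. roughly the 32 GiB mentioned above, far beyond an 8 GiB heap.
    }
}
{code}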

In our test, the heap memory of the JobManager is 8 GiB. During task deployment 
the JobManager runs into more and more full GCs. The JobManager gets stuck, 
since it spends most of its time in full GC and has no time to deal with the 
incoming RPC calls. The log is attached below.

What makes this worse is that no log is produced for this RPC call. When users 
find that the JobManager becomes slower or gets stuck, they won't be able to 
find out why.

Why did this case rarely happen before? Before the optimization, it took a long 
time to calculate the TaskDeploymentDescriptors and send them to the 
TaskManagers. In most cases the JobManager called {{submitTask}} more slowly 
than the TaskManagers could process the calls. Since tasks are deployed in 
topological order, the upstream tasks were deployed before the downstream 
tasks, and this case rarely happened.

In my opinion, the solution to this issue needs more discussion. According to 
the discussion in the pull request 
([https://github.com/apache/flink/pull/6680]), it's not safe to simply remove 
this RPC call, because we cannot always guarantee that an upstream task failure 
will fail the downstream consumers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
