[ 
https://issues.apache.org/jira/browse/FLINK-21201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhilong Hong updated FLINK-21201:
---------------------------------
    Description: 
When we run batch jobs with a parallelism of 8k, it takes a long time to 
deploy the vertices. After investigating, we found that creating 
BoundedBlockingSubpartitions blocks the TaskManager’s main thread during 
{{submitTask}}. 

When the JobMaster invokes {{submitTask}}, it sends an RPC call to the 
TaskManager, which executes the {{submitTask}} method in its main thread. In 
{{submitTask}}, the TaskExecutor creates a Task instance and tries to start it. 
While creating the Task, the TaskExecutor also creates the ResultPartition and 
its ResultSubpartitions. 

For a batch job, the ResultSubpartitions are BoundedBlockingSubpartitions 
backed by FileChannelBoundedData. Each BoundedBlockingSubpartition creates a 
file on the local disk, which is an IO operation and can take a long time. 

In our test, it took up to 28 seconds to create 8k 
BoundedBlockingSubpartitions. This blocks the main thread of the TaskManager 
and leads to heartbeat timeouts and slow task deployment. In my opinion, the IO 
operation should be executed by the IOExecutor rather than in the main thread. 
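
As a rough illustration of the proposal (this is not Flink's actual code), the 
sketch below moves blocking file creation off the calling thread and onto a 
separate executor. The class name, the method names, the file naming scheme and 
the plain ExecutorService standing in for the IOExecutor are all assumptions 
made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadFileCreationSketch {

    // Hypothetical stand-in for the blocking work: create one file per subpartition.
    public static List<Path> createFilesBlocking(Path dir, int count) {
        List<Path> files = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            Path file = dir.resolve("subpartition-" + i + ".data");
            // Opening with CREATE_NEW touches the disk; the channel is closed right away.
            try (FileChannel channel = FileChannel.open(
                    file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
                files.add(file);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return files;
    }

    // Submits the blocking creation to ioExecutor and returns immediately,
    // so the calling thread (the "main thread" in this sketch) is not blocked.
    public static CompletableFuture<List<Path>> createFilesAsync(
            Path dir, int count, ExecutorService ioExecutor) {
        return CompletableFuture.supplyAsync(() -> createFilesBlocking(dir, count), ioExecutor);
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a small fixed pool plays the role of an IO executor.
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
        Path dir = Files.createTempDirectory("offload-sketch");
        try {
            CompletableFuture<List<Path>> pending = createFilesAsync(dir, 100, ioExecutor);
            // The calling thread could handle other work (e.g. heartbeats) here.
            List<Path> files = pending.get();
            System.out.println("created " + files.size() + " files");
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

In a real fix, the caller would continue the task setup in a callback on the 
future instead of waiting on it with get().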

I added some logs to show what TaskExecutor is doing during {{submitTask}}.
{code:java}
2021-01-29 14:44:37,557 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Start to submit task #898 (c9aefd1d30c2b133ba04ad495cd894fd)
2021-01-29 14:44:37,557 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 38cafd5b456cc8ff873bbe18e4bf708a.
2021-01-29 14:44:37,932 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Start to init Task #898 (c9aefd1d30c2b133ba04ad495cd894fd) instance.
2021-01-29 14:44:37,932 INFO  org.apache.flink.runtime.io.network.NettyShuffleEnvironment  [] - Start to create 1 result partition(s).
2021-01-29 14:44:37,932 INFO  org.apache.flink.runtime.io.network.partition.ResultPartitionFactory [] - Initializing BoundedBlockingResultPartitions
2021-01-29 14:44:37,932 INFO  org.apache.flink.runtime.io.network.partition.ResultPartitionFactory [] - Start to create 8000 FILE BoundedBlockingSubpartitions.
2021-01-29 14:44:37,932 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition [] - FileChannel #0 created.

... ...

2021-01-29 14:45:06,052 INFO  org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition [] - FileChannel #7999 created.
2021-01-29 14:45:06,052 INFO  org.apache.flink.runtime.io.network.partition.ResultPartitionFactory [] - Finish creating 8000 FILE BoundedBlockingSubpartitions.
2021-01-29 14:45:06,052 INFO  org.apache.flink.runtime.io.network.NettyShuffleEnvironment  [] - Finish creating 1 result partition(s).
2021-01-29 14:45:06,052 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Finish initializing task #898 (c9aefd1d30c2b133ba04ad495cd894fd) instance.
2021-01-29 14:45:06,052 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: Source 0 (898/8000)#0 (c9aefd1d30c2b133ba04ad495cd894fd), deploy into slot with allocation id 38cafd5b456cc8ff873bbe18e4bf708a.
2021-01-29 14:45:06,053 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Source 0 (898/8000)#0 (c9aefd1d30c2b133ba04ad495cd894fd) switched from CREATED to DEPLOYING.
{code}
We can see that it takes about 28 seconds (from 14:44:37,932 to 
14:45:06,052) to create 8k BoundedBlockingSubpartitions, and that this blocks 
the main thread of the TaskExecutor.
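
The interval can be checked directly from the two timestamps that bracket the 
subpartition creation in the log ("Start to create 8000 FILE ..." and "Finish 
creating 8000 FILE ..."). The helper below is only a small sketch for that 
arithmetic; the class and method names are made up for this example.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogElapsedSketch {

    // Timestamp layout used in the log excerpt: date, time, comma, milliseconds.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Returns the time between two log timestamps given in the layout above.
    public static Duration elapsed(String start, String end) {
        return Duration.between(
                LocalDateTime.parse(start, LOG_TIME),
                LocalDateTime.parse(end, LOG_TIME));
    }

    public static void main(String[] args) {
        Duration d = elapsed("2021-01-29 14:44:37,932", "2021-01-29 14:45:06,052");
        System.out.println(d.toMillis() + " ms"); // prints "28120 ms"
    }
}
```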

The logs of the JobManager and TaskManager are attached. The most typical task 
is Source 0: #898.

  was:
When we are trying to run batch jobs with 8k parallelism, it takes a long time 
to deploy the vertices. After the investigation, we find that creating 
BoundedBlockingSubpartition blocks TaskManager’s main thread during the 
procedure of {{submitTask}}. 

When JobMaster invokes {{submitTask}} and sends an RPC call to the TaskManager, 
the TaskManager will receive the RPC call and execute the {{submitTask}} method 
in its main thread. In the {{submitTask}} method, the TaskExecutor will create 
a Task instance and try to start it. During the creation, the TaskExecutor will 
create the ResultPartition and its ResultSubpartitions. 

For the batch job, the type of ResultSubpartitions is the 
BoundedBlockingSubpartition with the FileChannelBoundedData. The 
BoundedBlockingSubpartition will create a file on the local disk, which is an 
IO operation and could take a long time. 

In our test, it would take at most 28 seconds to create 8k 
BoundedBlockingSubpartitions. This procedure blocks the main thread of the 
TaskManager, and would lead to heartbeat timeout and slow task deploying. In my 
opinion, the IO operation should be executed with IOExecutor rather than the 
main thread. 

The log of JobManager and TaskManager is attached below. A typical task is 
Source 0: #898.


> Creating BoundedBlockingSubpartition blocks TaskManager’s main thread
> ---------------------------------------------------------------------
>
>                 Key: FLINK-21201
>                 URL: https://issues.apache.org/jira/browse/FLINK-21201
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.12.1
>            Reporter: Zhilong Hong
>            Priority: Major
>         Attachments: jobmanager.log.tar.gz, taskmanager.log.tar.gz


