Hi Yangze,

I deployed 1.13.1, and the same problem exists. It seems that the cancel logic 
has changed since 1.11.0 (which we had been running for almost a year). In 
1.11.0, during cancellation, we saw some subtasks stay in the CANCELLING state 
for some time, but eventually the job was cancelled and no task managers were 
lost, so we could restart the job right away. In 1.13.x, it kills the task 
managers that the stuck subtasks were running on, and it then takes another 4-5 
minutes for those task managers to rejoin. Can you point me to the code that 
manages the job cancellation routine? I want to understand the logic there.
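
In the meantime, my working assumption (please correct me if I am wrong) is 
that the task managers get killed because the stuck subtasks exceed the task 
cancellation timeout, which the docs say defaults to 180000 ms and is escalated 
to a fatal TaskManager error. If that is the case, something like the following 
in flink-conf.yaml should at least change the behavior while we dig into it 
(the values are only illustrative, not a recommendation):

    # Interval between successive interrupts of a task thread that is stuck
    # in cancellation.
    task.cancellation.interval: 30000
    # Time after which a hanging cancellation is treated as a fatal
    # TaskManager error; per the docs, 0 disables this watchdog.
    task.cancellation.timeout: 600000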

Thanks,
Ivan

> On Jul 26, 2021, at 7:22 PM, Yangze Guo <karma...@gmail.com> wrote:
> 
> Hi, Ivan
> 
> My gut feeling is that it is related to FLINK-22535. Could @Yun Gao
> take another look? If that is the case, you can upgrade to 1.13.1.
> 
> Best,
> Yangze Guo
> 
> On Tue, Jul 27, 2021 at 9:41 AM Ivan Yang <ivanygy...@gmail.com> wrote:
>> 
>> Dear Flink experts,
>> 
>> We recently ran into an issue during job cancellation after upgrading to 
>> 1.13. After we issue a cancel (from the Flink console or flink cancel 
>> {jobid}), a few subtasks get stuck in the CANCELLING state. Once the job gets 
>> into that situation, the behavior is consistent: those "cancelling" tasks 
>> never become CANCELED. After 3 minutes the job stops, and as a result a 
>> number of task managers are lost. It then takes about another 5 minutes for 
>> those lost task managers to rejoin the JobManager, after which we can restart 
>> the job from the previous checkpoint. We found the following stack trace on 
>> the hanging (cancelling) TaskManager:
>> ==========================================
>> sun.misc.Unsafe.park(Native Method)
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>> java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
>> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>> java.lang.Thread.run(Thread.java:748)
>> ===========================================
>> 
>> Here is some background information about our job and setup:
>> 1) The job is relatively large: we have 500+ parallelism and 2000+ subtasks. 
>> It mainly reads from a Kinesis stream, performs some transformations, and 
>> fans out to multiple output S3 buckets. It is a stateless ETL job.
>> 2) The same code and setup running on smaller environments does not seem to 
>> have this cancel failure problem.
>> 3) We had been using Flink 1.11.0 for the same job and never saw this cancel 
>> failure or the killed-TaskManager problem.
>> 4) When upgrading to 1.13, we also added Kubernetes HA (ZooKeeper-less). 
>> Previously we did not use HA.
>> 
>> Cancelling and restarting from the previous checkpoint is our regular 
>> procedure to support daily operations, and this 10-minute TM restart cycle 
>> basically slows down our throughput. I am trying to understand what leads to 
>> this situation, hoping that some configuration change will smooth things out. 
>> Any suggestion to shorten the wait would also help: it appears there are 
>> timeouts on the TaskManager and JobManager that could be adjusted to speed 
>> things up (a rough sketch of the settings I have in mind is below), but we 
>> really want to avoid getting stuck in cancellation in the first place if we 
>> can.
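>> 
>> For reference, the flink-conf.yaml knobs I was thinking of are roughly the 
>> following (the values shown are what I believe the 1.13 defaults to be; I 
>> have not confirmed that these are the right ones for this problem):
>> 
>>     # How often the JobManager and TaskManagers heartbeat each other.
>>     heartbeat.interval: 10000
>>     # How long a missing heartbeat is tolerated before the peer is
>>     # considered lost.
>>     heartbeat.timeout: 50000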
>> 
>> Thank you, hoping to get some insight here.
>> 
>> Ivan
