Hi Ivan,

It sounds to me like a bug in the FlinkKinesisConsumer: it is not cancelling properly. The change in behaviour could have been introduced by a bug fix [1], where we had to stop interrupting the source thread. This might also be related to [2], or at least relevant for fixing it.
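To illustrate what I mean by "not cancelling properly": below is a minimal sketch of the legacy `SourceFunction` cancellation contract. It is illustrative only and not the actual FlinkKinesisConsumer code (the class and the fetch helper are made up). If `run()` blocks on something that `cancel()` does not wake up, the task can stay in CANCELLING, because since [1] the runtime no longer interrupts the source thread.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ExampleSource implements SourceFunction<String> {

    // Flag that cancel() flips; run() must check it and must not block forever in between.
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // If this call blocks indefinitely (e.g. on an internal fetcher or queue)
            // and cancel() does nothing to unblock it, the task stays in CANCELLING,
            // since the source thread is no longer interrupted by the runtime.
            String record = fetchNextRecordBlocking();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
        // A well-behaved source must also wake up whatever run() is blocked on,
        // e.g. close the underlying client or shut down its fetcher threads.
    }

    // Hypothetical blocking fetch, standing in for the real consumer's fetcher.
    private String fetchNextRecordBlocking() throws InterruptedException {
        Thread.sleep(100);
        return "record";
    }
}

The important part is that cancel() must both flip the flag and unblock run(); just setting the flag is not enough once interrupts are gone.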
Ivan, the stack trace that you posted only shows that the task thread is waiting for the source thread to finish. It doesn't show why the source thread hasn't exited. For someone to fix this, it would be helpful if you could provide a thread dump from a Task Manager that is stuck in the cancelling state. If for some reason you don't want to share a full thread dump, you can share only the threads that have any package containing "kinesis" on their stack trace (this would capture the `FlinkKinesisConsumer` and all internal kinesis threads); I've appended a small sketch at the very bottom of this mail showing one way to filter a dump like that. This should be enough for someone familiar with the FlinkKinesisConsumer to understand why it hasn't been cancelled.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-21028
[2] https://issues.apache.org/jira/browse/FLINK-23528

On Thu, 29 Jul 2021 at 04:15, Yangze Guo <karma...@gmail.com> wrote:
> In your case, the entry point is the `cleanUpInvoke` function called
> by `StreamTask#invoke`.
>
> @ro...@apache.org Could you take another look at this?
>
> Best,
> Yangze Guo
>
> On Thu, Jul 29, 2021 at 2:29 AM Ivan Yang <ivanygy...@gmail.com> wrote:
> >
> > Hi Yangze,
> >
> > I deployed 1.13.1, and the same problem exists. It seems that the cancel logic has changed since 1.11.0 (the version we had been running for almost a year). In 1.11.0, during cancellation we saw some subtasks stay in the cancelling state for some time, but eventually the job would be cancelled and no task managers were lost, so we could start the job again right away. In the new version 1.13.x, it kills the task managers where those stuck subtasks were running, and it then takes another 4-5 minutes for the task managers to rejoin. Can you point me to the code that manages the job cancellation routine? I want to understand the logic there.
> >
> > Thanks,
> > Ivan
> >
> > > On Jul 26, 2021, at 7:22 PM, Yangze Guo <karma...@gmail.com> wrote:
> > >
> > > Hi, Ivan
> > >
> > > My gut feeling is that it is related to FLINK-22535. Could @Yun Gao
> > > take another look? If that is the case, you can upgrade to 1.13.1.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jul 27, 2021 at 9:41 AM Ivan Yang <ivanygy...@gmail.com> wrote:
> > >>
> > >> Dear Flink experts,
> > >>
> > >> We recently ran into an issue during a job cancellation after upgrading to 1.13. After we issue a cancel (from the Flink console or `flink cancel {jobid}`), a few subtasks get stuck in the cancelling state. Once it gets into that situation, the behavior is consistent: those "cancelling" tasks never become cancelled. After 3 minutes the job stops and, as a result, a number of task managers are lost. It takes about another 5 minutes for those lost task managers to rejoin the Job Manager, and then we can restart the job from the previous checkpoint. We found the following exception on the hanging (cancelling) Task Manager.
> > >> ==========================================
> > >> sun.misc.Unsafe.park(Native Method)
> > >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > >> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > >> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > >> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > >> java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
> > >> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
> > >> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
> > >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
> > >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> > >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> > >> java.lang.Thread.run(Thread.java:748)
> > >> ===========================================
> > >>
> > >> Here is some background information about our job and setup:
> > >> 1) The job is relatively large: we have 500+ parallelism and 2000+ subtasks. It mainly reads from a Kinesis stream, performs some transformation, and fans out to multiple output S3 buckets. It's a stateless ETL job.
> > >> 2) The same code and setup running on smaller environments don't seem to have this cancel failure problem.
> > >> 3) We had been using Flink 1.11.0 for the same job and never saw this cancel failure and killed Task Manager problem.
> > >> 4) While upgrading to 1.13, we also added Kubernetes HA (ZooKeeper-less). Previously we did not use HA.
> > >>
> > >> Cancelling and restarting from the previous checkpoint is our regular procedure to support daily operation. With this 10-minute TM restart cycle, it basically slows down our throughput. I am trying to understand what leads to this situation, hoping that maybe some configuration change will smooth things out. Any suggestion to shorten the waiting would also be welcome; it appears that some timeouts on the TaskManager and JobManager could be adjusted to speed things up. But we really want to avoid getting stuck in cancellation if we can.
> > >>
> > >> Thank you, hoping to get some insight here.
> > >>
> > >> Ivan
> > >
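P.S. Regarding filtering the thread dump for the kinesis threads: the simplest way is to run `jstack <TaskManager pid>` and keep only the stanzas that mention "kinesis". Purely as an illustration, here is a small Java sketch (not part of Flink; the class name is made up) that does the equivalent filtering from inside a JVM via the standard ThreadMXBean API:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;

public class KinesisThreadDumpFilter {

    public static void main(String[] args) {
        // Capture full stack traces of all live threads in this JVM.
        ThreadInfo[] threads =
                ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);

        for (ThreadInfo info : threads) {
            if (!mentionsKinesis(info)) {
                continue;
            }
            // Print the thread name and its full stack trace, jstack-style.
            System.out.println("\"" + info.getThreadName() + "\"");
            for (StackTraceElement frame : info.getStackTrace()) {
                System.out.println("\tat " + frame);
            }
            System.out.println();
        }
    }

    // A thread is interesting if any frame's class name contains "kinesis".
    private static boolean mentionsKinesis(ThreadInfo info) {
        for (StackTraceElement frame : info.getStackTrace()) {
            if (frame.getClassName().toLowerCase().contains("kinesis")) {
                return true;
            }
        }
        return false;
    }
}

In practice a simple grep over the jstack output is enough; the point is just to keep the interesting threads together with their full stacks.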