Hi, adding to what has already been said, I think there can be two orthogonal problems here: i) why is your job slowing down/getting stuck? and ii) why is cancellation blocked? As for ii), I think Stephan already gave the right reason: shutdown could take longer, and that is what gets the TM killed.
A more interesting question could still be i): why is your job slowing down until shutdown in the first place? I have two questions here. First, are you running RocksDB on EBS volumes? If so, please have a look at this thread [1], because there can be some performance pitfalls. Second, how many timers are you expecting, and how are they firing? For example, if you have a huge number of timers and the watermark makes a big jump, it can take a while until the job makes progress, because it has to handle so many timer callbacks first. Metrics for event throughput and from your I/O subsystem could help to see whether something is stuck or underperforming, or whether there is just a lot of timer processing going on.

Best,
Stefan

[1] https://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3ccakhqddzamdqewiz5b1qndqv4+-mtvefhbhewrpxftlu7dv9...@mail.gmail.com%3E

> On 11.07.2018 at 19:31, Nico Kruber <n...@data-artisans.com> wrote:
>
> If this is about too many timers and your application allows it, you may
> also try to reduce the timer resolution and thus frequency by coalescing
> them [1].
>
> Nico
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html#timer-coalescing
>
> On 11/07/18 18:27, Stephan Ewen wrote:
>> Hi shishal!
>>
>> I think there is an issue with cancellation when many timers fire at the
>> same time. These timers have to finish before shutdown happens, this
>> seems to take a while in your case.
>>
>> Did the TM process actually kill itself in the end (and got restarted)?
>>
>> On Wed, Jul 11, 2018 at 9:29 AM, shishal <shisha...@gmail.com> wrote:
>>
>>     Hi,
>>
>>     I am using flink 1.4.2 with rocksdb as backend. I am using process function
>>     with timer on EventTime.
>>     For checkpointing I am using hdfs.
>>
>>     I am running a load test, so I am reading Kafka from the beginning
>>     (approx. 7 days of data with 50M events).
>>
>>     My job gets stuck after approx. 20 min with no error. Thereafter the
>>     watermark does not progress and all checkpoints fail.
>>
>>     Also, when I try to cancel my job (using the web UI), it takes several
>>     minutes to finally get cancelled. It also takes the TaskManager down.
>>
>>     There are no logs while my job hangs, but while cancelling I get the
>>     following error.
>>
>>     2018-07-11 09:10:39,385 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
>>     ==============================================================
>>     ====================== FATAL =======================
>>     ==============================================================
>>
>>     A fatal error occurred, forcing the TaskManager to shut down: Task 'process (3/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
>>      org.rocksdb.RocksDB.get(Native Method)
>>      org.rocksdb.RocksDB.get(RocksDB.java:810)
>>      org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>>      org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>      nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>>      org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>>      org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>      org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>      org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>>      org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>>      org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>      org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>      org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>>      org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>      org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>      java.lang.Thread.run(Thread.java:748)
>>
>>     2018-07-11 09:10:39,390 DEBUG org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy - Actor was killed. Stopping it now.
>>     akka.actor.ActorKilledException: Kill
>>     2018-07-11 09:10:39,407 INFO org.apache.flink.runtime.taskmanager.TaskManager - Stopping TaskManager akka://flink/user/taskmanager#-1231617791.
>>     2018-07-11 09:10:39,408 INFO org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
>>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (3/6) (432fd129f3eea363334521f8c8de5198).
>>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (3/6) is already in state CANCELING
>>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (4/6) (7c6b96c9f32b067bdf8fa7c283eca2e0).
>>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (4/6) is already in state CANCELING
>>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (2/6) (a4f731797a7ea210fd0b512b0263bcd9).
>>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (2/6) is already in state CANCELING
>>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (1/6) (cd8a113779a4c00a051d78ad63bc7963).
>>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (1/6) is already in state CANCELING
>>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
>>     2018-07-11 09:10:39,412 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
>>     2018-07-11 09:10:39,431 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
>>     2018-07-11 09:10:39,444 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
>>     2018-07-11 09:10:39,444 DEBUG org.apache.flink.runtime.io.disk.iomanager.IOManager - Shutting down I/O manager.
>>     2018-07-11 09:10:39,451 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /tmp/flink-io-989505e5-ac33-4d56-add5-04b2ad3067b4
>>     2018-07-11 09:10:39,461 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
>>     2018-07-11 09:10:39,461 DEBUG org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down network connection manager
>>     2018-07-11 09:10:39,462 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful shutdown (took 1 ms).
>>     2018-07-11 09:10:39,472 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful shutdown (took 10 ms).
>>     2018-07-11 09:10:39,472 DEBUG org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down intermediate result partition manager
>>     2018-07-11 09:10:39,473 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Releasing 0 partitions because of shutdown.
>>     2018-07-11 09:10:39,474 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Successful shutdown.
>>     2018-07-11 09:10:39,498 INFO org.apache.flink.runtime.taskmanager.TaskManager - Task manager akka://flink/user/taskmanager is completely shut down.
>>     2018-07-11 09:10:39,504 ERROR org.apache.flink.runtime.taskmanager.TaskManager - Actor akka://flink/user/taskmanager#-1231617791 terminated, stopping process...
>>     2018-07-11 09:10:39,563 INFO org.apache.flink.runtime.taskmanager.Task - Notifying TaskManager about fatal error. Task 'process (2/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
>>      org.rocksdb.RocksDB.get(Native Method)
>>      org.rocksdb.RocksDB.get(RocksDB.java:810)
>>      org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>>      org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>      nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>>      org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>>      org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>      org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>      org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>>      org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>>      org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>      org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>      org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>>      org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>      org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>      java.lang.Thread.run(Thread.java:748)
>>     .
>>     2018-07-11 09:10:39,575 INFO org.apache.flink.runtime.taskmanager.Task - Notifying TaskManager about fatal error. Task 'process (1/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
>>      sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>      java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>      java.lang.Class.newInstance(Class.java:442)
>>      org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:196)
>>      org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:399)
>>      org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:304)
>>      org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:104)
>>      org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>      nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>>      org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>>      org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>      org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>      org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>>      org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>>      org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>      org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>      org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>>      org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>      org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>      java.lang.Thread.run(Thread.java:748)
>>
>>     --
>>     Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
> --
> Nico Kruber | Software Engineer
> data Artisans
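
P.S. To make the timer coalescing that Nico mentions a bit more concrete, here is a rough sketch in plain Java. It is not Flink code: the class `TimerCoalescingSketch` and its methods are hypothetical names, and the map of sets only models the documented behaviour that Flink keeps at most one timer per key and timestamp. The rounding to full seconds follows the approach described in the linked docs; the 1-second resolution and the 60-second timeout are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TimerCoalescingSketch {

    // Rounds a timer target down to the full second (1000 ms resolution, an assumed value).
    static long coalesce(long targetMillis) {
        return (targetMillis / 1000L) * 1000L;
    }

    // Models timer registration for one key: a set per key means that
    // registering the same timestamp twice leaves only one timer behind.
    static int countTimers(String key, long[] eventTimesMillis,
                           long timeoutMillis, boolean useCoalescing) {
        Map<String, Set<Long>> timersPerKey = new HashMap<>();
        for (long eventTime : eventTimesMillis) {
            long target = eventTime + timeoutMillis;
            long timerTime = useCoalescing ? coalesce(target) : target;
            timersPerKey.computeIfAbsent(key, k -> new HashSet<>()).add(timerTime);
        }
        Set<Long> timers = timersPerKey.get(key);
        return timers == null ? 0 : timers.size();
    }

    public static void main(String[] args) {
        // One event per millisecond for five seconds, all for the same key.
        long[] eventTimes = new long[5000];
        for (int i = 0; i < eventTimes.length; i++) {
            eventTimes[i] = i;
        }
        System.out.println("timers without coalescing: "
                + countTimers("key-1", eventTimes, 60_000L, false));
        System.out.println("timers with coalescing:    "
                + countTimers("key-1", eventTimes, 60_000L, true));
    }
}
```

In an actual ProcessFunction the rounded value would be what gets passed to `ctx.timerService().registerEventTimeTimer(...)`. Whether a coarser resolution is acceptable depends on how precisely the application needs its timers to fire.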