If this is about too many timers and your application allows it, you may also try to reduce the timer resolution, and thus the number of timers, by coalescing them [1].
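To illustrate the idea: Flink keeps at most one timer per key and timestamp, so rounding each requested firing time to a coarser resolution lets many registrations for the same key collapse into a single timer, and therefore a single onTimer() call. The following is only a sketch in plain Java, so it runs without Flink on the classpath. The class and method names (TimerCoalescing, coalesce, distinctTimers) and the workload are hypothetical, and the TreeSet merely stands in for one key's timers. It rounds up rather than down so that a timer never fires earlier than requested; that is a choice of this sketch, not something Flink prescribes.

```java
import java.util.Set;
import java.util.TreeSet;

/**
 * Sketch of timer coalescing without a Flink dependency (hypothetical names).
 * Flink keeps at most one timer per key and timestamp, so rounding firing
 * times to a coarser resolution makes registrations for the same key collapse.
 * The TreeSet below only stands in for one key's timers; it is a
 * simplification, not Flink's actual timer service.
 */
public class TimerCoalescing {

    /** Rounds a non-negative timestamp up to the next multiple of resolutionMs (> 0). */
    public static long coalesce(long timestampMs, long resolutionMs) {
        long remainder = timestampMs % resolutionMs;
        return remainder == 0 ? timestampMs : timestampMs - remainder + resolutionMs;
    }

    /** Counts the distinct timers one key would hold after registering all requested times. */
    public static int distinctTimers(long[] requested, long resolutionMs) {
        Set<Long> timers = new TreeSet<>();
        for (long ts : requested) {
            // Inside a KeyedProcessFunction this would be something like:
            // ctx.timerService().registerEventTimeTimer(coalesce(ts, resolutionMs));
            timers.add(coalesce(ts, resolutionMs));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        // Hypothetical workload for a single key: one event per millisecond
        // for 10 seconds, each asking for a timer 5 seconds after the event.
        long timeoutMs = 5_000;
        long[] requested = new long[10_000];
        for (int i = 0; i < requested.length; i++) {
            requested[i] = i + timeoutMs;
        }
        System.out.println("1 ms resolution: " + distinctTimers(requested, 1));
        System.out.println("1 s resolution:  " + distinctTimers(requested, 1_000));
    }
}
```

For this made-up workload, main() prints 10000 timers at 1 ms resolution versus 11 at 1 s resolution. Note that coalescing only merges timers of the same key, and the price is that onTimer() may run up to one resolution interval late. For event-time timers, the documentation [1] also suggests registering at ctx.timerService().currentWatermark() + 1, so that the timer fires with the next watermark.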
Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html#timer-coalescing

On 11/07/18 18:27, Stephan Ewen wrote:
> Hi shishal!
>
> I think there is an issue with cancellation when many timers fire at the
> same time. These timers have to finish before shutdown happens, and this
> seems to take a while in your case.
>
> Did the TM process actually kill itself in the end (and get restarted)?
>
>
> On Wed, Jul 11, 2018 at 9:29 AM, shishal <shisha...@gmail.com> wrote:
>
> > Hi,
> >
> > I am using Flink 1.4.2 with RocksDB as the state backend. I am using a
> > process function with a timer on event time. For checkpointing I am
> > using HDFS.
> >
> > I am doing load testing, so I am reading Kafka from the beginning
> > (approx. 7 days of data with 50M events).
> >
> > My job gets stuck after approx. 20 min with no error. Thereafter the
> > watermark does not progress and all checkpoints fail.
> >
> > Also, when I try to cancel my job (using the web UI), it takes several
> > minutes to finally get cancelled, and it brings the TaskManager down
> > as well.
> >
> > There are no logs while my job hangs, but while cancelling I get the
> > following error:
> >
> > 2018-07-11 09:10:39,385 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
> > ==============================================================
> > ====================== FATAL =======================
> > ==============================================================
> >
> > A fatal error occurred, forcing the TaskManager to shut down: Task 'process (3/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
> > org.rocksdb.RocksDB.get(Native Method)
> > org.rocksdb.RocksDB.get(RocksDB.java:810)
> > org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
> > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
> > nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
> > org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
> > org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
> > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
> > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
> > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
> > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
> > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > java.lang.Thread.run(Thread.java:748)
> >
> > 2018-07-11 09:10:39,390 DEBUG org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy - Actor was killed. Stopping it now.
> > akka.actor.ActorKilledException: Kill
> > 2018-07-11 09:10:39,407 INFO org.apache.flink.runtime.taskmanager.TaskManager - Stopping TaskManager akka://flink/user/taskmanager#-1231617791.
> > 2018-07-11 09:10:39,408 INFO org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
> > 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (3/6) (432fd129f3eea363334521f8c8de5198).
> > 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (3/6) is already in state CANCELING
> > 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (4/6) (7c6b96c9f32b067bdf8fa7c283eca2e0).
> > 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (4/6) is already in state CANCELING
> > 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (2/6) (a4f731797a7ea210fd0b512b0263bcd9).
> > 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (2/6) is already in state CANCELING
> > 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (1/6) (cd8a113779a4c00a051d78ad63bc7963).
> > 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (1/6) is already in state CANCELING
> > 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
> > 2018-07-11 09:10:39,412 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
> > 2018-07-11 09:10:39,431 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
> > 2018-07-11 09:10:39,444 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
> > 2018-07-11 09:10:39,444 DEBUG org.apache.flink.runtime.io.disk.iomanager.IOManager - Shutting down I/O manager.
> > 2018-07-11 09:10:39,451 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /tmp/flink-io-989505e5-ac33-4d56-add5-04b2ad3067b4
> > 2018-07-11 09:10:39,461 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
> > 2018-07-11 09:10:39,461 DEBUG org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down network connection manager
> > 2018-07-11 09:10:39,462 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful shutdown (took 1 ms).
> > 2018-07-11 09:10:39,472 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful shutdown (took 10 ms).
> > 2018-07-11 09:10:39,472 DEBUG org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down intermediate result partition manager
> > 2018-07-11 09:10:39,473 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Releasing 0 partitions because of shutdown.
> > 2018-07-11 09:10:39,474 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Successful shutdown.
> > 2018-07-11 09:10:39,498 INFO org.apache.flink.runtime.taskmanager.TaskManager - Task manager akka://flink/user/taskmanager is completely shut down.
> > 2018-07-11 09:10:39,504 ERROR org.apache.flink.runtime.taskmanager.TaskManager - Actor akka://flink/user/taskmanager#-1231617791 terminated, stopping process...
> > 2018-07-11 09:10:39,563 INFO org.apache.flink.runtime.taskmanager.Task - Notifying TaskManager about fatal error. Task 'process (2/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
> > org.rocksdb.RocksDB.get(Native Method)
> > org.rocksdb.RocksDB.get(RocksDB.java:810)
> > org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
> > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
> > nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
> > org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
> > org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
> > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
> > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
> > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
> > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
> > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > java.lang.Thread.run(Thread.java:748)
> > .
> > 2018-07-11 09:10:39,575 INFO org.apache.flink.runtime.taskmanager.Task - Notifying TaskManager about fatal error. Task 'process (1/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
> > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > java.lang.Class.newInstance(Class.java:442)
> > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:196)
> > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:399)
> > org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:304)
> > org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:104)
> > org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
> > nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
> > org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
> > org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
> > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
> > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
> > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
> > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
> > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > java.lang.Thread.run(Thread.java:748)

--
Nico Kruber | Software Engineer
data Artisans