Hi, I wonder what reason you might have that you ever want such a huge number > of retained checkpoints?
The Flink jobs running on EMR cluster require a checkpoint at midnight. (In our use case we need to synch a loaded delta to our a third party partner with the streamed data). The delta load the whole day data and that's why we wanted to have available the midnight's checkpoint to start from there. We could also make a savepoint at midnight, but it’s not as handy (we would need to build our own tooling to do it), and it can’t benefit from the smaller latency of an incremental checkpoint. Another thining is that implementing our own savepoint tool is a bit hard to monitor. Besides, retaining several having checkpoints created every minute is that it would also allow us to load a delta at any time. Please, if there are better ways of achieving this, let me know. >From where does the log trace come from? It comes from the TaskManager. Please search on the opposite side of the time outing connection for > possible root cause of the timeout including: > - possible error/exceptions/warnings > - long GC pauses or other blocking operations (possibly long unnatural > gaps in the logs) > - machine health (CPU usage, disks usage, network connections) It seems that TaskManager disconnect from JobManager and then cannot reach it again and I cannot tell the reason. I think machine health metrics mentioned above seems to be OK. Would you say *Direct memory stats *usage is correct? What is the way to check the GC pauses? Those are some traces from the TaskManager log, before/after it detached from JobManager 2018-01-08 22:26:37,263 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36] 2018-01-08 22:26:42,263 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 868/5597/5597 MB, NON HEAP: 116/119/-1 MB (used/committed/max)] 2018-01-08 22:26:42,263 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 100, Total Capacity: 29942814, Used Memory: 29942815 2018-01-08 22:26:42,263 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 42/42/240 MB (used/committed/max)], [Metaspace: 66/68/-1 MB (used/committed/max)], [Compressed Class Space: 8/8/1024 MB (used/committed/max)] 2018-01-08 22:26:42,264 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36] 2018-01-08 22:26:42,999 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp:// fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341] 2018-01-08 22:26:43,034 INFO org.apache.flink.yarn.YarnTaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp:// fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no longer reachable 2018-01-08 22:26:43,035 INFO org.apache.flink.yarn.YarnTaskManager - Cancelling all computations and discarding all cached data. 2018-01-08 22:26:43,037 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d). 2018-01-08 22:26:43,037 INFO org.apache.flink.runtime.taskmanager.Task - Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED. java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp:// fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no longer reachable at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095) at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44) at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369) at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501) at akka.actor.ActorCell.invoke(ActorCell.scala:486) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2018-01-08 22:26:43,069 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d). 2018-01-08 22:26:43,087 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: CounterSink (async call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e). *José Miguel Tejedor Fernández* Server developer jose.fernan...@rovio.com Rovio Entertainment Ltd. Keilaranta 7, FIN - 02150 Espoo, Finland www.rovio.com On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter < s.rich...@data-artisans.com> wrote: > Hi, > > there is no known limitation in the strict sense, but you might run out of > dfs space or job manager memory if you keep around a huge number > checkpoints. I wonder what reason you might have that you ever want such a > huge number of retained checkpoints? Usually keeping one checkpoint should > do the job, maybe a couple more if you are very afraid about corruption > that goes beyond your DFSs capabilities to handle it. Is there any reason > for that or maybe a misconception about increasing the number of retained > checkpoints is good for? > > Best, > Stefan > > > Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <pi...@data-artisans.com>: > > Hi, > > Increasing akka’s timeouts is rarely a solution for any problems - it > either do not help, or just mask the issue making it less visible. But yes, > it is possible to bump the limits: https://ci.apache.org/ > projects/flink/flink-docs-release-1.3/setup/config.html# > distributed-coordination-via-akka > > I don’t think that state.checkpoints.num-retained was thought to handle > such large numbers of retained checkpoint so maybe there are some > known/unknown limitations. Stefan, do you know something in this regard? > > Parallel thing to do is that like for any other akka timeout, you should > track down the root cause of it. This one warning line doesn’t tell much. > From where does it come from? Client log? Job manager log? Task manager > log? Please search on the opposite side of the time outing connection for > possible root cause of the timeout including: > - possible error/exceptions/warnings > - long GC pauses or other blocking operations (possibly long unnatural > gaps in the logs) > - machine health (CPU usage, disks usage, network connections) > > Piotrek > > On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez < > jose.fernan...@rovio.com> wrote: > > Hello, > > I have several stream jobs running (v. 1.3.1 ) in production which always > fails after a fixed period of around 30h after being executing. That's the > WARN trace before failing: > > Association with remote system > [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876] has failed, > address is now gated for [5000] ms. Reason: [Association failed with > [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876]] Caused by: > [No response from remote for outbound association. Handshake timed out after > [20000 ms]. > > > The main change done in the job configuration was to increase the > state.checkpoints.num-retained from 1 to *2880*. I am using asynchronous > RocksDB to persists to snapshot the state. (I attach some screenshots with > the checkpoint conf from webUI) > > > - May my assumption be correct that the increase of > checkpoints.num-retained is causing the problem? Any known issue regarding > this? > > > - Besides, Is there any way to increase the Akka handshake timeout > from the current 20000 ms to a higher value? I considered that it may be > convenient to increase the timeout to 1 minute instead. > > > BR > > > <Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at > 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png> > > > >