Hi,

there is currently no workaround for this limitation if your operator uses timers, but it is pretty high on our TODO list for release 1.6.
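To illustrate what "uses timers" means, here is a minimal sketch (the class name and the one-minute delay are made up for illustration, they are not taken from your job). Any operator that registers timers through the timer service like this is affected, and window operators do exactly this internally to fire their windows. The timers live in an on-heap structure outside the state backend, so they are written in the synchronous part of the checkpoint:

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical example: forwards each element and echoes it again one minute later.
    public class DelayedEcho extends ProcessFunction<String, String> {

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            // The registered timer is kept by the heap-based timer service,
            // not by the (asynchronously snapshotted) FsStateBackend.
            long fireAt = ctx.timerService().currentProcessingTime() + 60_000L;
            ctx.timerService().registerProcessingTimeTimer(fireAt);
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            out.collect("timer fired at " + timestamp);
        }
    }

(Timers require a keyed stream, e.g. stream.keyBy(...).process(new DelayedEcho()); a window operator on a keyed stream registers comparable timers for every window that still has to fire.)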
Best,
Stefan

> On 31.01.2018, at 09:29, Sofer, Tovi <tovi.so...@citi.com> wrote:
>
> Hi Stefan,
>
> Thank you for the answer.
> So you mean that any use of windows in the stream will result in synchronous snapshotting?
> When are you planning to fix this?
> And is there a workaround?
>
> Thanks again,
> Tovi
>
> From: Stefan Richter [mailto:s.rich...@data-artisans.com]
> Sent: Tuesday, January 30, 2018 21:10
> To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
> Cc: user@flink.apache.org
> Subject: Re: Sync and Async checkpoint time
>
> Hi,
>
> this looks like the timer service is the culprit for this problem. Timers are currently not stored in the state backend, but in a separate on-heap data structure that does not support copy-on-write or asynchronous snapshots in general. Therefore, writing the timers for a snapshot is always synchronous, and this explanation would also match your observation that the problem mainly affects window operators, which make heavy use of timers.
>
> Best,
> Stefan
>
> On 30.01.2018, at 18:17, Sofer, Tovi <tovi.so...@citi.com> wrote:
>
> Hi group,
>
> In our project we are using the asynchronous FsStateBackend, and we are trying to move to distributed storage, currently S3.
> When using this storage we are experiencing high backpressure and high latency compared to local storage.
> We are trying to understand the reason, since the checkpoint is asynchronous and therefore should not have such a large effect.
>
> We looked at the checkpoint history in the web UI and at the details in the log.
> · From the web UI it seems that the sync checkpoint duration is much higher than the async duration (again, this only happens when using S3, not when using local storage). This happens especially in window operators (tumbling windows) such as the one below.
> · But from the log the sync time seems very short...
>
> Do you have any estimation why the async write to the FsStateBackend has such a large effect on the stream performance?
>
> Checkpoint config:
> env.enableCheckpointing(60000);
> env.setStateBackend(new FsStateBackend(checkpointDirURI, true));
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
>
> · Checkpoint info from console:
> <image004.png>
>
> · Checkpoint info from log:
> 2018-01-30 07:33:36,416 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-42-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took 12139 ms.
> 2018-01-30 07:33:36,418 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-83] Received acknowledge message for checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:36,676 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-35-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] took 12396 ms.
> 2018-01-30 07:33:36,677 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,347 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-17-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] took 13067 ms.
> 2018-01-30 07:33:37,349 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,418 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-29-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] took 13143 ms.
> 2018-01-30 07:33:37,420 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,508 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-33-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] took 13234 ms.
> 2018-01-30 07:33:37,509 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 5012eda50495ae33ca38e2478b3f8e0d of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,589 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [ParsedOrdersDelayWindow (2/4)] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task Threads] took 1 ms.
> 2018-01-30 07:33:37,678 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-49-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-49-thread-1,5,Flink Task Threads] took 13403 ms.
> 2018-01-30 07:33:37,680 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 6c26b698209523f6a0c77191b2bcb491 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:38,143 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-25-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-25-thread-1,5,Flink Task Threads] took 13863 ms.
>
> Thanks & regards,
> Tovi