Hi,

There is currently no workaround for this limitation if your operator uses 
timers, but fixing it is pretty high on our TODO list for release 1.6.

Best,
Stefan

> On 31.01.2018, at 09:29, Sofer, Tovi <tovi.so...@citi.com> wrote:
> 
> Hi Stefan,
>  
> Thank you for the answer.
> So you mean that any use of windows in the stream will result in synchronous 
> snapshotting?
> When are you planning to fix this?  
> And is there a workaround?
>  
> Thanks again,
> Tovi
> From: Stefan Richter [mailto:s.rich...@data-artisans.com] 
> Sent: Tuesday, 30 January 2018 21:10
> To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
> Cc: user@flink.apache.org
> Subject: Re: Sync and Async checkpoint time
>  
> Hi,
>  
> it looks like the timer service is the culprit here. Timers are currently 
> not stored in the state backend, but in a separate on-heap data structure 
> that does not support copy-on-write or asynchronous snapshots in general. 
> Writing the timers for a snapshot is therefore always synchronous, which 
> also matches your observation that the problem mainly affects window 
> operators, because they make heavy use of timers.
>  
> Best,
> Stefan
> 
> 
> On 30.01.2018, at 18:17, Sofer, Tovi <tovi.so...@citi.com> wrote:
>  
> Hi group,
>  
> In our project we are using the asynchronous FsStateBackend, and we are trying 
> to move to distributed storage – currently S3.
> With this storage we are experiencing high backpressure and high latency 
> compared to local storage.
> We are trying to understand the reason, since the checkpoint is asynchronous 
> and therefore should not have such a strong effect.
>  
> We looked at the checkpoint history in the web UI and at the details from the log.
> - From the web UI it seems that the sync checkpoint duration is much higher than 
> the async duration (again, this happens only when using S3, not when using local 
> storage).
> This happens especially in window operators (tumbling windows) such as below.
> - But from the log the sync time seems very short…
>  
> Do you have any explanation for why the async write to the FsStateBackend has such 
> a strong effect on the stream performance?
>  
> Checkpoint config:
> env.enableCheckpointing(60000);                                   // checkpoint every 60 s
> env.setStateBackend(new FsStateBackend(checkpointDirURI, true));  // true = asynchronous snapshots
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);   // at least 20 s between checkpoints
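> 
> For reference, a simplified sketch of a tumbling-window pipeline on top of this 
> environment (the source, key selector and window size are only placeholders, not 
> our actual job):
> 
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> 
> // Example source; in the real job this would be the order stream.
> DataStream<String> lines = env.socketTextStream("localhost", 9999);
> 
> lines
>     .keyBy(new KeySelector<String, String>() {
>         @Override
>         public String getKey(String value) {
>             return value.split(",")[0]; // example: key by the first CSV field
>         }
>     })
>     // each open window registers a timer for the window end, so the window
>     // operator relies heavily on the timer service
>     .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
>     .reduce((a, b) -> a.length() >= b.length() ? a : b) // example aggregation
>     .print();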
>  
>  
> - Checkpoint info from the console:
> <image004.png>
>  
> - Checkpoint info from the log:
> 2018-01-30 07:33:36,416 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-42-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took 12139 ms.
> 2018-01-30 07:33:36,418 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-83] Received acknowledge message for checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:36,676 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-35-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] took 12396 ms.
> 2018-01-30 07:33:36,677 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,347 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-17-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] took 13067 ms.
> 2018-01-30 07:33:37,349 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,418 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-29-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] took 13143 ms.
> 2018-01-30 07:33:37,420 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,508 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-33-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] took 13234 ms.
> 2018-01-30 07:33:37,509 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 5012eda50495ae33ca38e2478b3f8e0d of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:37,589 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [ParsedOrdersDelayWindow (2/4)] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task Threads] took 1 ms.
> 2018-01-30 07:33:37,678 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-49-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-49-thread-1,5,Flink Task Threads] took 13403 ms.
> 2018-01-30 07:33:37,680 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 6c26b698209523f6a0c77191b2bcb491 of job 747c4cef2841d2ab090d9ed97e0357cc.
> 2018-01-30 07:33:38,143 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-25-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-25-thread-1,5,Flink Task Threads] took 13863 ms.
>  
> Thanks & regards,
> 
> Tovi
> 
