Hi,

I think there are a couple of potential explanations for the increased latency. 
Let me point out the two most obvious ones that come to mind:

1) A state size of 20 MB sounds like something that could (completely or to a 
large extent) fit into some cache layer of a modern CPU, whereas 200 MB does 
not. Please also note that the reported size for checkpoints can be very 
different from the actual size in memory, typically smaller (compactly 
serialized bytes vs. objects, references, … on the heap). Depending on your 
architecture, setup, access pattern, etc., this could mean that the hot path of 
your code is served from the cache in one case and has to go to main memory in 
the other. You could test this hypothesis by measuring more data points between 
20 MB, 200 MB, and beyond (see the sketch right below). If you observe plateaus 
in your latency measurements across many size ranges, with relatively sharp 
jumps between the plateaus, that would indicate operating within vs. outside of 
some cache.
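
For the sweep, a minimal sketch of how you could parameterize the state size 
could look like the following. The class name PaddedStateFn and the byte[] 
padding trick are just my own illustration, not something from your job:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Pads the keyed state of each key with a byte[] of configurable size, so the
// total state size can be swept in small steps (e.g. 20 MB, 40 MB, ... 200 MB)
// while the rest of the job stays unchanged.
public class PaddedStateFn extends RichFlatMapFunction<String, String> {

    private final int payloadBytesPerKey;
    private transient ValueState<byte[]> padding;

    public PaddedStateFn(int payloadBytesPerKey) {
        this.payloadBytesPerKey = payloadBytesPerKey;
    }

    @Override
    public void open(Configuration parameters) {
        padding = getRuntimeContext().getState(
                new ValueStateDescriptor<>("padding", byte[].class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (padding.value() == null) {
            // first record for this key: allocate the padding state once
            padding.update(new byte[payloadBytesPerKey]);
        }
        out.collect(value);
    }
}

You would apply it on a keyed stream, e.g. 
stream.keyBy(...).flatMap(new PaddedStateFn(bytesPerKey)), and vary 
bytesPerKey (or the number of keys) between runs.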

2) In async mode, the backend applies copy-on-write to track modifications that 
run concurrently with the checkpoint. If your checkpoints are bigger, the 
length of the phase in which copy-on-write has to be applied, as well as the 
total number of objects that could be copied, is bigger. In theory, this can 
introduce latency, but I don’t have numbers on this effect. However, you can 
easily check this by deactivating checkpoints, e.g. as sketched below.
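
For example, something along these lines; the "checkpoints.enabled" property 
and the LatencyTestJob class are just illustrative assumptions, while the 
checkpoint settings are copied from your snippet:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTestJob {
    public static void main(String[] args) throws Exception {
        // checkpointing stays off unless explicitly enabled, so a run without
        // checkpoints isolates the copy-on-write / snapshotting overhead
        boolean checkpointsEnabled =
                Boolean.parseBoolean(System.getProperty("checkpoints.enabled", "true"));

        // placeholder URI, use your own checkpoint directory here
        String checkpointDirURI = "file:///tmp/checkpoints";

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        if (checkpointsEnabled) {
            env.enableCheckpointing(1000);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
            env.setStateBackend(new FsStateBackend(checkpointDirURI, true)); // async snapshots
        }

        // ... build the rest of the job and call env.execute() as usual
    }
}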

Another question is how you scale up your state: do you introduce more state per 
key, more keys without changing the per-key state size, or a mix of both? There 
are obviously many more potential explanations, ranging from introduced skew to 
NUMA effects on your 40-core CPU, but the points above should be the most 
obvious candidates.

Best,
Stefan

> On 29.10.2017, at 17:34, Sofer, Tovi <tovi.so...@citi.com> wrote:
> 
> Hi all,
>  
> In our application we have a requirement for very low latency, preferably 
> less than 5 ms.
> We were able to achieve this so far, but when we start increasing the state 
> size, we see a distinct increase in latency.
> We have added MinPauseBetweenCheckpoints, and are using async snapshots.
> - Why does state size have such a distinct effect on latency? How can this 
> effect be minimized?
> - Can the state snapshot be done using separate threads and resources in 
> order to lessen the effect on stream data handling?
>  
>  
> Details:
>  
> Application configuration:
> env.enableCheckpointing(1000);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
> env.setStateBackend(new FsStateBackend(checkpointDirURI, true)); // use async 
> snapshots
> env.setParallelism(16); // running on a machine with 40 cores
>  
> Results:
>  
> A.      When the state size is ~20 MB, we got a latency of 0.3 ms for the 
> 99th percentile
>  
> Latency info: (in nanos)
> 2017-10-26 07:26:55,030 INFO  com.citi.artemis.flink.reporters.Log4JReporter 
> - [Flink-MetricRegistry-1] 
> localhost.taskmanager.6afd21aeb9b9bef41a4912b023469497.Flink Streaming 
> Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:10000 min:31919 
> max:13481166 mean:89492.0644 stddev:265876.0259763816 p50:68140.5 p75:82152.5 
> p95:146654.0499999999 p98:204671.74 p99:308958.73999999993 
> p999:3844154.002999794
> State\checkpoint info:
>  
> <image001.png>
>  
>  
>  
> B.      When the state size is ~200 MB, latency increased significantly to 
> 9 ms for the 99th percentile
> Latency info: 
> 2017-10-26 07:17:35,289 INFO  com.citi.artemis.flink.reporters.Log4JReporter 
> - [Flink-MetricRegistry-1] 
> localhost.taskmanager.05431e7ecab1888b2792265cdc0ddf84.Flink Streaming 
> Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:10000 min:30186 
> max:46236470 mean:322105.7072 stddev:2060373.4782505725 p50:68979.5 
> p75:85780.25 p95:219882.69999999914 p98:2360171.4399999934 
> p99:9251766.559999945 p999:3.956163987499886E7
> State\checkpoint info:
>  
>  
> <image002.png>
>  
> Thanks and regards,
> 
> Tovi
> 
