Ok, given the info that you are using ListState (which uses RocksDB’s merge() 
internally), this is probably a case of this problem: 
https://github.com/facebook/rocksdb/issues/1988 

We provide a custom version of RocksDB with Flink 1.2.1 (where we fixed the 
slow merge operations) until we can upgrade to a newer version of RocksDB. So 
updating to 1.2.1 should fix the slowdown you observe.
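
For context, the access pattern that runs into this issue is simply appending to
a keyed ListState: with the RocksDB backend every add() is translated into a
RocksDB merge() on the current key. A minimal sketch (class and type names are
illustrative, not from your job):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Sketch only: each events.add() becomes a RocksDB merge() with the RocksDB backend.
class CollectEvents extends RichFlatMapFunction[String, String] {
  private var events: ListState[String] = _

  override def open(parameters: Configuration): Unit = {
    events = getRuntimeContext.getListState(
      new ListStateDescriptor[String]("events", classOf[String]))
  }

  override def flatMap(value: String, out: Collector[String]): Unit = {
    events.add(value)  // appended element is merged into the existing list entry
  }
}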

> On 03.05.2017, at 19:10, Jason Brelloch <jb.bc....@gmail.com> wrote:
> 
> So looking through the logs I found these lines (repeated the same test with 
> a RocksDB backend; it took 5m55s):
> 
> 2017-05-03 12:52:24,131 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1493830344131
> 2017-05-03 12:52:24,132 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@56ff02da for Async calls on Source: CIC Json Event Source -> Map -> Filter (1/1)
> 2017-05-03 12:52:24,132 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@116cba7c for Async calls on Source: Custom Source -> Filter -> Map -> CIC Control Source (1/1)
> 2017-05-03 12:52:24,134 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Asynchronous RocksDB snapshot (File Stream Factory @ file:/home/jason/flink/checkpoint/b10bfbd4911c2d37dc2d610f4951c28a, synchronous part) in thread Thread[Sequence Function -> Sink: Unnamed (1/1),5,Flink Task Threads] took 0 ms.
> 2017-05-03 12:52:24,142 INFO  org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: CIC Json Event Source -> Map -> Filter (1/1)
> 2017-05-03 12:52:24,142 INFO  org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Filter -> Map -> CIC Control Source (1/1)
> 2017-05-03 12:58:19,167 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Asynchronous RocksDB snapshot (File Stream Factory @ file:/home/jason/flink/checkpoint/b10bfbd4911c2d37dc2d610f4951c28a, asynchronous part) in thread Thread[pool-4-thread-2,5,Flink Task Threads] took 355032 ms.
> 2017-05-03 12:58:19,170 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 (27543402 bytes in 355037 ms).
> 
> Which I think means the async save to RocksDB took up most of the time, and 
> not the serializing.  We had some serialization slowdown when we were using 
> an ever-growing ValueState object, but switching to a ListState seems to have 
> resolved that, so I am not sure that is the issue.
> 
> On Wed, May 3, 2017 at 12:05 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
> Sorry, just saw that your question was actually mainly about checkpointing, 
> but it can still be related to my previous answer. I assume the checkpointing 
> time is the time that is reported in the web interface? That would be the 
> end-to-end runtime of the checkpoint, which does not really tell you how much 
> time is spent on writing the state itself; you can find that exact detail in 
> the logging by grepping for lines that start with "Asynchronous RocksDB 
> snapshot". The background is that the end-to-end time also includes the time 
> the checkpoint barrier needs to travel to the operator. If there is a lot of 
> backpressure and many network buffers, this can take a while. The reason for 
> the backpressure could still be in the way you access RocksDB, as it seems 
> you are de/serializing an ever-growing value under a single key every time 
> you update it. I can see how accesses under these conditions could become 
> very slow eventually, yet remain fast on the FSBackend for the reason from my 
> first answer.
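> 
> To make that concrete, here is a minimal sketch of the ever-growing ValueState 
> pattern (class and field names are made up, not taken from your job): with 
> RocksDB, value() deserializes the entire list and update() re-serializes it 
> for every incoming element, whereas ListState.add() only serializes the new 
> element.
> 
> import org.apache.flink.api.common.functions.RichFlatMapFunction
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.util.Collector
> 
> // Sketch only: illustrates why per-access cost grows with the state size.
> class GrowingValueBuffer extends RichFlatMapFunction[String, String] {
>   private var buffer: ValueState[List[String]] = _
> 
>   override def open(parameters: Configuration): Unit = {
>     buffer = getRuntimeContext.getState(
>       new ValueStateDescriptor[List[String]]("buffer", classOf[List[String]]))
>   }
> 
>   override def flatMap(value: String, out: Collector[String]): Unit = {
>     // With the RocksDB backend this deserializes and re-serializes the whole
>     // list on every incoming element.
>     val current = Option(buffer.value()).getOrElse(Nil)
>     buffer.update(value :: current)
>   }
> }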
> 
>> On 03.05.2017, at 17:54, Stefan Richter <s.rich...@data-artisans.com> wrote:
>> 
>> Hi,
>> 
>> typically, I would expect that the bottleneck with the RocksDB backend is 
>> not RocksDB itself, but your TypeSerializers. I suggest first attaching a 
>> profiler/sampler to the process and checking whether the problematic methods 
>> are in serialization or in the actual accesses to RocksDB. The RocksDB 
>> backend has to go through de/serialize roundtrips on every single state 
>> access, while the FSBackend works directly on heap objects. For checkpoints 
>> it is the other way around: the RocksDB backend can write bytes directly, 
>> whereas the FSBackend has to use the serializers to get from objects to 
>> bytes, so the two backends are essentially inverted w.r.t. where serializers 
>> are used during operation and checkpointing. For Flink 1.3 we will also 
>> introduce incremental checkpoints on RocksDB that piggyback on the SST 
>> files; Flink 1.2 writes checkpoints and savepoints fully and in a custom 
>> format.
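>> 
>> Just as a sketch (1.3 is not released yet, so take this as an assumption 
>> about the API rather than a promise): the incremental mode will be enabled 
>> via a flag on the backend, roughly:
>> 
>> // Sketch only: enabling incremental RocksDB checkpoints, expected in Flink 1.3;
>> // not available in Flink 1.2. The boolean flag is an assumption about the API.
>> val backend = new RocksDBStateBackend(
>>   "file:///home/jason/flink/checkpoint",
>>   true)  // enableIncrementalCheckpointing
>> env.setStateBackend(backend)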
>> 
>> Best,
>> Stefan
>> 
>>> On 03.05.2017, at 16:46, Jason Brelloch <jb.bc....@gmail.com> wrote:
>>> 
>>> Hey all,
>>> 
>>> I am looking for some advice on tuning RocksDB for better performance in 
>>> Flink 1.2.  I created a pretty simple job with a single Kafka source and 
>>> one flatmap function that just stores 50000 events in a single key of 
>>> managed keyed state and then drops everything else, to test checkpoint 
>>> performance.  Using a basic FsStateBackend configured as:
>>> 
>>> val backend = new FsStateBackend("file:///home/jason/flink/checkpoint")
>>> env.setStateBackend(backend)
>>> 
>>> With about 30MB of state we see the checkpoints completing in 151ms.  Using 
>>> a RocksDBStateBackend configured as:
>>> 
>>> val backend = new RocksDBStateBackend("file:///home/jason/flink/checkpoint")
>>> backend.setDbStoragePath("file:///home/jason/flink/rocksdb")
>>> backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>>> env.setStateBackend(backend)
>>> 
>>> Running the same test, the checkpoint takes 3 minutes 42 seconds.
>>> 
>>> I expect it to be slower, but that seems excessive.  I am also a little 
>>> confused as to when RocksDB and Flink decide to write to disk, because when 
>>> watching the database the .sst file wasn't created until significantly 
>>> after the checkpoint was completed, and the state had not changed.  Is 
>>> there anything I can do to increase the speed of the checkpoints, or 
>>> anywhere I can look to debug the issue?  (Nothing seems out of the ordinary 
>>> in the Flink logs or RocksDB logs.)
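>>> 
>>> (In case it helps frame the question: I assume any further tuning would go 
>>> through something like the backend's OptionsFactory hook; the sketch below 
>>> is purely illustrative, and the values are placeholders.)
>>> 
>>> import org.apache.flink.contrib.streaming.state.OptionsFactory
>>> import org.rocksdb.{ColumnFamilyOptions, DBOptions}
>>> 
>>> // Sketch only: placeholder values, just to show where tuning would plug in.
>>> backend.setOptions(new OptionsFactory {
>>>   override def createDBOptions(currentOptions: DBOptions): DBOptions =
>>>     currentOptions.setIncreaseParallelism(4)
>>>   override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
>>>     currentOptions.setWriteBufferSize(64 * 1024 * 1024)  // 64 MB write buffer
>>> })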
>>> 
>>> Thanks!
>>> 
>>> -- 
>>> Jason Brelloch | Product Developer
>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305 
>> 
> 
> 
> 
> 
> -- 
> Jason Brelloch | Product Developer
> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305 
