Ok, given the info that you are using ListState (which uses RocksDB's merge() internally), this is probably a case of this known problem: https://github.com/facebook/rocksdb/issues/1988

We provide a custom version of RocksDB with Flink 1.2.1 (where we fixed the slow merge operations) until we can upgrade to a newer RocksDB release, so updating to 1.2.1 should fix the slowdown you observe.
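To make it concrete, here is a minimal sketch of the access pattern that runs into this (the class, state, and type names below are made up, not taken from your job): with the RocksDB backend, every ListState.add() becomes a RocksDB merge() for the current key, so ~50000 elements under one key leave a long chain of merge operands that the affected RocksDB versions resolve very slowly.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Made-up event type standing in for the records in the job.
case class Event(key: String, payload: String)

// Meant to run on a keyed stream, e.g. stream.keyBy(_.key).flatMap(new BufferingFlatMap)
class BufferingFlatMap extends RichFlatMapFunction[Event, Event] {

  @transient private var buffer: ListState[Event] = _

  override def open(parameters: Configuration): Unit = {
    // With the RocksDB backend this list state lives in RocksDB, not on the heap.
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[Event]("buffered-events", classOf[Event]))
  }

  override def flatMap(event: Event, out: Collector[Event]): Unit = {
    // Each add() appends a RocksDB merge operand for the current key; reading
    // or snapshotting the list later has to resolve all of them, which is the
    // part the linked issue makes slow.
    buffer.add(event)
  }
}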
> On 03.05.2017, at 19:10, Jason Brelloch <jb.bc....@gmail.com> wrote:
>
> So looking through the logs I found these lines (repeated the same test again with a RocksDB backend, took 5m55s):
>
> 2017-05-03 12:52:24,131 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1493830344131
> 2017-05-03 12:52:24,132 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@56ff02da for Async calls on Source: CIC Json Event Source -> Map -> Filter (1/1)
> 2017-05-03 12:52:24,132 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@116cba7c for Async calls on Source: Custom Source -> Filter -> Map -> CIC Control Source (1/1)
> 2017-05-03 12:52:24,134 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Asynchronous RocksDB snapshot (File Stream Factory @ file:/home/jason/flink/checkpoint/b10bfbd4911c2d37dc2d610f4951c28a, synchronous part) in thread Thread[Sequence Function -> Sink: Unnamed (1/1),5,Flink Task Threads] took 0 ms.
> 2017-05-03 12:52:24,142 INFO  org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: CIC Json Event Source -> Map -> Filter (1/1)
> 2017-05-03 12:52:24,142 INFO  org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Filter -> Map -> CIC Control Source (1/1)
> 2017-05-03 12:58:19,167 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Asynchronous RocksDB snapshot (File Stream Factory @ file:/home/jason/flink/checkpoint/b10bfbd4911c2d37dc2d610f4951c28a, asynchronous part) in thread Thread[pool-4-thread-2,5,Flink Task Threads] took 355032 ms.
> 2017-05-03 12:58:19,170 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 (27543402 bytes in 355037 ms).
>
> Which I think means the asynchronous save to RocksDB took up most of the time, and not the serializing. We had some serialization slowdown when we were using an ever-growing ValueState object, but switching to a ListState seems to have resolved that, so I am not sure that is the issue.
>
> On Wed, May 3, 2017 at 12:05 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
> Sorry, just saw that your question was actually mainly about checkpointing, but it can still be related to my previous answer. I assume the checkpointing time is the time that is reported in the web interface? That is the end-to-end runtime of the checkpoint, which does not really tell you how much time is spent on writing the state itself; you can find this exact detail in the logging by grepping for lines that start with "Asynchronous RocksDB snapshot". The background is that end-to-end also includes the time the checkpoint barrier needs to travel to the operator. If there is a lot of backpressure and a lot of network buffers, this can take a while. Still, the reason for the backpressure could be in the way you access RocksDB, as it seems you are de/serializing every time you update an ever-growing value under a single key. I can see how accesses under these conditions could become very slow eventually, but remain fast on the FSBackend for the reason from my first answer.
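To illustrate what I mean by de/serializing an ever-growing value under a single key, here is a minimal sketch (again with made-up names, not your actual code). With the RocksDB backend, every value()/update() pair de/serializes the complete accumulated list, so each access gets more expensive as the state grows; a ListState.add() only appends a small element instead.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Made-up event type; stands in for whatever the job actually buffers.
case class Event(key: String, payload: String)

// The ever-growing-value pattern: all events for a key kept in one ValueState.
class AppendToSingleValue extends RichFlatMapFunction[Event, Event] {

  @transient private var allEvents: ValueState[List[Event]] = _

  override def open(parameters: Configuration): Unit = {
    allEvents = getRuntimeContext.getState(
      new ValueStateDescriptor[List[Event]]("all-events", classOf[List[Event]]))
  }

  override def flatMap(event: Event, out: Collector[Event]): Unit = {
    // On RocksDB, value() deserializes the entire list and update()
    // re-serializes it, so the cost of this pair grows with every event.
    val buffered = Option(allEvents.value()).getOrElse(Nil)
    allEvents.update(event :: buffered)
    // A ListState[Event] would instead just do buffer.add(event), a cheap
    // append (RocksDB merge) that avoids rereading the whole list.
  }
}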
>> On 03.05.2017, at 17:54, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> typically, I would expect that the bottleneck with the RocksDB backend is not RocksDB itself, but your TypeSerializers. I suggest first attaching a profiler/sampler to the process and checking whether the problematic methods are in serialization or in the actual accesses to RocksDB. The RocksDB backend has to go through a de/serialize roundtrip on every single state access, while the FSBackend works on heap objects directly. For checkpoints, the RocksDB backend can write bytes directly, whereas the FSBackend has to use the serializers to get from objects to bytes, so the way serializers are used is kind of inverted between normal operation and checkpointing. For Flink 1.3 we will also introduce incremental checkpoints on RocksDB that piggyback on the SST files; Flink 1.2 writes checkpoints and savepoints in full and in a custom format.
>>
>> Best,
>> Stefan
>>
>>> On 03.05.2017, at 16:46, Jason Brelloch <jb.bc....@gmail.com> wrote:
>>>
>>> Hey all,
>>>
>>> I am looking for some advice on tuning RocksDB for better performance in Flink 1.2. I created a pretty simple job with a single Kafka source and one flatmap function that just stores 50000 events in a single key of managed keyed state and then drops everything else, to test checkpoint performance. Using a basic FsStateBackend configured as:
>>>
>>> val backend = new FsStateBackend("file:///home/jason/flink/checkpoint")
>>> env.setStateBackend(backend)
>>>
>>> with about 30MB of state, we see the checkpoints completing in 151ms. Using a RocksDBStateBackend configured as:
>>>
>>> val backend = new RocksDBStateBackend("file:///home/jason/flink/checkpoint")
>>> backend.setDbStoragePath("file:///home/jason/flink/rocksdb")
>>> backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>>> env.setStateBackend(backend)
>>>
>>> running the same test, the checkpoint takes 3 minutes 42 seconds.
>>>
>>> I expect it to be slower, but that seems excessive. I am also a little confused as to when RocksDB and Flink decide to write to disk, because watching the database, the .sst file wasn't created until significantly after the checkpoint was completed, and the state had not changed. Is there anything I can do to increase the speed of the checkpoints, or anywhere I can look to debug the issue? (Nothing seems out of the ordinary in the Flink logs or the RocksDB logs.)
>>>
>>> Thanks!
>>>
>>> --
>>> Jason Brelloch | Product Developer
>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>
>
> --
> Jason Brelloch | Product Developer
> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
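One more note on the serializer point from my first answer above: on the RocksDB backend the state's TypeSerializer runs on every single access, so it is worth making sure the descriptor does not silently fall back to a generic serializer. A minimal sketch of pinning it down explicitly (the event type and descriptor name are again made up):

import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.scala._  // provides the createTypeInformation macro

object StateDescriptors {

  // Made-up event type, as in the sketches above.
  case class Event(key: String, payload: String)

  // Passing explicit TypeInformation lets Flink use its case-class serializer
  // instead of possibly falling back to the generic Kryo path.
  val bufferDescriptor = new ListStateDescriptor[Event](
    "buffered-events",
    createTypeInformation[Event])
}

This does not change the merge() behaviour discussed above, but it keeps the per-access de/serialize roundtrips as cheap as possible.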