Following up: I've put together the implementation, https://github.com/apache/flink/pull/12345. It's passing tests but is only partially complete, as it still needs some clean-up and configuration. I still need to run it against a production cluster to check performance, and to get some RocksDB benchmarks.
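
For anyone following along: the rough shape of the change is to put the restore writes behind a small interface, so that the existing WriteBatch-based path and an SSTable-generating path are interchangeable. As an illustrative sketch only (the names here are hypothetical and simplified, not necessarily what ends up in the PR):

import org.rocksdb.RocksDBException;

/**
 * Hypothetical abstraction over the restore write path. One implementation
 * would wrap the existing RocksDBWriteBatchWrapper (WriteBatch + db.write),
 * while another would feed keys to an SstFileWriter and ingest the resulting
 * file(s) when closed.
 */
interface RestoreWriter extends AutoCloseable {

    // Keys must arrive in strictly increasing byte order for the
    // SSTable-generating implementation to produce a valid file.
    void put(byte[] key, byte[] value) throws RocksDBException;

    // Flush any buffered writes: a final WriteBatch write, or an
    // SstFileWriter.finish() followed by ingestExternalFile().
    @Override
    void close() throws RocksDBException;
}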

On Mon, May 18, 2020 at 3:46 PM Joey Pereira <j...@stripe.com> wrote:

> Thanks Yun for highlighting this, it's very helpful! I'll give it a go with that in mind.
>
> We have already begun using checkpoints for recovery. Having these improvements would still be immensely helpful to reduce downtime for savepoint recovery.
>
> On Mon, May 18, 2020 at 3:14 PM Yun Tang <myas...@live.com> wrote:
>
>> Hi Joey,
>>
>> Previously, I also looked at the mechanism for creating on-disk SSTables, as I planned to use RocksDB's benchmarks to mock the scenario in Flink. However, I found the main challenge is ensuring the keys are inserted in a strictly increasing order: the key order in Java can differ from the byte order in RocksDB. In your case, I think it could be much easier, as RocksFullSnapshotStrategy writes data per column family per key group, which should be in a strictly increasing order [1].
>>
>> FLINK-17288 <https://issues.apache.org/jira/browse/FLINK-17288> could mitigate the performance problem, and your solution could improve performance even further (and could integrate with the state-processor-api story).
>>
>> On the other hand, as an out-of-the-box option for production in your scenario, how about using checkpoints to recover? They also support rescaling and normal recovery.
>>
>> [1] https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Joey Pereira <j...@stripe.com>
>> *Sent:* Tuesday, May 19, 2020 2:27
>> *To:* user@flink.apache.org <user@flink.apache.org>
>> *Cc:* Mike Mintz <mikemi...@stripe.com>; Shahid Chohan <cho...@stripe.com>; Aaron Levin <aaronle...@stripe.com>
>> *Subject:* RocksDB savepoint recovery performance improvements
>>
>> Hey,
>>
>> While running a Flink application with large state, savepoint recovery has been a painful part of operating the application because recovery time can be several hours. During some profiling that chohan (cc'd) had done, a red flag stood out: savepoint recovery consisted mostly of RocksDB Get and Put operations.
>>
>> This is not what I would have expected when Flink is bootstrapping state for RocksDB instances, as RocksDB supports direct ingestion of its on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.
>>
>> From what I understood of the current implementation:
>>
>> * The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.
>>
>> * RocksDBWriteBatchWrapper uses RocksDB's WriteBatch operator. This provides atomicity of batches as well as the performance benefits of batching compared to individual Puts, but it still goes through RocksDB's insert paths, which can involve expensive operations[0].
>>
>> Instead, by creating SSTable files and instructing RocksDB to ingest them, writes can be batched even further while avoiding RocksDB's expensive insert-path operations. This is commonly utilized by other systems for restoration or import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].
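>>
>> As a rough illustration of the core of that idea with RocksDB's Java API (the path and options here are placeholders, and error handling and tuning are omitted):
>>
>> import java.util.Collections;
>> import org.rocksdb.EnvOptions;
>> import org.rocksdb.IngestExternalFileOptions;
>> import org.rocksdb.Options;
>> import org.rocksdb.RocksDB;
>> import org.rocksdb.RocksDBException;
>> import org.rocksdb.SstFileWriter;
>>
>> class SstIngestSketch {
>>     static void bulkLoad(RocksDB db, byte[][] sortedKeys, byte[][] values) throws RocksDBException {
>>         String sstPath = "/tmp/restore-000001.sst"; // placeholder path
>>         try (EnvOptions envOptions = new EnvOptions();
>>              Options options = new Options();
>>              SstFileWriter writer = new SstFileWriter(envOptions, options)) {
>>             writer.open(sstPath);
>>             // Keys must be added in strictly increasing (byte-wise) order.
>>             for (int i = 0; i < sortedKeys.length; i++) {
>>                 writer.put(sortedKeys[i], values[i]);
>>             }
>>             writer.finish();
>>         }
>>         // Hand the finished file to the live instance; with non-overlapping
>>         // key-ranges this largely bypasses the memtable and compaction work.
>>         try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
>>             ingestOptions.setMoveFiles(true); // move rather than copy the file
>>             db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
>>         }
>>     }
>> }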
>>
>> There are some restrictions on being able to generate SSTables, as well as limitations for ingestion to be performant. Unfortunately, none of this is very well documented:
>>
>> 1. When generating an SSTable, keys need to be inserted in order.
>>
>> 2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.
>>
>> To generate SSTables with non-overlapping key-ranges and to create them with keys in order, the savepoint data would need to be ordered while it is processed. I'm unsure whether this is the case for how Flink's savepoints are stored.
>>
>> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (e.g. for incremental checkpoints or something else). I did notice it iterates over a temporary RocksDB instance and inserts into a "final" instance. These writes could be optimized in a similar manner. Alternatively, it may be possible to ingest the temporary instance's SSTables directly and prune data out with RocksDB's DeleteRange.
>>
>> To get started with prototyping, I was thinking of taking a simple approach: making an interface for RocksDBWriteBatchWrapper and swapping in an implementation that does SSTable generation and ingestion. I reckon that will be an easy way to sanity-check whether it works at all.
>>
>> I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarks for ingestion (a rough sketch of the comparison I have in mind is at the end of this message, below the references).
>>
>> Does all of that seem sound? I'll report back when I get time to work out the implementation and tests, likely during the coming weekend.
>>
>> Joey
>>
>> ---
>>
>> [0]: I don't have any specific sources on this. At a high level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable, and merging and/or compaction. In general, these add write amplification and overhead to bulk insertion. They can largely be avoided by handing RocksDB SSTables directly, especially if they have non-overlapping key-ranges. "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.
>>
>> [1]: CockroachDB is a database that uses RocksDB as its on-disk storage. Their implementation consolidates bulk ingestion into an AddSSTable command. Primarily, their choices of options for SSTable generation and ingestion are of interest:
>>
>> * SSTable generation: https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L929-L966
>>
>> * SSTable ingestion: https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L842-L917
>>
>> [2]: TiDB, and TiKV which is the KV layer of the database, use RocksDB as the on-disk storage. Their implementation of bulk ingestion is contained within: https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs
>>
>> Other useful references:
>> - https://github.com/tikv/tikv/issues/2404, discussing performance for copy vs. move options.
>>
>> [3]: Percona is a SQL database which supports a RocksDB backend. Their implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815
>>
>> [4]: Again, there are not a lot of official resources on this. Notable references I found include:
>>
>> * https://github.com/facebook/rocksdb/issues/2473, which describes at a high level how re-insertions work.
>>
>> * https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs of ingesting overlapping SSTables, with specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564
>>
>> * https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks against the LSM.
>>
>> * https://github.com/facebook/rocksdb/issues/5133, which indicates that re-ingesting the same SSTable (due to restarts in import processes) can cause issues for a particular set of options.
>>
>> * https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, which indicates that compaction occurs more (or only) when overlapping SSTables are ingested. The thinking here is that non-overlapping SSTable ingestion means very few operations (compaction, merging, etc.) occur afterward, given the right tuning for generation and ingestion.
>>
>> * https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues around high CPU overhead on ingestion.
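>>
>> For the benchmarks mentioned above, this is the rough kind of comparison I have in mind, as a standalone sketch only (sizes, paths, and options are arbitrary):
>>
>> import java.nio.charset.StandardCharsets;
>> import java.util.Collections;
>> import org.rocksdb.EnvOptions;
>> import org.rocksdb.IngestExternalFileOptions;
>> import org.rocksdb.Options;
>> import org.rocksdb.RocksDB;
>> import org.rocksdb.SstFileWriter;
>> import org.rocksdb.WriteBatch;
>> import org.rocksdb.WriteOptions;
>>
>> public class IngestVsWriteBatchBench {
>>     static final int NUM_KEYS = 1_000_000; // arbitrary
>>     static final int BATCH_SIZE = 10_000;
>>
>>     public static void main(String[] args) throws Exception {
>>         RocksDB.loadLibrary();
>>         try (Options options = new Options().setCreateIfMissing(true)) {
>>             // Baseline: batched Puts via WriteBatch, as the restore path does today.
>>             try (RocksDB db = RocksDB.open(options, "/tmp/bench-writebatch");
>>                  WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
>>                 long start = System.nanoTime();
>>                 WriteBatch batch = new WriteBatch();
>>                 for (int i = 0; i < NUM_KEYS; i++) {
>>                     batch.put(key(i), value(i));
>>                     if (batch.count() >= BATCH_SIZE) {
>>                         db.write(writeOptions, batch);
>>                         batch.close();
>>                         batch = new WriteBatch();
>>                     }
>>                 }
>>                 db.write(writeOptions, batch);
>>                 batch.close();
>>                 System.out.println("WriteBatch puts: " + (System.nanoTime() - start) / 1_000_000 + " ms");
>>             }
>>
>>             // Candidate: generate one sorted SSTable and ingest it.
>>             try (RocksDB db = RocksDB.open(options, "/tmp/bench-ingest");
>>                  EnvOptions envOptions = new EnvOptions();
>>                  SstFileWriter writer = new SstFileWriter(envOptions, options)) {
>>                 long start = System.nanoTime();
>>                 String sstPath = "/tmp/bench-000001.sst";
>>                 writer.open(sstPath);
>>                 for (int i = 0; i < NUM_KEYS; i++) {
>>                     writer.put(key(i), value(i)); // zero-padded keys => increasing byte order
>>                 }
>>                 writer.finish();
>>                 try (IngestExternalFileOptions ingest = new IngestExternalFileOptions()) {
>>                     db.ingestExternalFile(Collections.singletonList(sstPath), ingest);
>>                 }
>>                 System.out.println("SSTable ingest: " + (System.nanoTime() - start) / 1_000_000 + " ms");
>>             }
>>         }
>>     }
>>
>>     static byte[] key(int i) {
>>         return String.format("key-%09d", i).getBytes(StandardCharsets.UTF_8);
>>     }
>>
>>     static byte[] value(int i) {
>>         return ("value-" + i).getBytes(StandardCharsets.UTF_8);
>>     }
>> }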