Yun, you mentioned that checkpoints also support rescaling. I thought the recommendation [1] is to use savepoints for rescaling.
[1] https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

On Tue, May 26, 2020 at 6:46 AM Joey Pereira <j...@stripe.com> wrote:

Following up: I've put together the implementation, https://github.com/apache/flink/pull/12345. It's passing tests but is only partially complete, as it still needs some clean-up and configuration. I still need to try running this against a production cluster to check the performance, as well as gather some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira <j...@stripe.com> wrote:

Thanks Yun for highlighting this, it's very helpful! I'll give it a go with that in mind.

We have already begun using checkpoints for recovery. Having these improvements would still be immensely helpful to reduce downtime for savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang <myas...@live.com> wrote:

Hi Joey,

Previously, I also looked at the mechanism for creating on-disk SSTables, as I planned to use RocksDB's benchmarks to mock Flink scenarios. However, I found the main challenge is ensuring that keys are inserted in strictly increasing order: the key order in Java can differ from the byte order in RocksDB. In your case I think it could be much easier, as RocksFullSnapshotStrategy writes data per column family per key group, which should already be in strictly increasing order [1].

FLINK-17288 <https://issues.apache.org/jira/browse/FLINK-17288> could mitigate the performance problem, and your solution could improve performance much further (and could integrate with the state-processor-api story).

On the other hand, as an out-of-the-box option for your production scenario, how about using checkpoints to recover? They also support rescaling as well as normal recovery.

[1] https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308

Best,
Yun Tang

From: Joey Pereira <j...@stripe.com>
Sent: Tuesday, May 19, 2020 2:27
To: user@flink.apache.org <user@flink.apache.org>
Cc: Mike Mintz <mikemi...@stripe.com>; Shahid Chohan <cho...@stripe.com>; Aaron Levin <aaronle...@stripe.com>
Subject: RocksDB savepoint recovery performance improvements

Hey,

While running a Flink application with large state, savepoint recovery has been a painful part of operating the application because recovery can take several hours. During some profiling that chohan (cc'd) had done, a red flag stood out: savepoint recovery consisted mostly of RocksDB Get and Put operations.

This is not what I would have expected while Flink is bootstrapping state for RocksDB instances, as RocksDB supports direct ingestion of its on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

* RocksDBWriteBatchWrapper uses RocksDB's WriteBatch operator. This provides atomicity of batches as well as performance benefits from batching, compared to individual Puts, but it still goes through RocksDB's insert paths, which can involve expensive operations[0].
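For illustration, that batched write path looks roughly like the following against RocksDB's Java API. This is only a sketch: the batch size, entry handling, and write options here are assumptions for the example, not Flink's actual RocksDBWriteBatchWrapper code.

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import java.util.Map;

public class BatchedRestoreSketch {

    // Hypothetical threshold: flush the batch once it holds this many entries.
    private static final int BATCH_SIZE = 500;

    static void restore(RocksDB db, ColumnFamilyHandle cf,
                        Iterable<Map.Entry<byte[], byte[]>> entries) throws RocksDBException {
        try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);
             WriteBatch batch = new WriteBatch()) {
            int pending = 0;
            for (Map.Entry<byte[], byte[]> entry : entries) {
                // Every entry still goes through the regular write path
                // (memtable, later flush and compaction) once the batch is written.
                batch.put(cf, entry.getKey(), entry.getValue());
                if (++pending >= BATCH_SIZE) {
                    db.write(writeOptions, batch);
                    batch.clear();
                    pending = 0;
                }
            }
            if (pending > 0) {
                db.write(writeOptions, batch);
            }
        }
    }
}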
Instead, by creating SSTable files and instructing RocksDB to ingest them, writes can be batched even further and RocksDB's expensive insert operations can be avoided. This is commonly used by other systems for restoration or import processes, such as CockroachDB[1], TiDB[2], and Percona[3]. There are some restrictions on generating SSTables, as well as limitations on making ingestion performant. Unfortunately, it's all not very well documented:

1. When generating an SSTable, keys need to be inserted in order.

2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.

To generate SSTables with non-overlapping key-ranges and to create them with keys in order, the savepoint data would need to be ordered while processing it. I'm unsure whether this is the case for how Flink's savepoints are stored.
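As a rough sketch of the generate-then-ingest idea using RocksDB's Java API (SstFileWriter plus ingestExternalFile); the file path handling, the source of entries, and the option values are illustrative assumptions rather than a proposed Flink implementation:

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;

import java.util.Collections;
import java.util.Map;

public class SstIngestSketch {

    // Entries must be supplied in strictly increasing key order (RocksDB's
    // comparator order, byte order by default), per restriction (1). The
    // Options passed in should match the target column family's options.
    static void writeSstFile(String sstPath, Options options,
                             Iterable<Map.Entry<byte[], byte[]>> sortedEntries) throws RocksDBException {
        try (EnvOptions envOptions = new EnvOptions();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open(sstPath);
            for (Map.Entry<byte[], byte[]> entry : sortedEntries) {
                writer.put(entry.getKey(), entry.getValue());
            }
            writer.finish();
        }
    }

    // Ingest the generated file; ideally its key-range does not overlap with
    // existing data or other ingested files, per restriction (2), so RocksDB
    // can take it in without extra compaction work.
    static void ingest(RocksDB db, ColumnFamilyHandle cf, String sstPath) throws RocksDBException {
        try (IngestExternalFileOptions ingestOptions =
                     new IngestExternalFileOptions().setMoveFiles(true)) {
            db.ingestExternalFile(cf, Collections.singletonList(sstPath), ingestOptions);
        }
    }
}

The important property is that writeSstFile receives entries already sorted in RocksDB's byte order, and that each generated file covers a key-range that does not overlap with the others.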
I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (e.g. for incremental checkpoints or something else). I did notice it iterates over a temporary RocksDB instance and inserts into a "final" instance. These writes could be optimized in a similar manner. Alternatively, it could be possible to ingest the temporary instance's SSTables directly and prune data out with RocksDB's DeleteRange.

To get started with prototyping, I was thinking of taking a simple approach: making an interface for RocksDBWriteBatchWrapper and swapping in an implementation that does SSTable generation and ingestion. I reckon that will be an easy way to sanity-check whether it works at all.

I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarks for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don't have any specific sources on this. At a high level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable, plus merging and/or compaction. In general, these add write amplification and overall overhead to bulk insertion. They can largely be avoided by handing RocksDB SSTables directly, especially if they have non-overlapping key-ranges. "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.

[1]: CockroachDB is a database that uses RocksDB as its on-disk storage. Its implementation consolidates bulk ingestion into an AddSSTable command. Primarily, its choices of options for SSTable generation and ingestion are of interest:

* SSTable generation: https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L929-L966

* SSTable ingestion: https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L842-L917

[2]: TiDB, and TiKV which is its KV layer, uses RocksDB as the on-disk storage. Its implementation of bulk ingestion is contained within: https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs

Another useful reference: https://github.com/tikv/tikv/issues/2404, discussing the performance of the copy vs. move options.

[3]: Percona is a SQL database which supports a RocksDB backend. Its implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815

[4]: Again, there are not a lot of official resources on this. Notable references I found include:

* https://github.com/facebook/rocksdb/issues/2473, which describes at a high level how re-insertions work.

* https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs of ingesting overlapping SSTables, with specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564

* https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks against the LSM.

* https://github.com/facebook/rocksdb/issues/5133, which indicates that re-ingesting the same SSTable (due to restarts in import processes) can cause issues for a particular set of options.

* https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, which indicates compaction occurs more (or only) when overlapping SSTables are ingested. The thinking here is that non-overlapping SSTable ingestion means very few operations (compaction, merging, etc.) occur afterward, with the right tuning for generation and ingestion.

* https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues with high CPU overhead on ingestion.
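For anyone digging through those references, the ingestion behaviour they discuss maps onto a handful of knobs in RocksDB's Java IngestExternalFileOptions. The values below are illustrative only; defaults and exact semantics should be checked against the RocksDB version in use.

import org.rocksdb.IngestExternalFileOptions;

public class IngestOptionsSketch {

    static IngestExternalFileOptions bulkLoadOptions() {
        return new IngestExternalFileOptions()
                // Move (link) the generated SSTables instead of copying them;
                // the copy-vs-move trade-off is what tikv/tikv#2404 discusses.
                .setMoveFiles(true)
                // Allow RocksDB to assign a global sequence number when the
                // file overlaps existing data; overlap is where the compaction
                // and CPU overhead in the issues above comes from.
                .setAllowGlobalSeqNo(true)
                // Allow a blocking memtable flush if the file's key-range
                // overlaps the memtable; otherwise such an ingestion fails.
                .setAllowBlockingFlush(true);
    }
}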