Following up: I've put together the implementation,
https://github.com/apache/flink/pull/12345. It's passing tests but is
only partially complete, as it still needs some clean-up and configuration.
I still need to try running this against a production cluster to check the
performance, as well as get some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira <j...@stripe.com> wrote:

> Thanks Yun for highlighting this, it's very helpful! I'll give it a go
> with that in mind.
>
> We have already begun using checkpoints for recovery. Having these
> improvements would still be immensely helpful to reduce downtime for
> savepoint recovery.
>
> On Mon, May 18, 2020 at 3:14 PM Yun Tang <myas...@live.com> wrote:
>
>> Hi Joey
>>
>> Previously, I also looked at the mechanism for creating on-disk SSTables, as
>> I planned to use RocksDB's benchmarks to mock the scenario in Flink. However, I
>> found the main challenge is ensuring that the keys are inserted in a
>> strictly increasing order. The key order in Java could differ from the
>> byte order in RocksDB. In your case, I think it could be much easier, as
>> RocksFullSnapshotStrategy writes data per column family per key group, which
>> should be in a strictly increasing order [1].
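>>
>> As a rough illustration (just a sketch, not Flink code): RocksDB's default
>> BytewiseComparator orders keys as unsigned bytes, which can disagree with
>> a naive comparison of Java's signed bytes once a byte value is >= 0x80:
>>
>>     // Matches RocksDB's default BytewiseComparator: unsigned, lexicographic.
>>     static int rocksByteOrder(byte[] a, byte[] b) {
>>         int n = Math.min(a.length, b.length);
>>         for (int i = 0; i < n; i++) {
>>             int cmp = (a[i] & 0xFF) - (b[i] & 0xFF); // unsigned compare
>>             if (cmp != 0) {
>>                 return cmp;
>>             }
>>         }
>>         return a.length - b.length; // on a shared prefix, shorter sorts first
>>     }
>>
>>     // e.g. rocksByteOrder(new byte[]{(byte) 0x80}, new byte[]{0x01}) > 0,
>>     // while Java's signed (byte) 0x80 compares as smaller than (byte) 0x01.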
>>
>> FLINK-17288 <https://issues.apache.org/jira/browse/FLINK-17288> could
>> mitigate the performance issue, and your solution could improve the
>> performance much further (and could integrate with the state-processor-api
>> story).
>>
>> On the other hand, as an out-of-the-box option for production in your
>> scenario, how about using checkpoints to recover? They also support rescaling
>> and normal recovery.
>>
>> [1]
>> https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>>
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Joey Pereira <j...@stripe.com>
>> *Sent:* Tuesday, May 19, 2020 2:27
>> *To:* user@flink.apache.org <user@flink.apache.org>
>> *Cc:* Mike Mintz <mikemi...@stripe.com>; Shahid Chohan <cho...@stripe.com>;
>> Aaron Levin <aaronle...@stripe.com>
>> *Subject:* RocksDB savepoint recovery performance improvements
>>
>> Hey,
>>
>> While running a Flink application with large state, savepoint recovery
>> has been a painful part of operating the application because recovery time
>> can be several hours. During some profiling that chohan (cc'd) had done, a
>> red flag stood out: savepoint recovery consisted mostly of RocksDB Get and
>> Put operations.
>>
>> When Flink is bootstrapping state for RocksDB instances, this is not what
>> I would have expected, as RocksDB supports direct ingestion of the on-disk
>> format (SSTables):
>> https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files.
>> This was also recently reported on Jira:
>> https://issues.apache.org/jira/browse/FLINK-17288.
>>
>> From what I understood of the current implementation:
>>
>> * The snapshot restoration pathways, RocksDBFullRestoreOperation and
>> RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.
>>
>> * RocksDBWriteBatchWrapper uses RocksDB's WriteBatch API. This
>> provides atomicity of batches as well as performance benefits from
>> batching, compared to individual Puts, but it still goes through RocksDB's
>> insert paths, which can involve expensive operations[0].
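>>
>> As a rough sketch of what that batched-Put path amounts to (using the
>> RocksDB Java API directly rather than Flink's wrapper, so details here
>> are illustrative only):
>>
>>     import org.rocksdb.*;
>>
>>     // Entries from the savepoint are replayed through WriteBatch +
>>     // write(), so every key still goes through the memtable insert path.
>>     void restoreViaWriteBatch(RocksDB db, ColumnFamilyHandle cf,
>>                               Iterable<byte[][]> entries)
>>             throws RocksDBException {
>>         try (WriteBatch batch = new WriteBatch();
>>              WriteOptions opts = new WriteOptions().setDisableWAL(true)) {
>>             for (byte[][] kv : entries) {
>>                 batch.put(cf, kv[0], kv[1]);
>>             }
>>             db.write(opts, batch);
>>         }
>>     }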
>>
>> Instead, by creating SSTable files and instructing RocksDB to ingest them,
>> writes can be batched even further and the expensive operations inside
>> RocksDB can be avoided. This is commonly done by other systems for restoration
>> or import processes, such as in CockroachDB[1], TiDB[2], and Percona[3]. There
>> are some restrictions on being able to generate SSTables, as well as
>> limitations for the ingestion to be performant; unfortunately, none of this is
>> very well documented (a sketch of the generate-and-ingest flow follows the
>> list below):
>>
>> 1. When generating an SSTable, keys need to be inserted in-order.
>>
>> 2. Ingested files should not have key-ranges that overlap with either
>> existing data or other ingested files[4]. It is possible to ingest overlapping
>> SSTables, but this may incur significant overhead.
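>>
>> A minimal sketch of the proposed path, again using the RocksDB Java API
>> directly (the per-column-family, per-key-group framing is my assumption
>> about where this would slot in, not existing Flink code):
>>
>>     import org.rocksdb.*;
>>     import java.util.Collections;
>>
>>     // Write one SST per (column family, key group) with keys already in
>>     // RocksDB's byte order, then hand the file to the DB wholesale.
>>     void ingestSortedEntries(RocksDB db, ColumnFamilyHandle cf,
>>                              Options options, Iterable<byte[][]> sorted,
>>                              String sstPath) throws RocksDBException {
>>         try (EnvOptions envOptions = new EnvOptions();
>>              SstFileWriter writer = new SstFileWriter(envOptions, options)) {
>>             writer.open(sstPath);
>>             for (byte[][] kv : sorted) {
>>                 writer.put(kv[0], kv[1]); // throws if keys are out of order
>>             }
>>             writer.finish();
>>         }
>>         try (IngestExternalFileOptions ingest =
>>                  new IngestExternalFileOptions()) {
>>             ingest.setMoveFiles(true); // move rather than copy the file
>>             db.ingestExternalFile(cf, Collections.singletonList(sstPath),
>>                                   ingest);
>>         }
>>     }
>>
>> Whether the data really can be emitted per key group like this, with
>> non-overlapping ranges, is exactly the part I still need to verify.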
>>
>> Generating SSTables with non-overlapping key-ranges, and creating them
>> with keys in order, would mean that the savepoint data needs to be ordered
>> while it is processed. I'm unsure whether this is the case for how
>> Flink's savepoints are stored.
>>
>> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is
>> used (e.g. for incremental checkpoints or something else). I did notice it
>> iterates over a temporary RocksDB instance and inserts into a "final"
>> instance. These writes could be optimized in a similar manner.
>> Alternatively, it could be possible to ingest the temporary instance's
>> SSTables directly and prune data out with RocksDB's DeleteRange.
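>>
>> For the DeleteRange idea, a rough sketch of the pruning step (the
>> key-group boundary arguments here are hypothetical):
>>
>>     // After ingesting the temporary instance's SSTables wholesale, drop
>>     // the key groups this subtask does not own. deleteRange writes a
>>     // cheap range tombstone over [begin, end).
>>     void pruneForeignKeyGroups(RocksDB db, ColumnFamilyHandle cf,
>>                                byte[] ownedBegin, byte[] ownedEnd,
>>                                byte[] cfMin, byte[] cfMax)
>>             throws RocksDBException {
>>         db.deleteRange(cf, cfMin, ownedBegin); // everything before our range
>>         db.deleteRange(cf, ownedEnd, cfMax);   // everything at/after its end
>>     }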
>>
>> To get started with prototyping, I was thinking of taking a simple
>> approach: make an interface for RocksDBWriteBatchWrapper and swap the
>> implementation for one that does SSTable generation and ingestion. I
>> reckon that will be an easy way to sanity-check whether it works at all.
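>>
>> Concretely, something like this (names are hypothetical, and the real
>> RocksDBWriteBatchWrapper surface may differ):
>>
>>     import org.rocksdb.*;
>>
>>     // A seam for the restore operations to write through, so the
>>     // implementation can be swapped without touching the callers.
>>     interface BulkRestoreWriter extends AutoCloseable {
>>         void put(ColumnFamilyHandle cf, byte[] key, byte[] value)
>>             throws RocksDBException;
>>         // Either flush the WriteBatch, or finish the SST and ingest it.
>>         void flush() throws RocksDBException;
>>     }
>>
>>     // Implementation A: delegate to the existing WriteBatch-based wrapper.
>>     // Implementation B: buffer and sort per column family, then
>>     // SstFileWriter + ingestExternalFile as sketched above.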
>>
>> I was also planning on writing some benchmarks in RocksDB to understand
>> the difference for ingestion scenarios, as RocksDB itself is sparse on
>> details about SSTable ingestion[4] and does not ship benchmarks for
>> ingestion.
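>>
>> For those benchmarks I only have a placeholder harness in mind, timing
>> the two paths sketched above on the same pre-sorted data (the helper
>> names are from those sketches; nothing here is existing benchmark code):
>>
>>     // Placeholder: time N batched Puts vs. one SST build + ingest of the
>>     // same pre-sorted entries, each against an empty column family.
>>     interface ThrowingRunnable { void run() throws Exception; }
>>
>>     static long timeMillis(ThrowingRunnable load) throws Exception {
>>         long start = System.nanoTime();
>>         load.run();
>>         return (System.nanoTime() - start) / 1_000_000;
>>     }
>>
>>     // e.g.
>>     // timeMillis(() -> restoreViaWriteBatch(db, cf, entries));
>>     // timeMillis(() -> ingestSortedEntries(db, cf, opts, entries, sstPath));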
>>
>> Does all of that seem sound? I'll report back when I get time to work out
>> that implementation and tests, likely during the coming weekend.
>>
>>
>> Joey
>>
>> ---
>>
>> [0]: I don't have any specific sources on this. At a high level, some of
>> the operations happening during writes include writing to the memtable
>> before flushing to an SSTable, plus merging and/or compaction. In
>> general, these add write amplification and overall overhead to bulk
>> insertion. They can largely be avoided by giving RocksDB SSTables,
>> especially if they have non-overlapping key-ranges. "Characterizing,
>> Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (
>> https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful
>> source that highlights what happens during various workloads.
>>
>>
>> [1]: CockroachDB is a database that uses RocksDB as its on-disk storage.
>> Their implementation consolidates bulk ingestion into an AddSSTable
>> command. In particular, their choices of options for SSTable generation
>> and ingestion are of interest:
>>
>> * SSTable generation:
>> https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L929-L966
>>
>> * SSTable ingestion:
>> https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L842-L917
>>
>>
>> [2]: TiDB, via TiKV (its KV layer), uses RocksDB as the on-disk storage.
>> Their implementation of bulk ingestion is contained within:
>> https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs
>>
>> Other useful references:
>> - https://github.com/tikv/tikv/issues/2404, discussing performance for
>> copy vs. move options.
>>
>>
>> [3]: Percona Server is a MySQL-compatible database that supports a RocksDB
>> storage backend. Their implementation of ingestion can be found here:
>> https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815
>>
>>
>> [4]: Again, there are not many official resources on this. Notable
>> references I found include:
>>
>> * https://github.com/facebook/rocksdb/issues/2473, which describes at a
>> high level how re-insertions work.
>>
>> * https://github.com/facebook/rocksdb/issues/3540, which describes the
>> performance costs of ingesting overlapping SSTables, with specific
>> benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564
>>
>> * https://github.com/facebook/rocksdb/pull/3179, which describes the
>> mechanism for ingesting SSTable files: there need to be point-key overlap
>> checks for the LSM.
>>
>> * https://github.com/facebook/rocksdb/issues/5133, which indicates that
>> re-ingesting the same SSTable (due to restarts in import processes) can
>> cause issues for a particular set of options.
>>
>> * https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245,
>> which indicates compaction occurs more (or only) when overlapping SSTables
>> are ingested. The thinking here is that, with the right tuning for
>> generation and ingestion, non-overlapping SSTable ingestion leaves very few
>> operations (compaction, merging, etc.) to occur afterward.
>>
>> * https://github.com/facebook/rocksdb/issues/5010, which discusses some
>> unresolved issues for high CPU overhead on ingestion.
>>
>
