Yun, you mentioned that checkpoints also support rescaling. I thought the
recommendation [1] was to use savepoints for rescaling.

[1]
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

On Tue, May 26, 2020 at 6:46 AM Joey Pereira <j...@stripe.com> wrote:

> Following up: I've put together the implementation,
> https://github.com/apache/flink/pull/12345. It's passing tests but is
> only partially complete, as it still needs some clean-up and configuration.
> I still need to try running this against a production cluster to check the
> performance, as well as getting some RocksDB benchmarks.
>
> On Mon, May 18, 2020 at 3:46 PM Joey Pereira <j...@stripe.com> wrote:
>
>> Thanks Yun for highlighting this, it's very helpful! I'll give it a go
>> with that in mind.
>>
>> We have already begun using checkpoints for recovery. Having these
>> improvements would still be immensely helpful to reduce downtime for
>> savepoint recovery.
>>
>> On Mon, May 18, 2020 at 3:14 PM Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Joey
>>>
>>> Previously, I also looked at the mechanism for creating on-disk SSTables,
>>> as I planned to use RocksDB's benchmark to mock the scenario in Flink.
>>> However, I found the main challenge is how to ensure the keys are inserted
>>> in a strictly increasing order. The key order in Java could differ from
>>> the byte order in RocksDB. In your case, I think it could be much easier,
>>> as RocksFullSnapshotStrategy writes data per column family per key group,
>>> which should be in a strictly increasing order [1].
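A minimal sketch of the ordering mismatch described above, assuming keys are serialized as 4-byte big-endian integers and compared bytewise as unsigned values (the behaviour of a memcmp-style comparator). The class name, the `encode` helper, and the encoding itself are hypothetical and chosen only for illustration; they are not Flink's serializers.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class KeyOrderDemo {
    // Hypothetical encoding for illustration: a 4-byte big-endian int.
    static byte[] encode(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Order of the Java values themselves (signed integer comparison).
    static int javaOrder(int a, int b) {
        return Integer.compare(a, b);
    }

    // Order of the serialized keys under unsigned lexicographic comparison,
    // which is how a memcmp-style bytewise comparator sorts byte strings.
    static int byteOrder(int a, int b) {
        return Arrays.compareUnsigned(encode(a), encode(b));
    }

    public static void main(String[] args) {
        // As ints, -1 sorts before 1.
        System.out.println(javaOrder(-1, 1) < 0);
        // As bytes, FF FF FF FF sorts after 00 00 00 01.
        System.out.println(byteOrder(-1, 1) > 0);
    }
}
```

Sorting by the Java value and then writing the serialized keys would therefore not always yield keys in increasing byte order; sorting on the serialized bytes avoids the mismatch.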
>>>
>>> FLINK-17288 <https://issues.apache.org/jira/browse/FLINK-17288> could
>>> mitigate the performance problem, and your solution could improve
>>> performance much further (and could integrate with the
>>> state-processor-api story).
>>>
>>> On the other hand, for something that works out of the box in production
>>> for your scenario, how about recovering from a checkpoint, as it also
>>> supports rescaling and normal recovery?
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>>>
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Joey Pereira <j...@stripe.com>
>>> *Sent:* Tuesday, May 19, 2020 2:27
>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>> *Cc:* Mike Mintz <mikemi...@stripe.com>; Shahid Chohan <
>>> cho...@stripe.com>; Aaron Levin <aaronle...@stripe.com>
>>> *Subject:* RocksDB savepoint recovery performance improvements
>>>
>>> Hey,
>>>
>>> While running a Flink application with large state, savepoint recovery
>>> has been a painful part of operating the application because recovery time
>>> can be several hours. During some profiling that chohan (cc'd) had done, a
>>> red flag stood out: savepoint recovery consisted mostly of RocksDB Get and
>>> Put operations.
>>>
>>> When Flink is bootstrapping state for RocksDB instances, this is not what
>>> I would have expected, as RocksDB supports direct ingestion of the on-disk
>>> format (SSTables):
>>> https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files.
>>> This was also recently reported on Jira:
>>> https://issues.apache.org/jira/browse/FLINK-17288.
>>>
>>> From what I understood of the current implementation:
>>>
>>> * The snapshot restoration pathways, RocksDBFullRestoreOperation and
>>> RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.
>>>
>>> * RocksDBWriteBatchWrapper uses RocksDB’s WriteBatch. This provides
>>> atomicity of batches as well as performance benefits from batching,
>>> compared to individual Puts, but it still goes through RocksDB’s insert
>>> paths, which can involve expensive operations[0].
>>>
>>> Instead, by creating SSTable files and instructing RocksDB to ingest the
>>> files, writes can be batched even further and avoid expensive operations in
>>> RocksDB. This is commonly utilized by other systems for restoration or
>>> import processes, such as in CockroachDB[1], TiDB[2], and Percona[3]. There
>>> are some restrictions on being able to generate SSTables, as well as
>>> limitations for ingestion to be performant. Unfortunately, none of this is
>>> very well documented:
>>>
>>> 1. When generating an SSTable, keys need to be inserted in order.
>>>
>>> 2. Ingested files should not have key-ranges that overlap with either
>>> existing or other ingested files[4]. It is possible to ingest overlapping
>>> SSTables, but this may incur significant overhead.
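The two restrictions above can be checked before any files are written. The following is a minimal sketch, assuming keys are compared as unsigned byte strings and that "in order" means strictly increasing, as discussed elsewhere in this thread. The class and method names are hypothetical, and nothing here calls RocksDB itself.

```java
import java.util.Arrays;
import java.util.List;

final class SstPlanCheck {
    // True if every key is strictly greater than the previous one
    // under unsigned lexicographic byte comparison.
    static boolean isStrictlyIncreasing(List<byte[]> keys) {
        for (int i = 1; i < keys.size(); i++) {
            if (Arrays.compareUnsigned(keys.get(i - 1), keys.get(i)) >= 0) {
                return false;
            }
        }
        return true;
    }

    // True if the closed key ranges [aMin, aMax] and [bMin, bMax]
    // share at least one possible key.
    static boolean rangesOverlap(byte[] aMin, byte[] aMax,
                                 byte[] bMin, byte[] bMax) {
        return Arrays.compareUnsigned(aMin, bMax) <= 0
            && Arrays.compareUnsigned(bMin, aMax) <= 0;
    }

    public static void main(String[] args) {
        List<byte[]> keys = Arrays.asList(
            new byte[] {0x01}, new byte[] {0x02}, new byte[] {(byte) 0x80});
        System.out.println(isStrictlyIncreasing(keys));
        System.out.println(rangesOverlap(
            new byte[] {0x01}, new byte[] {0x05},
            new byte[] {0x06}, new byte[] {0x09}));
    }
}
```

A restore path could run checks like these on each planned file and fall back to regular writes when either one fails.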
>>>
>>> To generate SSTables with non-overlapping key-ranges and with keys in
>>> order, the savepoint data would need to be ordered as it is processed.
>>> I'm unsure whether Flink's savepoints are stored that way.
>>>
>>> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it
>>> is used (e.g., for incremental checkpoints or something else). I did
>>> notice it is iterating over a temporary RocksDB instance and inserting into
>>> a "final" instance. These writes could be optimized in a similar
>>> manner. Alternatively, it could be possible to use the temporary instance's
>>> SSTables, ingest them, and prune data out with RocksDB's DeleteRange.
>>>
>>> To get started with prototyping, I was thinking of taking a simple
>>> approach of making an interface for RocksDBWriteBatchWrapper and swapping
>>> the implementation for one that does SSTable generation and ingestion. I
>>> reckon that will be an easy way to sanity check whether it works at all.
>>>
>>> I was also planning on writing some benchmarks in RocksDB to understand
>>> the difference for ingestion scenarios, as RocksDB itself is sparse on
>>> details about SSTable ingestion[4] and does not have benchmarking for
>>> ingestion.
>>>
>>> Does all of that seem sound? I'll report back when I get time to work
>>> out that implementation and tests, likely during the coming weekend.
>>>
>>>
>>> Joey
>>>
>>> ---
>>>
>>> [0]: I don’t have any specific sources on this. At a high level, some of
>>> the operations happening during writes include writing to the memtable
>>> before flushing to an SSTable and doing merging and/or compaction. In
>>> general, these will add write-amplification and overall overhead to bulk
>>> insertion. These can largely be avoided by giving RocksDB SSTables,
>>> especially if they have non-overlapping key-ranges.  "Characterizing,
>>> Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (
>>> https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a
>>> helpful source that highlights what happens during various workloads.
>>>
>>>
>>> [1]: CockroachDB is a database that uses RocksDB as the on-disk storage.
>>> Their implementation consolidates bulk ingestion into an AddSSTable
>>> command. Their choice of options for SSTable generation and ingestion is
>>> of particular interest:
>>>
>>> * SSTable generation:
>>> https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L929-L966
>>>
>>> * SSTable ingestion:
>>> https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L842-L917
>>>
>>>
>>> [2]: TiDB, and TiKV, which is the KV layer of the database, use RocksDB
>>> as the on-disk storage. Their implementation of bulk ingestion is contained
>>> within:
>>> https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs
>>>
>>> Other useful references:
>>> - https://github.com/tikv/tikv/issues/2404, discussing performance for
>>> copy vs. move options.
>>>
>>>
>>> [3]: Percona is a SQL database which supports a RocksDB backend. Their
>>> implementation of ingestion can be found here:
>>> https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815
>>>
>>>
>>> [4]: Again, there are not a lot of official resources on this. Notable
>>> references I found on this include:
>>>
>>> * https://github.com/facebook/rocksdb/issues/2473, which describes at a
>>> high-level how re-insertions work.
>>>
>>> * https://github.com/facebook/rocksdb/issues/3540, which describes the
>>> performance costs for ingesting overlapping SSTables, and specific
>>> benchmarks (post-fix) here:
>>> https://github.com/facebook/rocksdb/pull/3564
>>>
>>> * https://github.com/facebook/rocksdb/pull/3179, which describes the
>>> mechanism for ingesting SSTable files: there need to be point-key overlap
>>> checks for the LSM.
>>>
>>> * https://github.com/facebook/rocksdb/issues/5133, which indicates that
>>> re-ingesting the same SSTable (due to restarts in import processes)
>>> can cause issues for a particular set of options.
>>>
>>> * https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245,
>>> which indicates compaction occurs more (or only) when overlapping SSTables
>>> are ingested. The thinking here is that non-overlapping SSTable ingestion
>>> means very few operations (compaction, merging, etc.) occur afterward, with
>>> the right tuning for generation and ingestion.
>>>
>>> * https://github.com/facebook/rocksdb/issues/5010, which discusses some
>>> unresolved issues for high CPU overhead on ingestion.
>>>
>>
