This is so helpful, thank you! So just to clarify (3): operator state has a partitioning scheme, but it's simply not by key; it's something else that's special under the hood? In that case, what data is stored in an operator? I assumed it must be the input data for e.g. a join, so that it can react efficiently to any data changes in the stream and recombine only what has actually changed. Is this correct?
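One rough way to picture the two repartitioning schemes mentioned in the reply quoted below is the plain-Java sketch that follows. It is not Flink code. `keyGroupFor`, `operatorIndexFor` and `evenSplit` are hypothetical names, and the hash is a stand-in (Flink applies a murmur hash to the key's `hashCode()`). The contiguous-range mapping from key group to instance follows the idea Flink uses for keyed state. The even split only approximates how non-keyed list state registered via `getListState` is redistributed. Union list state (`getUnionListState`), which hands every instance the full list, is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of state redistribution on rescaling (plain Java, not Flink code).
public class StateRedistributionSketch {

    // Keyed state: every key maps to one fixed key group in [0, maxParallelism).
    // Stand-in hash; Flink applies a murmur hash to key.hashCode() instead.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are assigned to instances in contiguous ranges.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Non-keyed list state: gather every instance's items, then deal them out
    // again as contiguous, nearly equal chunks for the new parallelism.
    static <T> List<List<T>> evenSplit(List<List<T>> oldPartitions, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> partition : oldPartitions) {
            all.addAll(partition);
        }
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        String key = "user-42"; // hypothetical key
        int keyGroup = keyGroupFor(key, maxParallelism);
        // The key group stays the same; only its owning instance changes with parallelism.
        System.out.println("key group " + keyGroup
                + ": instance " + operatorIndexFor(keyGroup, maxParallelism, 2) + " of 2, "
                + "instance " + operatorIndexFor(keyGroup, maxParallelism, 4) + " of 4");

        // Hypothetical partition@offset entries held as list state by 2 instances, rescaled to 3.
        List<List<String>> old = List.of(
                List.of("p0@15", "p1@7", "p2@3"),
                List.of("p3@9", "p4@1"));
        System.out.println(evenSplit(old, 3)); // prints [[p0@15, p1@7], [p2@3, p3@9], [p4@1]]
    }
}
```

The point the sketch tries to show is this. A key's key group is fixed by the maximum parallelism, so in this model rescaling only moves whole key groups between instances. List-state items carry no key, so they are simply pooled and dealt out again.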
On Fri, Sep 4, 2020 at 1:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi,
>
> On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Hello!
>>
>> I've been digging into State Storage documentation, but it's left me
>> scratching my head with a few questions. Any help will be much appreciated.
>>
>> Qs:
>> 1. Is there a way to use RocksDB state backend for Flink on AWS EMR?
>> Possibly with S3 backed savepoints for recovery (or maybe hdfs for
>> savepoints?)? Only documentation related to AWS I can find makes it look
>> like AWS must use the S3 File System state backend and not RocksDB at all.
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html
>>
>
> I think there's some misunderstanding of the role of RocksDB vs
> filesystems for fault-tolerance here.
> RocksDB is a state backend option that manages user state out-of-core, and
> is managed by the Flink runtime. Users do not need to separately manage
> RocksDB instances.
> For persistence of that state as checkpoints / savepoints for
> fault-tolerance, you may choose the commonly used filesystems like S3 /
> HDFS.
>
> See [1] for how to configure your job to use RocksDBStateBackend as the
> runtime state backend and configuring a filesystem path for persistence.
>
>
>>
>> 2. Does the FS state backend not compact? I thought everything in Flink
>> was stored as key/value. In which case, why would the last n values for a
>> key need to stick around, or how would they?
>> > An incremental checkpoint builds upon (typically multiple) previous
>> > checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a
>> > way that is self-consolidating over time. As a result, the incremental
>> > checkpoint history in Flink does not grow indefinitely, and old checkpoints
>> > are eventually subsumed and pruned automatically.
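The documentation passage quoted just above can be pictured with a toy model of incremental checkpoints over immutable SST files. This is a simplified plain-Java sketch that loosely follows the reference-counting idea behind Flink's shared-state registry. The class, its methods and the file names are hypothetical. Each checkpoint uploads only the files not already persisted. Once compaction has replaced old files and every retained checkpoint that references them has been subsumed (compare Flink's `state.checkpoints.num-retained` option), those files are deleted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model of incremental checkpoints over immutable SST files (not Flink code).
public class IncrementalCheckpointSketch {
    private final Map<String, Integer> refCounts = new HashMap<>(); // file -> retained checkpoints using it
    private final Deque<Set<String>> retained = new ArrayDeque<>();  // oldest checkpoint first
    private final int maxRetained;
    final List<String> uploaded = new ArrayList<>();
    final List<String> deleted = new ArrayList<>();

    IncrementalCheckpointSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    // Records a checkpoint of the SST files that are live at this moment.
    void checkpoint(Set<String> liveFiles) {
        Set<String> files = new TreeSet<>(liveFiles); // sorted for deterministic order
        for (String file : files) {
            if (!refCounts.containsKey(file)) {
                uploaded.add(file); // only files not already persisted are transferred
            }
            refCounts.merge(file, 1, Integer::sum);
        }
        retained.addLast(files);
        // Subsume the oldest checkpoints beyond the retention limit.
        while (retained.size() > maxRetained) {
            for (String file : retained.removeFirst()) {
                if (refCounts.merge(file, -1, Integer::sum) == 0) {
                    refCounts.remove(file);
                    deleted.add(file); // no retained checkpoint references it any more
                }
            }
        }
    }

    public static void main(String[] args) {
        IncrementalCheckpointSketch store = new IncrementalCheckpointSketch(1);
        store.checkpoint(Set.of("1.sst", "2.sst"));
        store.checkpoint(Set.of("1.sst", "2.sst", "3.sst")); // only 3.sst is new
        // Compaction merges 1-3 into 4.sst; the next checkpoint references just that file.
        store.checkpoint(Set.of("4.sst"));
        System.out.println("uploaded " + store.uploaded); // uploaded [1.sst, 2.sst, 3.sst, 4.sst]
        System.out.println("deleted  " + store.deleted);  // deleted  [1.sst, 2.sst, 3.sst]
    }
}
```

In this model, history stays bounded because compaction keeps the live file set small. Files that drop out of it disappear as soon as the last checkpoint that needed them is subsumed.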
>>
>>
>
> The sentence that you quote simply states how Flink leverages RocksDB's
> background compaction of SSTables to ensure that incremental checkpoints
> don't grow indefinitely in size.
> This has nothing to do with the FsStateBackend, as incremental
> checkpointing isn't supported there.
>
> Just as a clarification, as there might be some other misunderstanding here:
> The difference between FsStateBackend vs. RocksDBStateBackend is the
> state backend being used to maintain local state at runtime.
> RocksDBStateBackend obviously uses RocksDB, while the FsStateBackend uses
> in-memory hash maps. For persistence, both are checkpointed to a filesystem
> for fault-tolerance.
> The naming may be a bit confusing, so just wanted to clarify that here in
> case that may have caused any confusion with the questions above.
>
>
>> 3. In the docs, Operators are referred to as non-keyed state, yet,
>> Operators have IDs that they are keyed by, so why are they referred to as
>> non-keyed state?
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>>
>>
> Operator state is referred to as non-keyed state because it is not
> co-partitioned with the stream by key, its values are not bound to a
> single key (whereas when you access keyed state, the access is bound to a
> single key), and it has different schemes for repartitioning when operators
> are scaled up or down.
> The operator IDs you referred to are simply unique IDs that identify the
> same operators across different executions of the same job. I'm not sure
> what you mean by "operators have IDs that are keyed by"; those IDs are not
> used in any partitioning operation.
>
>
>
>> 4. For the Table API / SQL are primary keys and join keys automatically
>> used as the keys for state under the hood?
>>
>
> Yes.
>
>
>>
>> Lastly
>> 5. Is there a way to estimate roughly how much disk space state storage
>> will take per operation?
>>
>>
>> Thanks again!
>>
>> --
>>
>> Rex Fenley | Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
> Cheers,
> Gordon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#configuring-a-state-backend
>
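On question 4 and the join assumption in the follow-up at the top: the sketch below is a toy streaming inner join whose state is keyed by the join key, so each incoming row reads and writes only the state for its own key. It is plain Java, not Flink's join operator. It assumes insert-only input and ignores updates/retractions, state TTL, and the various ways the real operator may store rows. `KeyedJoinSketch`, `onLeft` and `onRight` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy streaming inner join with state partitioned by join key (not Flink's join operator).
// Assumes insert-only input: no updates/retractions and no state TTL.
public class KeyedJoinSketch {
    static final class Row {
        final String key;   // join key
        final String value; // payload

        Row(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Per-key state for each input: join key -> rows seen so far on that side.
    private final Map<String, List<Row>> leftState = new HashMap<>();
    private final Map<String, List<Row>> rightState = new HashMap<>();

    List<String> onLeft(Row row) {
        return process(row, leftState, rightState, true);
    }

    List<String> onRight(Row row) {
        return process(row, rightState, leftState, false);
    }

    // Stores the row under its key, then joins it only with the other side's rows for that key.
    private List<String> process(Row row, Map<String, List<Row>> own,
                                 Map<String, List<Row>> other, boolean rowIsLeft) {
        own.computeIfAbsent(row.key, k -> new ArrayList<>()).add(row);
        List<String> out = new ArrayList<>();
        for (Row match : other.getOrDefault(row.key, List.of())) {
            Row left = rowIsLeft ? row : match;
            Row right = rowIsLeft ? match : row;
            out.add(row.key + ": " + left.value + " + " + right.value);
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedJoinSketch join = new KeyedJoinSketch();
        System.out.println(join.onLeft(new Row("u1", "alice")));     // prints []
        System.out.println(join.onRight(new Row("u1", "order-1")));  // prints [u1: alice + order-1]
        System.out.println(join.onRight(new Row("u2", "order-2")));  // prints []
        System.out.println(join.onLeft(new Row("u1", "alice-v2")));  // prints [u1: alice-v2 + order-1]
    }
}
```

In this model a new row on one side touches only the rows stored under its own key. Because that state is partitioned by key, it can be spread across parallel instances the same way as any other keyed state.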