GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/6333
[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

## What is the purpose of the change

This PR integrates priority queue state (timers) with the snapshotting of Flink's state backend and already includes backwards compatibility (FLINK-9490). The core idea is to have a common abstraction for how state is registered in the state backend and how snapshots operate on such state (`StateSnapshotRestore`, `RegisteredStateMetaInfoBase`). With this, the new state integrates more or less seamlessly with the existing snapshot logic. The notable exception is the current lack of integration of the RocksDB state backend with heap-based priority queue state. This combination can still use the old snapshot code through a temporary path without causing any regression (see `AbstractStreamOperator.snapshotState(...)`).

As a result, after this PR Flink supports asynchronous snapshots for heap kv / heap queue, rocks kv / rocks queue (full and incremental), and rocks kv / heap queue (only full), and still uses synchronous snapshots for rocks kv / heap queue (only incremental).

This work was created in a bit of a rush to make it into the 1.6 release and still has some known rough edges that we could fix up in the test phase. Here is a list of some things that already come to my mind:

- Integrate heap-based timers with incremental RocksDB snapshots, then kick out some code.
- Check proper integration with serializer upgrade story (!!)
- After that, we can also remove the key-partitioning in the set structure from `HeapPriorityQueueSet`.
- Improve integration of the batch wrapper.
- Improve general state registration logic in the backends; there is potential to remove duplicated code and to generally improve the integration of the queue state a bit.
- Improve performance of RocksDB based timers, e.g. a byte[] based cache, seeking directly to the next potential timer instead of seeking to the key-group start, bulkPoll.
- Improve some class/interface/method names.
- Add tests, e.g. for bulkPoll etc.

## Verifying this change

This change is already covered by existing tests, but I would add some more eventually.

You can activate RocksDB based timers by using the RocksDB backend and setting `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE` to `ROCKS`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (yes)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs only for now)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink pq-snapshot-integration

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6333.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6333

----
commit 1bb8f70700deacc49a4d4ac7900425c10272959d
Author: Stefan Richter <s.richter@...>
Date:   2018-06-13T09:56:16Z

    [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

commit fc20df8268decab6d9890d617787a4084284b2f0
Author: Stefan Richter <s.richter@...>
Date:   2018-07-13T23:19:30Z

    Optimization for relaxed bulk polls

commit 4db1bea90fd6881555172fe3d22ee928e97894a7
Author: Stefan Richter <s.richter@...>
Date:   2018-07-14T06:34:16Z

    Renaming of some classes/interfaces

----

---
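For readers unfamiliar with the term, the following is a minimal, self-contained sketch of what "priority queue state with set semantics" and a copy-based snapshot of it can look like. It is not the code in this PR: `TimerQueueSketch`, `Timer`, `snapshot()` and `restore(...)` are hypothetical names chosen for illustration, and the sketch ignores key groups, serializers and the RocksDB variant entirely.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Illustrative only; all names here are assumptions, not Flink classes. */
public class TimerQueueSketch {

    /** A timer is identified by its key and timestamp and ordered by timestamp. */
    public static final class Timer implements Comparable<Timer> {
        public final String key;
        public final long timestamp;

        public Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer t = (Timer) o;
            return timestamp == t.timestamp && key.equals(t.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }
    }

    private final PriorityQueue<Timer> heap = new PriorityQueue<>();
    private final Set<Timer> dedup = new HashSet<>();

    /** Adds the timer unless an equal one is already registered. */
    public boolean add(Timer timer) {
        if (!dedup.add(timer)) {
            return false;
        }
        heap.add(timer);
        return true;
    }

    /** Removes and returns the timer with the smallest timestamp, or null if empty. */
    public Timer poll() {
        Timer timer = heap.poll();
        if (timer != null) {
            dedup.remove(timer);
        }
        return timer;
    }

    public int size() {
        return heap.size();
    }

    /** Copies the current elements; later changes to the queue do not affect the copy. */
    public List<Timer> snapshot() {
        return new ArrayList<>(heap);
    }

    /** Rebuilds a queue from a previously taken snapshot. */
    public static TimerQueueSketch restore(List<Timer> snapshot) {
        TimerQueueSketch queue = new TimerQueueSketch();
        for (Timer timer : snapshot) {
            queue.add(timer);
        }
        return queue;
    }

    public static void main(String[] args) {
        TimerQueueSketch queue = new TimerQueueSketch();
        queue.add(new Timer("a", 20L));
        queue.add(new Timer("b", 10L));
        List<Timer> snapshot = queue.snapshot();
        queue.poll(); // the live queue keeps changing after the snapshot was taken
        TimerQueueSketch restored = restore(snapshot);
        System.out.println("restored size: " + restored.size());
    }
}
```

Because the snapshot is a copy, writing it out could happen on another thread while the live queue continues to be modified, which is the general idea behind the asynchronous snapshots mentioned in the description.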