GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6333

    [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

    ## What is the purpose of the change
    
    This PR integrates priority queue state (timers) with the snapshotting of
    Flink's state backend and already includes backwards compatibility
    (FLINK-9490). The core idea is to have a common abstraction for how state is
    registered in the state backend and how snapshots operate on such state
    (`StateSnapshotRestore`, `RegisteredStateMetaInfoBase`). With this, the new
    state integrates more or less seamlessly with the existing snapshot logic.
    The notable exception is the current lack of integration between the RocksDB
    state backend and heap-based priority queue state. This combination can
    still use the old snapshot code through a temporary path (see
    `AbstractStreamOperator.snapshotState(...)`) without causing any regression.
    As a result, after this PR Flink supports asynchronous snapshots for heap kv /
    heap queue, rocks kv / rocks queue (full and incremental), and rocks kv /
    heap queue (full only), and still uses synchronous snapshots for rocks kv /
    heap queue (incremental only).
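    To make the resulting support matrix easier to scan, here is a small,
    purely illustrative Java sketch that encodes the combinations listed above.
    The class, enums, and method (`SnapshotModeMatrix`, `Store`,
    `CheckpointType`, `modeFor`) are hypothetical and not part of Flink.
    Combinations the description does not mention, such as heap kv with a
    RocksDB queue, are rejected rather than guessed.

```java
import java.util.Objects;

/**
 * Hypothetical sketch, not Flink code: encodes which snapshot mode the PR
 * description lists for each combination of keyed-state store, timer
 * (priority queue) store, and checkpoint type.
 */
public class SnapshotModeMatrix {

    /** Where a kind of state lives. */
    enum Store { HEAP, ROCKSDB }

    enum CheckpointType { FULL, INCREMENTAL }

    enum SnapshotMode { ASYNCHRONOUS, SYNCHRONOUS }

    static SnapshotMode modeFor(Store keyedState, Store timers, CheckpointType type) {
        Objects.requireNonNull(keyedState, "keyedState");
        Objects.requireNonNull(timers, "timers");
        Objects.requireNonNull(type, "type");

        if (keyedState == Store.HEAP && timers == Store.ROCKSDB) {
            // Not among the combinations listed in the PR description.
            throw new IllegalArgumentException("heap keyed state with RocksDB timers is not covered");
        }
        if (keyedState == Store.ROCKSDB && timers == Store.HEAP && type == CheckpointType.INCREMENTAL) {
            // Heap timers are not yet integrated with incremental RocksDB snapshots,
            // so they still take the old synchronous path in
            // AbstractStreamOperator.snapshotState(...).
            return SnapshotMode.SYNCHRONOUS;
        }
        // heap kv / heap queue, rocks kv / rocks queue (full and incremental),
        // rocks kv / heap queue (full only). The PR does not distinguish full and
        // incremental checkpoints for heap keyed state, so neither does this sketch.
        return SnapshotMode.ASYNCHRONOUS;
    }

    public static void main(String[] args) {
        for (Store kv : Store.values()) {
            for (Store queue : Store.values()) {
                for (CheckpointType type : CheckpointType.values()) {
                    String mode;
                    try {
                        mode = modeFor(kv, queue, type).name();
                    } catch (IllegalArgumentException e) {
                        mode = "n/a";
                    }
                    System.out.printf("kv=%-7s queue=%-7s %-11s -> %s%n", kv, queue, type, mode);
                }
            }
        }
    }
}
```

    Running `main` prints one line per combination; only rocks kv / heap queue
    with incremental checkpoints comes out as `SYNCHRONOUS`.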
    
    This work was created in a bit of a rush to make it into the 1.6 release
    and still has some known rough edges that we could fix during the test
    phase. Here are some of the things that already come to mind:
    
    - Integrate heap-based timers with incremental RocksDB snapshots, then
      remove some code.
    - Check proper integration with the serializer upgrade story (!!)
    - After that, we can also remove the key-partitioning of the set structure
      in `HeapPriorityQueueSet`.
    - Improve integration of the batch wrapper.
    - Improve the general state registration logic in the backends: there is
      potential to remove duplicated code and to further improve the
      integration of the queue state.
    - Improve performance of RocksDB-based timers, e.g. a byte[]-based cache,
      seeking directly to the next potential timer instead of to the key-group
      start, and bulkPoll.
    - Improve some class/interface/method names.
    - Add tests, e.g. for bulkPoll.
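    Several items above (the key-partitioned set in `HeapPriorityQueueSet`,
    bulkPoll) are easier to discuss with a concrete model. The following Java
    sketch is a hypothetical, simplified stand-in, not Flink's implementation:
    it keeps timers in a binary heap, deduplicates them with one hash set per
    key group, and assumes that bulkPoll means draining timers from the head of
    the queue while a condition holds (e.g. timestamp <= watermark). The names
    `TimerQueueSketch`, `Timer`, and `keyGroupOf` are illustrative, and the
    key-group assignment via `hashCode()` modulo the number of key groups is a
    simplification.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical heap-based timer queue with set semantics; illustrative only. */
public class TimerQueueSketch {

    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer t = (Timer) o;
            return timestamp == t.timestamp && key.equals(t.key);
        }

        @Override public int hashCode() { return Objects.hash(key, timestamp); }

        @Override public String toString() { return key + "@" + timestamp; }
    }

    // Min-heap ordered by timestamp: the head is always the next timer to fire.
    private final PriorityQueue<Timer> heap =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    // One dedup set per key group (the key-partitioning mentioned in the list).
    private final List<Set<Timer>> dedupByKeyGroup = new ArrayList<>();
    private final int numKeyGroups;

    TimerQueueSketch(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
        for (int i = 0; i < numKeyGroups; i++) {
            dedupByKeyGroup.add(new HashSet<>());
        }
    }

    // Simplified; Flink additionally murmur-hashes key.hashCode() before the modulo.
    int keyGroupOf(Timer t) { return Math.floorMod(t.key.hashCode(), numKeyGroups); }

    /** Adds the timer unless an equal one is already queued; returns true if added. */
    boolean add(Timer t) {
        if (!dedupByKeyGroup.get(keyGroupOf(t)).add(t)) return false;
        heap.add(t);
        return true;
    }

    /** Removes and returns all head timers that satisfy the predicate, in order. */
    List<Timer> bulkPoll(Predicate<Timer> canConsume) {
        List<Timer> result = new ArrayList<>();
        while (!heap.isEmpty() && canConsume.test(heap.peek())) {
            Timer t = heap.poll();
            dedupByKeyGroup.get(keyGroupOf(t)).remove(t);
            result.add(t);
        }
        return result;
    }

    int size() { return heap.size(); }

    public static void main(String[] args) {
        TimerQueueSketch q = new TimerQueueSketch(4);
        q.add(new Timer("a", 10));
        q.add(new Timer("b", 5));
        q.add(new Timer("a", 10)); // duplicate, ignored
        q.add(new Timer("c", 20));
        long watermark = 12;
        System.out.println(q.bulkPoll(t -> t.timestamp <= watermark)); // [b@5, a@10]
        System.out.println(q.size()); // 1
    }
}
```

    The per-key-group sets are the structure the list proposes to remove later;
    collapsing them into a single set would not change the `add`/`bulkPoll`
    behavior shown here.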
    
    ## Verifying this change
    
    This change is already covered by existing tests, but I plan to add more
    eventually. You can activate RocksDB-based timers by using the RocksDB
    backend and setting `RocksDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE` to `ROCKS`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (yes)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (JavaDocs only for now)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink pq-snapshot-integration

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6333.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6333
    
----
commit 1bb8f70700deacc49a4d4ac7900425c10272959d
Author: Stefan Richter <s.richter@...>
Date:   2018-06-13T09:56:16Z

    [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

commit fc20df8268decab6d9890d617787a4084284b2f0
Author: Stefan Richter <s.richter@...>
Date:   2018-07-13T23:19:30Z

    Optimization for relaxed bulk polls

commit 4db1bea90fd6881555172fe3d22ee928e97894a7
Author: Stefan Richter <s.richter@...>
Date:   2018-07-14T06:34:16Z

    Renaming of some classes/interfaces

----

