GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6276

    [FLINK-9486] Introduce TimerState in keyed state backend

    ## What is the purpose of the change
    
    This PR integrates `InternalTimerQueue` with the keyed state backends (Heap
    and RocksDB), so that the RocksDB-based version can be used in actual jobs
    for the first time.
    
    We introduce the interface `KeyGroupPartitionedPriorityQueue` as a simple
    adapter to the existing snapshotting code. It can probably be removed in a
    follow-up PR, once the queues are fully integrated with the backend's
    snapshotting.
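To make the adapter idea concrete, here is a minimal Java sketch, assuming a queue that keeps one heap per key-group so that snapshotting can still write elements out key-group by key-group while `poll()` returns the global minimum. The class and method names (`KeyGroupPartitionedQueueSketch`, `elementsOfKeyGroup`) are hypothetical and do not reproduce the interface added in this PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a key-group partitioned priority queue; names are illustrative,
// not Flink's actual KeyGroupPartitionedPriorityQueue API.
class KeyGroupPartitionedQueueSketch<T> {
    // One independent heap per key-group, indexed by key-group id.
    private final List<PriorityQueue<T>> partitions = new ArrayList<>();
    private final Comparator<T> comparator;

    KeyGroupPartitionedQueueSketch(int numKeyGroups, Comparator<T> comparator) {
        this.comparator = comparator;
        for (int i = 0; i < numKeyGroups; i++) {
            partitions.add(new PriorityQueue<>(comparator));
        }
    }

    // The caller supplies the element's key-group (assumed already computed from its key).
    void add(T element, int keyGroup) {
        partitions.get(keyGroup).add(element);
    }

    // Removes and returns the globally smallest element, or null if every partition is empty.
    // Linear scan over partition heads for simplicity; a heap of heads would avoid it.
    T poll() {
        PriorityQueue<T> best = null;
        for (PriorityQueue<T> partition : partitions) {
            T head = partition.peek();
            if (head != null && (best == null || comparator.compare(head, best.peek()) < 0)) {
                best = partition;
            }
        }
        return best == null ? null : best.poll();
    }

    // Copy of a single key-group's elements (unordered), e.g. for writing them
    // into the existing per-key-group snapshot format.
    List<T> elementsOfKeyGroup(int keyGroup) {
        return new ArrayList<>(partitions.get(keyGroup));
    }

    public static void main(String[] args) {
        KeyGroupPartitionedQueueSketch<Long> queue =
                new KeyGroupPartitionedQueueSketch<>(3, Comparator.<Long>naturalOrder());
        queue.add(30L, 0);
        queue.add(10L, 2);
        queue.add(5L, 2);
        System.out.println(queue.elementsOfKeyGroup(2).size()); // 2
        System.out.println(queue.poll()); // 5
    }
}
```

Keeping the per-key-group split inside the queue is what lets it plug into snapshotting code that already iterates over key-groups.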
    
    The PR also addresses an issue with the `TreeOrderedCache`, which requires a
    "proper" `Comparator`, i.e. one that is consistent with `Object#equals`
    (implemented in `TieBreakingPriorityComparator`). We introduce
    `PriorityComparator` to put more emphasis on this difference.
    `TieBreakingPriorityComparator` is likely to go away once we come up with
    improved caching that is not simply based on a tree.
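The comparator problem can be reproduced with plain `java.util.TreeSet`, which treats two elements as duplicates whenever the comparator returns 0, regardless of `equals`. In this sketch the `Timer` class and both comparators are hypothetical stand-ins for `PriorityComparator` and `TieBreakingPriorityComparator`, not their actual code.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.TreeSet;

class TieBreakingComparatorSketch {

    // Hypothetical timer: ordered by timestamp, but its identity also depends on the key.
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        // Equality covers both fields, so (100, "a") and (100, "b") are different timers.
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }
    }

    // Priority-only ordering: distinct timers with the same timestamp compare as 0,
    // so this comparator is NOT consistent with equals.
    static final Comparator<Timer> PRIORITY_ONLY =
            Comparator.comparingLong((Timer t) -> t.timestamp);

    // Tie-breaking ordering: falls back to the key, so it only returns 0 for equal timers.
    static final Comparator<Timer> TIE_BREAKING =
            PRIORITY_ONLY.thenComparing(Comparator.comparing((Timer t) -> t.key));

    // Inserts three distinct timers and reports how many the TreeSet actually keeps.
    static int countDistinct(Comparator<Timer> comparator) {
        TreeSet<Timer> set = new TreeSet<>(comparator);
        set.add(new Timer(100L, "a"));
        set.add(new Timer(100L, "b")); // same timestamp, different key
        set.add(new Timer(200L, "a"));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(countDistinct(PRIORITY_ONLY)); // 2: the timer for key "b" is dropped
        System.out.println(countDistinct(TIE_BREAKING));  // 3
    }
}
```

Breaking ties on a secondary field keeps a tree-based cache from silently merging distinct elements that share the same priority.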
    
    We introduce `PriorityQueueSetFactory` to the keyed state backends; this is
    where the queues are built. The current RocksDB implementation keeps the
    queues in an additional RocksDB instance until they are fully integrated
    with the backend's snapshotting, because otherwise incremental snapshots
    break.
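A minimal sketch of the factory idea, assuming the backend hands out priority queues by state name and each backend supplies its own implementation. `PriorityQueueFactorySketch` and `HeapPriorityQueueFactorySketch` are hypothetical; under the same assumption, a RocksDB variant would return a queue backed by the separate RocksDB instance instead of an on-heap `java.util.PriorityQueue`.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical factory interface: the backend decides how a named priority queue is stored.
interface PriorityQueueFactorySketch {
    <T> Queue<T> create(String stateName, Comparator<T> comparator);
}

// Hypothetical heap implementation: every named queue is an on-heap java.util.PriorityQueue.
class HeapPriorityQueueFactorySketch implements PriorityQueueFactorySketch {
    // Queues registered so far, keyed by state name.
    private final Map<String, Queue<?>> queuesByName = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <T> Queue<T> create(String stateName, Comparator<T> comparator) {
        // Asking twice for the same name returns the same queue; the comparator
        // passed on later calls is ignored (an assumption of this sketch).
        return (Queue<T>) queuesByName.computeIfAbsent(
                stateName, name -> new PriorityQueue<T>(comparator));
    }

    public static void main(String[] args) {
        PriorityQueueFactorySketch factory = new HeapPriorityQueueFactorySketch();
        Queue<Long> timers = factory.create("timers", Comparator.<Long>naturalOrder());
        timers.add(42L);
        timers.add(7L);
        System.out.println(factory.<Long>create("timers", Comparator.<Long>naturalOrder()).peek()); // 7
    }
}
```

Routing queue creation through the backend keeps operator code independent of where the queue lives.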
    
    A configuration parameter is introduced to choose the queue implementation
    for RocksDB; the default is still the heap variant for now.
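One way such a switch with a heap default could look, as a sketch only: the enum, the `fromConfig` helper, and the configuration key below are hypothetical and are not the actual option introduced by this PR.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical choice of where priority queue (timer) state is stored.
enum QueueStateTypeSketch {
    HEAP,
    ROCKSDB;

    // Hypothetical configuration key, not the real Flink option name.
    static final String CONFIG_KEY = "example.priority-queue-state-type";

    // Reads the option from a plain key/value config and falls back to HEAP when it is absent.
    // Unknown values make valueOf throw IllegalArgumentException.
    static QueueStateTypeSketch fromConfig(Map<String, String> config) {
        String value = config.get(CONFIG_KEY);
        if (value == null) {
            return HEAP;
        }
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(fromConfig(config)); // HEAP
        config.put(CONFIG_KEY, "rocksdb");
        System.out.println(fromConfig(config)); // ROCKSDB
    }
}
```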
    
    Finally, we introduce an additional method for bulk polling in the 
`InternalTimerQueue` interface that opens up future optimizations.
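A bulk poll can be sketched as draining the head of the queue while a predicate holds, for example firing all timers up to the current watermark. The class and the `bulkPoll(Predicate, Consumer)` signature are assumptions for illustration, not the method added to `InternalTimerQueue`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical queue with a bulk-poll operation; not Flink's actual InternalTimerQueue.
class BulkPollQueueSketch<T extends Comparable<T>> {
    private final PriorityQueue<T> queue = new PriorityQueue<>();

    void add(T element) {
        queue.add(element);
    }

    int size() {
        return queue.size();
    }

    // Removes elements from the head, in priority order, while canConsume accepts them,
    // and hands each removed element to consumer. A single call covering many elements
    // could let a store-backed implementation batch its work instead of polling one by one.
    void bulkPoll(Predicate<T> canConsume, Consumer<T> consumer) {
        T head;
        while ((head = queue.peek()) != null && canConsume.test(head)) {
            consumer.accept(queue.poll());
        }
    }

    public static void main(String[] args) {
        BulkPollQueueSketch<Long> timers = new BulkPollQueueSketch<>();
        timers.add(5L);
        timers.add(1L);
        timers.add(9L);
        timers.add(3L);
        List<Long> fired = new ArrayList<>();
        long watermark = 5L; // hypothetical event-time watermark
        timers.bulkPoll(t -> t <= watermark, fired::add);
        System.out.println(fired + " remaining=" + timers.size()); // [1, 3, 5] remaining=1
    }
}
```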
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as
    `AbstractEventTimeWindowCheckpointingITCase`, but the RocksDB-based queues
    currently have to be activated explicitly via
    `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes, if activated)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink integrateSetStateWithBackends

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6276
    
----
commit 0d8743e52a658876425b6cef03fef3fef2d09deb
Author: Stefan Richter <s.richter@...>
Date:   2018-07-04T11:43:49Z

    Remove read options from RocksDBOrderedSetStore

commit 84b1b36357322cf23d50396cbfa0725db95797ea
Author: Stefan Richter <s.richter@...>
Date:   2018-07-04T11:51:14Z

    Introduce (temporary?/visible for testing) KeyGroupPartitionedPriorityQueue 
interface to work with the existing snapshotting

commit 35e02705f6740854ae18a92b5a6dfbafd3201a8f
Author: Stefan Richter <s.richter@...>
Date:   2018-07-04T16:07:54Z

    Basic integration with backends / make Rocks timers work

commit 1294ac356162430cf9de86980de1d4a0154f33b8
Author: Stefan Richter <s.richter@...>
Date:   2018-07-05T16:46:34Z

    Introduce PriorityComparator and tie breaking variant as adapter to 
collections that require a comparator.
    
    This is required because the tree set that we use in the cache expects that 
Comparators are aligned with Object#equals

commit bfd3a12e77348a79c91656d80a7a67ece9825103
Author: Stefan Richter <s.richter@...>
Date:   2018-07-05T19:35:08Z

    Iterator directly from cache if no store-only elements.

commit fbf26f1f2efbe1e2029d09d297808e26e08b87d8
Author: Stefan Richter <s.richter@...>
Date:   2018-07-06T08:22:49Z

    Use a dedicated RocksDB instance for priority queue state. We can revert
    this once priority queue state is properly integrated with the
    snapshotting. Until then, we must isolate the priority queue state in
    a separate db or else incremental checkpoints will break.

commit 75cb05ab21e07eaed25e1cac048919f7f313b3f6
Author: Stefan Richter <s.richter@...>
Date:   2018-07-06T13:55:02Z

    Configuration part

commit 7a86e268072ec4ad9d9fae2fa8e852b66d4424a8
Author: Stefan Richter <s.richter@...>
Date:   2018-07-06T14:48:53Z

    Introduce bulk poll method in queue to open up future optimizations

----

