[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874300#comment-15874300 ]
ASF GitHub Bot commented on FLINK-5544:
---------------------------------------

GitHub user shixiaogang opened a pull request:

    https://github.com/apache/flink/pull/3359

    [FLINK-5544][streaming] Add InternalTimerService implemented in RocksDB

    - Refactor the methods defined in `InternalTimerService`. Some common implementation in `HeapInternalTimerService` is now moved into `InternalTimerService`.
    - Implement `RocksDBInternalTimerService`, which stores timers in RocksDB and sorts them with an in-memory heap.
    - Implement `InternalTimerServiceTestBase` to verify the implementations of `InternalTimerService`.
    - Update `AbstractStreamOperator` to allow the use of a customized `InternalTimerService`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink flink-5544

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3359.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3359

----
commit 341fd97c47336d4f87cea997e134af68f8ef5265
Author: xiaogang.sxg <xiaogang....@alibaba-inc.com>
Date:   2017-02-20T09:55:40Z

    Add InternalTimerService implemented in RocksDB

----

> Implement Internal Timer Service in RocksDB
> -------------------------------------------
>
>                 Key: FLINK-5544
>                 URL: https://issues.apache.org/jira/browse/FLINK-5544
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Currently the only implementation of the internal timer service is
> HeapInternalTimerService, which stores all timers in memory. When the
> number of keys is very large, the timer service consumes too much memory.
> An implementation which stores timers in RocksDB seems a good way to deal
> with these cases.
> It might be a little challenging to implement a RocksDB timer service because
> the timers are accessed in different ways.
> When timers are triggered, we need to access them in timestamp order.
> But when performing checkpoints, we must have a method to obtain all
> timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of
> merge sorting. We can store timers in RocksDB with the format
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are
> stored together and are sorted.
> Then we can deploy an in-memory heap which keeps the first timer of each key
> group, so that the top of the heap is the next timer to trigger. When a key
> group's first timer changes, we can efficiently update the heap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
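
For illustration only, the merge-sort idea from the issue description can be sketched in plain Java. This is not the code from the pull request: the class `KeyGroupTimerIndex`, its nested `Timer` type, and the methods `register`, `pollNext`, and `timersOf` are hypothetical names, and a `TreeSet` per key group stands in for the sorted `KEY_GROUP#TIMER#KEY` range that RocksDB would hold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;

/** Hypothetical sketch of the merge-sort idea; not Flink's actual implementation. */
final class KeyGroupTimerIndex {

    /** One timer, ordered by timestamp, then key, then key group. */
    static final class Timer implements Comparable<Timer> {
        final int keyGroup;
        final long timestamp;
        final String key;

        Timer(int keyGroup, long timestamp, String key) {
            this.keyGroup = keyGroup;
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            int c = Long.compare(timestamp, other.timestamp);
            if (c == 0) c = key.compareTo(other.key);
            return c != 0 ? c : Integer.compare(keyGroup, other.keyGroup);
        }
    }

    // Assumption: a sorted set per key group stands in for the RocksDB key range.
    private final List<TreeSet<Timer>> groups = new ArrayList<>();

    // In-memory heap holding only the earliest timer of each non-empty key group.
    private final PriorityQueue<Timer> heads = new PriorityQueue<>();

    KeyGroupTimerIndex(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) groups.add(new TreeSet<>());
    }

    /** Stores a timer; touches the heap only if the key group's first timer changed. */
    void register(int keyGroup, long timestamp, String key) {
        TreeSet<Timer> group = groups.get(keyGroup);
        Timer oldHead = group.isEmpty() ? null : group.first();
        Timer timer = new Timer(keyGroup, timestamp, key);
        if (group.add(timer) && group.first() == timer) {
            // Linear in the number of key groups; acceptable for a sketch.
            if (oldHead != null) heads.remove(oldHead);
            heads.add(timer);
        }
    }

    /** Removes and returns the globally earliest timer, or null if none is left. */
    Timer pollNext() {
        Timer next = heads.poll();
        if (next == null) return null;
        TreeSet<Timer> group = groups.get(next.keyGroup);
        group.pollFirst(); // drops 'next', which is the first timer of its group
        if (!group.isEmpty()) heads.add(group.first());
        return next;
    }

    /** Checkpoint-style access: all timers of one key group, in sorted order. */
    List<Timer> timersOf(int keyGroup) {
        return new ArrayList<>(groups.get(keyGroup));
    }
}
```

The heap never holds more than one entry per key group, so its size is bounded by the number of key groups rather than the number of timers. In a RocksDB-backed version the per-group sets would presumably be replaced by scans over the corresponding key prefix; that part is left out here.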