
ASF GitHub Bot commented on FLINK-9182:

Github user StefanRRichter commented on the issue:

    Maybe let me add some more. First, about introducing a separate new state 
handle. Our long term plan is actually to integrate timers more closely with 
the backends, so that we can also have a timer service in RocksDB. In general, 
I would aim to reduce the number of handles (e.g. timers could then also be 
considered managed keyed state) instead of adding more. Second, I would probably 
suggest a simpler model for the async snapshots. You dropped the idea of making 
flat copies, but I wonder if this was premature. I can see that calling 
`set.toArray(...)` per key-group could turn out a bit slow because it 
potentially has to iterate and flatten linked entries. However, with async 
snapshots, we could get rid of the key-group partitioning of sets, and instead 
do a flat copy of the internal array of the priority queue. This would 
translate to just a single memcopy call internally, which is very efficient. In 
the async part, we can still partition the timers by key-group in a similar way 
as the copy-on-write state table does. This would avoid slowing down the event 
processing path (in fact improving it by unifying the sets) and also keep the 
approach very straightforward and less invasive.
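
The flat-copy idea above can be sketched roughly as follows. This is only an illustration under stated assumptions: `TimerSnapshotSketch`, `Timer`, `flatCopy`, `keyGroupOf` and `partitionByKeyGroup` are hypothetical names, not Flink classes or methods, timers are assumed to be immutable so that a shallow copy is enough, and the key-group assignment is simplified to a plain hash modulo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; none of these names are Flink APIs. */
final class TimerSnapshotSketch {

    /** Simplified stand-in for a timer, assumed immutable. */
    static final class Timer {
        final long timestamp;
        final Object key;

        Timer(long timestamp, Object key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    /** Synchronous part: one flat (shallow) copy of the queue's backing array. */
    static Timer[] flatCopy(Timer[] queueArray, int size) {
        return Arrays.copyOf(queueArray, size);
    }

    /** Assumed key-group assignment: hash code modulo the number of key-groups. */
    static int keyGroupOf(Object key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /** Asynchronous part: bucket the copied timers by key-group before writing them. */
    static List<List<Timer>> partitionByKeyGroup(Timer[] snapshot, int numKeyGroups) {
        List<List<Timer>> groups = new ArrayList<>(numKeyGroups);
        for (int i = 0; i < numKeyGroups; i++) {
            groups.add(new ArrayList<>());
        }
        for (Timer timer : snapshot) {
            groups.get(keyGroupOf(timer.key, numKeyGroups)).add(timer);
        }
        return groups;
    }
}
```

In this sketch only `flatCopy` would run on the event processing thread; `partitionByKeyGroup` works purely on the copied array and could therefore run in the checkpointing thread.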

> async checkpoints for timer service
> -----------------------------------
>                 Key: FLINK-9182
>                 URL: https://issues.apache.org/jira/browse/FLINK-9182
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: makeyang
>            Assignee: makeyang
>            Priority: Minor
>             Fix For: 1.4.3, 1.5.1
> # problem description:
>  ## as the number of 'InternalTimer' objects grows, checkpoints become 
> slower and slower
>  # improvement design
>  ## maintain a stateTableVersion and snapshotVersions in 
> InternalTimeServiceManager, both playing exactly the same role as they do in 
> CopyOnWriteStateTable. one more thing: a read-write lock, which is used to 
> protect snapshotVersions and stateTableVersion
>  ## for each InternalTimer, add 2 more properties, create version and delete 
> version, beside the 3 existing properties: timestamp, key and namespace. each 
> time a timer is registered in the timer service, it is created with the 
> current stateTableVersion as its create version, while its delete version is 
> -1. each time a timer is deleted in the timer service, it is only marked as 
> deleted by setting its delete version to stateTableVersion, without 
> physically deleting it from the timer service.
>  ## each time a snapshot of the timers is taken, InternalTimeServiceManager 
> increases its stateTableVersion and adds this stateTableVersion to 
> snapshotVersions. these 2 operations are protected by the write lock of 
> InternalTimeServiceManager. the resulting stateTableVersion is taken as the 
> snapshot version of this snapshot
>  ## shallow copy the <String,HeapInternalTimerService> tuples
>  ## then use another thread to asynchronously snapshot everything: 
> keySerializer, namespaceSerializer and timers. a timer that is not deleted 
> (delete version is -1) and whose create version is less than the snapshot 
> version is serialized. a timer whose delete version is not -1 and is greater 
> than or equal to the snapshot version is serialized as well. any other timer 
> is not serialized by this snapshot.
>  ## when everything is serialized, remove the snapshot version from 
> snapshotVersions. this still happens in the other thread and is guarded by 
> the write lock.
>  ## last thing: physical deletion of timers. there are 2 places where timers 
> are physically deleted. first, each time a timer is deleted in the timer 
> service, it is marked as deleted by setting its delete version to 
> stateTableVersion, without physically deleting it. after this, check whether 
> the size of snapshotVersions is 0 (which means there is no running snapshot) 
> and, if so, physically delete the timer. the other place is the snapshot's 
> iteration over the timers: when a timer's delete version is less than the 
> min value of snapshotVersions, the timer is deleted and no running snapshot 
> needs to keep it.
>  ## some more additions: processingTimeTimers and eventTimeTimers for each 
> group used to be a HashSet and are now changed to a ConcurrentHashMap with 
> key+namespace+timestamp as the hash key.
>  # related mailing list thread
>  ## 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
>  # github pull request
>  ## //coming soon
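
The version checks in the quoted issue description can be illustrated with a small sketch. It is a literal transcription of the two serialization rules and the physical-deletion rule as stated there, not code from the pull request. `VersionedTimerSketch`, `VersionedTimer`, `includedInSnapshot` and `canPhysicallyDelete` are hypothetical names, and representing snapshotVersions as a `SortedSet<Integer>` is an assumption.

```java
import java.util.SortedSet;

/** Hypothetical sketch of the versioning rules; not taken from Flink. */
final class VersionedTimerSketch {

    /** Delete version of a timer that has not been deleted. */
    static final int NOT_DELETED = -1;

    /** Simplified timer carrying the two extra properties from the description. */
    static final class VersionedTimer {
        final long timestamp;
        final int createVersion;
        int deleteVersion = NOT_DELETED;

        VersionedTimer(long timestamp, int createVersion) {
            this.timestamp = timestamp;
            this.createVersion = createVersion;
        }
    }

    /** Whether the snapshot with the given version serializes this timer. */
    static boolean includedInSnapshot(VersionedTimer timer, int snapshotVersion) {
        if (timer.deleteVersion == NOT_DELETED) {
            // Live timer: serialized only if it was created before the snapshot.
            return timer.createVersion < snapshotVersion;
        }
        // Marked as deleted: serialized if the deletion happened at or after the snapshot.
        return timer.deleteVersion >= snapshotVersion;
    }

    /** Whether a timer marked as deleted can be physically removed. */
    static boolean canPhysicallyDelete(VersionedTimer timer, SortedSet<Integer> snapshotVersions) {
        if (timer.deleteVersion == NOT_DELETED) {
            return false;
        }
        // No running snapshot, or the deletion predates every running snapshot.
        return snapshotVersions.isEmpty() || timer.deleteVersion < snapshotVersions.first();
    }
}
```

For example, under these rules a timer created at version 3 and never deleted is part of the snapshot with version 4 but not of the snapshot with version 3.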

This message was sent by Atlassian JIRA
