[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462200#comment-16462200
 ] 

ASF GitHub Bot commented on FLINK-9182:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185734334
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
    @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures 
snapshotState(long checkpointId, long times
         *
         * @param context context that provides information and means required 
for taking a snapshot
         */
    -   public void snapshotState(StateSnapshotContext context) throws 
Exception {
    +   public void snapshotState(StateSnapshotContext context, 
OperatorSnapshotFutures snapshotInProgress) throws Exception {
                if (getKeyedStateBackend() != null) {
                        KeyedStateCheckpointOutputStream out;
    -
    +                   OperatorStateCheckpointOutputStream metaOut;
                        try {
                                out = context.getRawKeyedOperatorStateOutput();
                        } catch (Exception exception) {
                                throw new Exception("Could not open raw keyed 
operator state stream for " +
                                        getOperatorName() + '.', exception);
                        }
    -
                        try {
    -                           KeyGroupsList allKeyGroups = 
out.getKeyGroupList();
    -                           for (int keyGroupIdx : allKeyGroups) {
    -                                   out.startNewKeyGroup(keyGroupIdx);
    -
    -                                   
timeServiceManager.snapshotStateForKeyGroup(
    -                                           new 
DataOutputViewStreamWrapper(out), keyGroupIdx);
    -                           }
    +                           metaOut = 
context.getRawKeyedOperatorStateMetaOutput();
                        } catch (Exception exception) {
    -                           throw new Exception("Could not write timer 
service of " + getOperatorName() +
    -                                   " to checkpoint state stream.", 
exception);
    -                   } finally {
    -                           try {
    -                                   out.close();
    -                           } catch (Exception closeException) {
    -                                   LOG.warn("Could not close raw keyed 
operator state stream for {}. This " +
    -                                           "might have prevented deleting 
some state data.", getOperatorName(), closeException);
    -                           }
    +                           throw new Exception("Could not open raw 
operator state stream for " +
    +                                   getOperatorName() + '.', exception);
                        }
    +                   final Tuple4<Integer, Map<String, 
HeapInternalTimerService>, Integer, TreeSet<Integer>> ret = 
timeServiceManager.startOneSnapshotState();
    +                   final int currentSnapshotVersion = ret.f0;
    +                   final Map<String, HeapInternalTimerService> 
timerServices = ret.f1;
    +                   final Integer stateTableVersion = ret.f2;
    +                   final TreeSet<Integer> snapshotVersions = ret.f3;
    +                   LOG.info("snapshotVersions after calling 
startOneSnapshotState:" + snapshotVersions.toString());
    +                   Callable<Boolean> snapshotTimerCallable = new 
Callable() {
    +                           @Override
    +                           public Boolean call() {
    +                                   try {
    +                                           KeyGroupsList allKeyGroups = 
out.getKeyGroupList();
    +                                           metaOut.startNewPartition();
    +                                           DataOutputViewStreamWrapper 
metaWrapper = new DataOutputViewStreamWrapper(metaOut);
    +                                           
metaWrapper.writeInt(stateTableVersion);
    +                                           if (snapshotVersions.size() > 
0) {
    +                                                   
metaWrapper.writeInt(snapshotVersions.size());
    +                                                   for (Integer i : 
snapshotVersions) {
    +                                                           
metaWrapper.writeInt(i);
    +                                                   }
    +                                           }
    +                                           else {
    +                                                   metaWrapper.writeInt(0);
    +                                           }
    +                                           int keyGroupCount = 
allKeyGroups.getNumberOfKeyGroups();
    +                                           
metaWrapper.writeInt(keyGroupCount);
    +                                           for (int keyGroupIdx : 
allKeyGroups) {
    +                                                   
out.startNewKeyGroup(keyGroupIdx);
    +                                                   
metaWrapper.writeInt(keyGroupIdx);
    +                                                   
InternalTimerServiceSerializationProxy serializationProxy =
    +                                                           new 
InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
    +                                                                   
currentSnapshotVersion, timeServiceManager, metaWrapper);
    +
    +                                                   
serializationProxy.write(new DataOutputViewStreamWrapper(out));
    +
    +                                           }
    +                                           LOG.info("return Tuple4 and 
snapshotVersions:" + snapshotVersions.toString());
    +                                           return true;
    +                                   } catch (Exception exception) {
    +                                           LOG.error("Could not write 
timer service of " + getOperatorName() +
    +                                                   " to checkpoint state 
stream.", exception);
    +                                           return false;
    +                                   } finally {
    +                                           
timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
    +                                           
StateSnapshotContextSynchronousImpl snapshotContext = 
(StateSnapshotContextSynchronousImpl) context;
    +                                           try {
    +                                                   
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    +                                           } catch (IOException e) {
    +                                                   
LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
    +                                                   return false;
    --- End diff --
    
    Putting returns inside a finally block here is a bad idea because this can 
swallow and hide `Throwables`.


> async checkpoints for timer service
> -----------------------------------
>
>                 Key: FLINK-9182
>                 URL: https://issues.apache.org/jira/browse/FLINK-9182
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: makeyang
>            Assignee: makeyang
>            Priority: Minor
>             Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
>  ## as the number of 'InternalTimer' objects grows, checkpoints become 
> slower and slower
>  # improvement design
>  ## maintain a stateTableVersion and a set of snapshotVersions in 
> InternalTimeServiceManager, which work exactly like their counterparts in 
> CopyOnWriteStateTable. In addition, add a read-write lock that protects 
> snapshotVersions and stateTableVersion.
>  ## for each InternalTimer, add 2 more properties, a create version and a 
> delete version, besides the 3 existing properties: timestamp, key and 
> namespace. Each time a timer is registered in the timer service, it is 
> created with the current stateTableVersion as its create version and a 
> delete version of -1. Each time a timer is deleted in the timer service, it 
> is marked as deleted by giving it a delete version equal to the current 
> stateTableVersion, without physically deleting it from the timer service.
>  ## each time the timers are snapshotted, InternalTimeServiceManager 
> increments its stateTableVersion and adds this stateTableVersion to 
> snapshotVersions. These 2 operations are protected by the write lock of 
> InternalTimeServiceManager. This current stateTableVersion is used as the 
> snapshot version of the snapshot.
>  ## make a shallow copy of the <String, HeapInternalTimerService> tuples
>  ## then use another thread to asynchronously snapshot everything: the key 
> serializer, the namespace serializer and the timers. A timer that is not 
> deleted (its delete version is -1) and whose create version is less than 
> the snapshot version is serialized. A timer whose delete version is not -1 
> and is greater than or equal to the snapshot version is also serialized. 
> Any other timer is not serialized by this snapshot.
>  ## when everything is serialized, remove the snapshot version from 
> snapshotVersions. This still happens in the other thread and is guarded by 
> the write lock.
>  ## last thing: physical deletion of timers. Timers are physically deleted 
> in 2 places. First, each time a timer is deleted in the timer service, it 
> is marked as deleted by giving it a delete version equal to the current 
> stateTableVersion, without physically deleting it from the timer service; 
> after this, check whether snapshotVersions is empty (which means no 
> snapshot is running) and, if so, delete the timer. Second, during the 
> snapshot's iteration over the timers: when a timer's delete version is less 
> than the minimum value in snapshotVersions, the timer has been deleted and 
> no running snapshot needs to keep it, so it can be deleted.
>  ## some more additions: processingTimeTimers and eventTimeTimers for each 
> key group used to be HashSets; they are now ConcurrentHashMaps that use 
> key + namespace + timestamp as the hash key.
>  # related mail list thread
>  ## 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
>  # github pull request
>  ## //coming soon



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to