[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462200#comment-16462200 ]
ASF GitHub Bot commented on FLINK-9182:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185734334

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
          *
          * @param context context that provides information and means required for taking a snapshot
          */
    -    public void snapshotState(StateSnapshotContext context) throws Exception {
    +    public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception {
             if (getKeyedStateBackend() != null) {
                 KeyedStateCheckpointOutputStream out;
    -
    +            OperatorStateCheckpointOutputStream metaOut;
                 try {
                     out = context.getRawKeyedOperatorStateOutput();
                 } catch (Exception exception) {
                     throw new Exception("Could not open raw keyed operator state stream for " +
                         getOperatorName() + '.', exception);
                 }
    -
                 try {
    -                KeyGroupsList allKeyGroups = out.getKeyGroupList();
    -                for (int keyGroupIdx : allKeyGroups) {
    -                    out.startNewKeyGroup(keyGroupIdx);
    -
    -                    timeServiceManager.snapshotStateForKeyGroup(
    -                        new DataOutputViewStreamWrapper(out), keyGroupIdx);
    -                }
    +                metaOut = context.getRawKeyedOperatorStateMetaOutput();
                 } catch (Exception exception) {
    -                throw new Exception("Could not write timer service of " + getOperatorName() +
    -                    " to checkpoint state stream.", exception);
    -            } finally {
    -                try {
    -                    out.close();
    -                } catch (Exception closeException) {
    -                    LOG.warn("Could not close raw keyed operator state stream for {}. This " +
    -                        "might have prevented deleting some state data.", getOperatorName(), closeException);
    -                }
    +                throw new Exception("Could not open raw operator state stream for " +
    +                    getOperatorName() + '.', exception);
                 }
    +            final Tuple4<Integer, Map<String, HeapInternalTimerService>, Integer, TreeSet<Integer>> ret = timeServiceManager.startOneSnapshotState();
    +            final int currentSnapshotVersion = ret.f0;
    +            final Map<String, HeapInternalTimerService> timerServices = ret.f1;
    +            final Integer stateTableVersion = ret.f2;
    +            final TreeSet<Integer> snapshotVersions = ret.f3;
    +            LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString());
    +            Callable<Boolean> snapshotTimerCallable = new Callable() {
    +                @Override
    +                public Boolean call() {
    +                    try {
    +                        KeyGroupsList allKeyGroups = out.getKeyGroupList();
    +                        metaOut.startNewPartition();
    +                        DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut);
    +                        metaWrapper.writeInt(stateTableVersion);
    +                        if (snapshotVersions.size() > 0) {
    +                            metaWrapper.writeInt(snapshotVersions.size());
    +                            for (Integer i : snapshotVersions) {
    +                                metaWrapper.writeInt(i);
    +                            }
    +                        }
    +                        else {
    +                            metaWrapper.writeInt(0);
    +                        }
    +                        int keyGroupCount = allKeyGroups.getNumberOfKeyGroups();
    +                        metaWrapper.writeInt(keyGroupCount);
    +                        for (int keyGroupIdx : allKeyGroups) {
    +                            out.startNewKeyGroup(keyGroupIdx);
    +                            metaWrapper.writeInt(keyGroupIdx);
    +                            InternalTimerServiceSerializationProxy serializationProxy =
    +                                new InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
    +                                    currentSnapshotVersion, timeServiceManager, metaWrapper);
    +
    +                            serializationProxy.write(new DataOutputViewStreamWrapper(out));
    +
    +                        }
    +                        LOG.info("return Tuple4 and snapshotVersions:" + snapshotVersions.toString());
    +                        return true;
    +                    } catch (Exception exception) {
    +                        LOG.error("Could not write timer service of " + getOperatorName() +
    +                            " to checkpoint state stream.", exception);
    +                        return false;
    +                    } finally {
    +                        timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
    +                        StateSnapshotContextSynchronousImpl snapshotContext = (StateSnapshotContextSynchronousImpl) context;
    +                        try {
    +                            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    +                        } catch (IOException e) {
    +                            LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
    +                            return false;
    --- End diff --

    Putting `return` statements inside a `finally` block here is a bad idea, because they can swallow and hide `Throwable`s.


> async checkpoints for timer service
> -----------------------------------
>
>                 Key: FLINK-9182
>                 URL: https://issues.apache.org/jira/browse/FLINK-9182
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: makeyang
>            Assignee: makeyang
>            Priority: Minor
>             Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
> ## as the number of 'InternalTimer' objects grows, checkpoints become slower and slower
> # improvement design
> ## maintain a stateTableVersion and a snapshotVersions set in InternalTimeServiceManager, exactly as CopyOnWriteStateTable does. One more thing: a read-write lock, which protects snapshotVersions and stateTableVersion.
> ## for each InternalTimer, add two more properties, a create version and a delete version, besides the three existing properties: timestamp, key and namespace. Each time a timer is registered in the timer service, it is created with stateTableVersion as its create version and -1 as its delete version. Each time a timer is deleted in the timer service, it is marked as deleted by giving it a delete version equal to stateTableVersion, without physically removing it from the timer service.
> ## each time timers are snapshotted, InternalTimeServiceManager increments its stateTableVersion and adds this stateTableVersion to snapshotVersions.
> These two operations are protected by the write lock of InternalTimeServiceManager. The current stateTableVersion is taken as the snapshot version of this snapshot.
> ## shallow-copy the <String, HeapInternalTimerService> tuples
> ## then use another thread to asynchronously snapshot everything: key serializer, namespace serializer and timers. Timers that are not deleted (delete version is -1) and whose create version is less than the snapshot version are serialized. Timers whose delete version is not -1 and is greater than or equal to the snapshot version are serialized as well. All other timers are not serialized by this snapshot.
> ## when everything is serialized, remove the snapshot version from snapshotVersions. This still happens in the other thread and is guarded by the write lock.
> ## last thing: physical deletion of timers, which happens in two places. First, when a timer is deleted in the timer service, it is marked as deleted by giving it a delete version equal to stateTableVersion, without physically removing it; after that, if snapshotVersions is empty (meaning no snapshot is running), the timer is physically deleted. Second, while a snapshot iterates over the timers, a timer whose delete version is less than the minimum value in snapshotVersions is physically deleted, because it has been deleted and no running snapshot needs to keep it.
> ## some more additions: processingTimeTimers and eventTimeTimers for each key group used to be HashSets; they are now ConcurrentHashMaps with key+namespace+timestamp as the hash key.
> # related mailing list thread
> ## http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
> # github pull request
> ## //coming soon



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
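The reviewer's objection to `return` inside `finally` can be reproduced with a small, self-contained Java sketch. The method bodies only loosely mirror the shape of the reviewed `call()` (a `catch (Exception)` plus a `return false` reachable from inside `finally`); the class and method names are hypothetical and not part of Flink. In `returnInFinally`, an `AssertionError` thrown by the work is not caught by `catch (Exception)`, but the `return false` in the `finally` block replaces it with an ordinary result, so the caller never learns about the failure. `resultOutsideFinally` keeps the cleanup in `finally` but decides the result after it, so the `Error` still propagates.

```java
/**
 * Hypothetical demo (not Flink code): a return inside finally discards the
 * Throwable that the try block was propagating.
 */
public class FinallyReturnDemo {

    // Anti-pattern: a return reachable from finally, similar in shape to the reviewed diff.
    static boolean returnInFinally(Runnable work, Runnable cleanup) {
        try {
            work.run();
            return true;
        } catch (Exception e) {
            return false;
        } finally {
            try {
                cleanup.run();
            } catch (RuntimeException cleanupFailure) {
                // Overrides both "return true" and any Error still propagating from the try block.
                return false;
            }
        }
    }

    // Alternative: cleanup stays in finally, the result is decided outside it.
    static boolean resultOutsideFinally(Runnable work, Runnable cleanup) {
        boolean success = false;
        try {
            work.run();
            success = true;
        } catch (Exception e) {
            // Expected failures are reported through the return value.
        } finally {
            cleanup.run(); // always runs, but contains no return that could mask a Throwable
        }
        return success;
    }

    public static void main(String[] args) {
        Runnable failingWork = () -> { throw new AssertionError("timer snapshot failed"); };
        Runnable failingCleanup = () -> { throw new IllegalStateException("could not set future"); };

        // The AssertionError disappears: the caller only sees false.
        System.out.println(returnInFinally(failingWork, failingCleanup)); // prints "false"

        try {
            resultOutsideFinally(failingWork, () -> { });
        } catch (AssertionError expected) {
            System.out.println("propagated: " + expected.getMessage()); // prints "propagated: timer snapshot failed"
        }
    }
}
```

Note that even a successful run is turned into `false` by `returnInFinally` when the cleanup fails, which is the same way the reviewed `finally` block can override the `return true` of the `try` block.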
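The versioning scheme from the improvement design can be sketched in a single Java class. This is a minimal sketch under stated assumptions, not the code from the pull request: `VersionedTimerStore`, `VersionedTimer`, `startSnapshot`, `collect` and `stopSnapshot` are hypothetical names, a timer is identified by a plain string built from key + namespace + timestamp, and serialization is replaced by returning the visible timers as a list. One rule goes beyond the issue text: the create-version check is applied to deleted timers too, so a timer that is both created and deleted after a snapshot started is not included. The minimum running snapshot version is read once per `collect` pass, which is conservative because that minimum can only grow while the pass runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of the versioned-timer idea from FLINK-9182; not Flink code. */
public class VersionedTimerStore {

    static final class VersionedTimer {
        final String id;                 // stands in for key + namespace + timestamp
        final int createVersion;
        volatile int deleteVersion = -1; // -1 means "not deleted"

        VersionedTimer(String id, int createVersion) {
            this.id = id;
            this.createVersion = createVersion;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>(); // guarded by lock
    private int stateTableVersion = 0;                                 // guarded by lock
    final Map<String, VersionedTimer> timers = new ConcurrentHashMap<>();

    static String timerId(String key, String namespace, long timestamp) {
        return key + "|" + namespace + "|" + timestamp;
    }

    void register(String key, String namespace, long timestamp) {
        lock.readLock().lock();
        try {
            String id = timerId(key, namespace, timestamp);
            // Simplification: re-registering an identical timer replaces an entry marked as deleted.
            timers.put(id, new VersionedTimer(id, stateTableVersion));
        } finally {
            lock.readLock().unlock();
        }
    }

    void delete(String key, String namespace, long timestamp) {
        lock.readLock().lock();
        try {
            String id = timerId(key, namespace, timestamp);
            VersionedTimer timer = timers.get(id);
            if (timer == null || timer.deleteVersion != -1) {
                return;
            }
            timer.deleteVersion = stateTableVersion; // logical delete
            if (snapshotVersions.isEmpty()) {
                timers.remove(id);                   // no running snapshot: delete physically
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Synchronous part: bump the version and record it as a running snapshot. */
    int startSnapshot() {
        lock.writeLock().lock();
        try {
            snapshotVersions.add(++stateTableVersion);
            return stateTableVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Asynchronous part: must run between startSnapshot() and stopSnapshot(snapshotVersion). */
    List<VersionedTimer> collect(int snapshotVersion) {
        int minRunning;
        lock.readLock().lock();
        try {
            minRunning = snapshotVersions.first(); // contains snapshotVersion, so never empty here
        } finally {
            lock.readLock().unlock();
        }
        List<VersionedTimer> visible = new ArrayList<>();
        for (VersionedTimer t : timers.values()) {
            int deleted = t.deleteVersion;
            if (deleted != -1 && deleted < minRunning) {
                timers.remove(t.id, t);              // deleted, and no running snapshot needs it
            } else if (t.createVersion < snapshotVersion
                    && (deleted == -1 || deleted >= snapshotVersion)) {
                visible.add(t);                      // existed when this snapshot started
            }
        }
        return visible;
    }

    void stopSnapshot(int snapshotVersion) {
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(snapshotVersion);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

For example, a timer registered after `startSnapshot()` returns gets a create version equal to the snapshot version and is skipped by `collect`, while a timer deleted during the asynchronous phase gets a delete version greater than or equal to the snapshot version and is still collected. Logically deleted timers are purged either immediately (no snapshot running) or during a later `collect` pass once every running snapshot is newer than their delete version.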