Github user makeyang commented on a diff in the pull request: https://github.com/apache/flink/pull/5908#discussion_r187297365 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times * * @param context context that provides information and means required for taking a snapshot */ - public void snapshotState(StateSnapshotContext context) throws Exception { + public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception { if (getKeyedStateBackend() != null) { KeyedStateCheckpointOutputStream out; - + OperatorStateCheckpointOutputStream metaOut; try { out = context.getRawKeyedOperatorStateOutput(); } catch (Exception exception) { throw new Exception("Could not open raw keyed operator state stream for " + getOperatorName() + '.', exception); } - try { - KeyGroupsList allKeyGroups = out.getKeyGroupList(); - for (int keyGroupIdx : allKeyGroups) { - out.startNewKeyGroup(keyGroupIdx); - - timeServiceManager.snapshotStateForKeyGroup( - new DataOutputViewStreamWrapper(out), keyGroupIdx); - } + metaOut = context.getRawKeyedOperatorStateMetaOutput(); } catch (Exception exception) { - throw new Exception("Could not write timer service of " + getOperatorName() + - " to checkpoint state stream.", exception); - } finally { - try { - out.close(); - } catch (Exception closeException) { - LOG.warn("Could not close raw keyed operator state stream for {}. 
This " + - "might have prevented deleting some state data.", getOperatorName(), closeException); - } + throw new Exception("Could not open raw operator state stream for " + + getOperatorName() + '.', exception); } + final Tuple4<Integer, Map<String, HeapInternalTimerService>, Integer, TreeSet<Integer>> ret = timeServiceManager.startOneSnapshotState(); + final int currentSnapshotVersion = ret.f0; + final Map<String, HeapInternalTimerService> timerServices = ret.f1; + final Integer stateTableVersion = ret.f2; + final TreeSet<Integer> snapshotVersions = ret.f3; + LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString()); + Callable<Boolean> snapshotTimerCallable = new Callable() { + @Override + public Boolean call() { + try { + KeyGroupsList allKeyGroups = out.getKeyGroupList(); + metaOut.startNewPartition(); + DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut); + metaWrapper.writeInt(stateTableVersion); + if (snapshotVersions.size() > 0) { + metaWrapper.writeInt(snapshotVersions.size()); + for (Integer i : snapshotVersions) { + metaWrapper.writeInt(i); + } + } + else { + metaWrapper.writeInt(0); + } + int keyGroupCount = allKeyGroups.getNumberOfKeyGroups(); + metaWrapper.writeInt(keyGroupCount); + for (int keyGroupIdx : allKeyGroups) { + out.startNewKeyGroup(keyGroupIdx); + metaWrapper.writeInt(keyGroupIdx); + InternalTimerServiceSerializationProxy serializationProxy = + new InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx, + currentSnapshotVersion, timeServiceManager, metaWrapper); + + serializationProxy.write(new DataOutputViewStreamWrapper(out)); + + } + LOG.info("return Tuple4 and snapshotVersions:" + snapshotVersions.toString()); + return true; + } catch (Exception exception) { + LOG.error("Could not write timer service of " + getOperatorName() + + " to checkpoint state stream.", exception); + return false; + } finally { + 
timeServiceManager.stopOneSnapshotState(currentSnapshotVersion); + StateSnapshotContextSynchronousImpl snapshotContext = (StateSnapshotContextSynchronousImpl) context; + try { + snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture()); + } catch (IOException e) { + LOG.warn("setKeyedStateRawFuture in callable excpetion", e); + return false; --- End diff -- How about changing the callable's return type from Boolean to a Tuple that also carries the Throwable when an exception occurs? At the moment the exception is only logged and the caller just receives false, so the cause of the failed snapshot is lost.
---