Github user makeyang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r187297365
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
    @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures 
snapshotState(long checkpointId, long times
         *
         * @param context context that provides information and means required 
for taking a snapshot
         */
    -   public void snapshotState(StateSnapshotContext context) throws 
Exception {
    +   public void snapshotState(StateSnapshotContext context, 
OperatorSnapshotFutures snapshotInProgress) throws Exception {
                if (getKeyedStateBackend() != null) {
                        KeyedStateCheckpointOutputStream out;
    -
    +                   OperatorStateCheckpointOutputStream metaOut;
                        try {
                                out = context.getRawKeyedOperatorStateOutput();
                        } catch (Exception exception) {
                                throw new Exception("Could not open raw keyed 
operator state stream for " +
                                        getOperatorName() + '.', exception);
                        }
    -
                        try {
    -                           KeyGroupsList allKeyGroups = 
out.getKeyGroupList();
    -                           for (int keyGroupIdx : allKeyGroups) {
    -                                   out.startNewKeyGroup(keyGroupIdx);
    -
    -                                   
timeServiceManager.snapshotStateForKeyGroup(
    -                                           new 
DataOutputViewStreamWrapper(out), keyGroupIdx);
    -                           }
    +                           metaOut = 
context.getRawKeyedOperatorStateMetaOutput();
                        } catch (Exception exception) {
    -                           throw new Exception("Could not write timer 
service of " + getOperatorName() +
    -                                   " to checkpoint state stream.", 
exception);
    -                   } finally {
    -                           try {
    -                                   out.close();
    -                           } catch (Exception closeException) {
    -                                   LOG.warn("Could not close raw keyed 
operator state stream for {}. This " +
    -                                           "might have prevented deleting 
some state data.", getOperatorName(), closeException);
    -                           }
    +                           throw new Exception("Could not open raw 
operator state stream for " +
    +                                   getOperatorName() + '.', exception);
                        }
    +                   final Tuple4<Integer, Map<String, 
HeapInternalTimerService>, Integer, TreeSet<Integer>> ret = 
timeServiceManager.startOneSnapshotState();
    +                   final int currentSnapshotVersion = ret.f0;
    +                   final Map<String, HeapInternalTimerService> 
timerServices = ret.f1;
    +                   final Integer stateTableVersion = ret.f2;
    +                   final TreeSet<Integer> snapshotVersions = ret.f3;
    +                   LOG.info("snapshotVersions after calling 
startOneSnapshotState:" + snapshotVersions.toString());
    +                   Callable<Boolean> snapshotTimerCallable = new 
Callable() {
    +                           @Override
    +                           public Boolean call() {
    +                                   try {
    +                                           KeyGroupsList allKeyGroups = 
out.getKeyGroupList();
    +                                           metaOut.startNewPartition();
    +                                           DataOutputViewStreamWrapper 
metaWrapper = new DataOutputViewStreamWrapper(metaOut);
    +                                           
metaWrapper.writeInt(stateTableVersion);
    +                                           if (snapshotVersions.size() > 
0) {
    +                                                   
metaWrapper.writeInt(snapshotVersions.size());
    +                                                   for (Integer i : 
snapshotVersions) {
    +                                                           
metaWrapper.writeInt(i);
    +                                                   }
    +                                           }
    +                                           else {
    +                                                   metaWrapper.writeInt(0);
    +                                           }
    +                                           int keyGroupCount = 
allKeyGroups.getNumberOfKeyGroups();
    +                                           
metaWrapper.writeInt(keyGroupCount);
    +                                           for (int keyGroupIdx : 
allKeyGroups) {
    +                                                   
out.startNewKeyGroup(keyGroupIdx);
    +                                                   
metaWrapper.writeInt(keyGroupIdx);
    +                                                   
InternalTimerServiceSerializationProxy serializationProxy =
    +                                                           new 
InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
    +                                                                   
currentSnapshotVersion, timeServiceManager, metaWrapper);
    +
    +                                                   
serializationProxy.write(new DataOutputViewStreamWrapper(out));
    +
    +                                           }
    +                                           LOG.info("return Tuple4 and 
snapshotVersions:" + snapshotVersions.toString());
    +                                           return true;
    +                                   } catch (Exception exception) {
    +                                           LOG.error("Could not write 
timer service of " + getOperatorName() +
    +                                                   " to checkpoint state 
stream.", exception);
    +                                           return false;
    +                                   } finally {
    +                                           
timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
    +                                           
StateSnapshotContextSynchronousImpl snapshotContext = 
(StateSnapshotContextSynchronousImpl) context;
    +                                           try {
    +                                                   
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    +                                           } catch (IOException e) {
    +                                                   
LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
    +                                                   return false;
    --- End diff --
    
    How about changing the return type from Boolean to a Tuple that contains the 
Throwable when an exception happens?


---

Reply via email to