Myasuka commented on a change in pull request #11491: 
[FLINK-16513][checkpointing] Unaligned checkpoints: checkpoint metadata
URL: https://github.com/apache/flink/pull/11491#discussion_r397583510
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
 ##########
 @@ -105,41 +126,51 @@ public void setOperatorStateRawFuture(
                this.operatorStateRawFuture = operatorStateRawFuture;
        }
 
-       public void cancel() throws Exception {
-               Exception exception = null;
+       @Nonnull
+       public RunnableFuture<SnapshotResult<InputChannelStateHandle>> 
getInputChannelStateFuture() {
+               return inputChannelStateFuture;
+       }
 
-               try {
-                       
StateUtil.discardStateFuture(getKeyedStateManagedFuture());
-               } catch (Exception e) {
-                       exception = new Exception("Could not properly cancel 
managed keyed state future.", e);
-               }
+       public void setInputChannelStateFuture(@Nonnull 
RunnableFuture<SnapshotResult<InputChannelStateHandle>> 
inputChannelStateFuture) {
+               this.inputChannelStateFuture = inputChannelStateFuture;
+       }
 
-               try {
-                       
StateUtil.discardStateFuture(getOperatorStateManagedFuture());
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(
-                               new Exception("Could not properly cancel 
managed operator state future.", e),
-                               exception);
-               }
+       @Nonnull
+       public RunnableFuture<SnapshotResult<ResultSubpartitionStateHandle>> 
getResultSubpartitionStateFuture() {
+               return resultSubpartitionStateFuture;
+       }
 
-               try {
-                       StateUtil.discardStateFuture(getKeyedStateRawFuture());
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(
-                               new Exception("Could not properly cancel raw 
keyed state future.", e),
-                               exception);
-               }
+       public void setResultSubpartitionStateFuture(@Nonnull 
RunnableFuture<SnapshotResult<ResultSubpartitionStateHandle>> 
resultSubpartitionStateFuture) {
+               this.resultSubpartitionStateFuture = 
resultSubpartitionStateFuture;
+       }
 
-               try {
-                       
StateUtil.discardStateFuture(getOperatorStateRawFuture());
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(
-                               new Exception("Could not properly cancel raw 
operator state future.", e),
-                               exception);
+       public void cancel() throws Exception {
+               List<Tuple2<RunnableFuture<? extends StateObject>, String>> 
pairs = new ArrayList<>();
+               pairs.add(new Tuple2<>(getKeyedStateManagedFuture(), "managed 
keyed"));
+               pairs.add(new Tuple2<>(getKeyedStateRawFuture(), "managed 
operator"));
+               pairs.add(new Tuple2<>(getOperatorStateManagedFuture(), "raw 
keyed"));
+               pairs.add(new Tuple2<>(getOperatorStateRawFuture(), "raw 
operator"));
+               pairs.add(new Tuple2<>(getInputChannelStateFuture(), "input 
channel"));
+               pairs.add(new Tuple2<>(getResultSubpartitionStateFuture(), 
"result subpartition"));
+               try (Closer closer = Closer.create()) {
+                       for (Tuple2<RunnableFuture<? extends StateObject>, 
String> pair : pairs) {
+                               closer.register(() -> {
+                                       try {
+                                               discardStateFuture(pair.f0);
+                                       } catch (Exception e) {
+                                               throw new 
RuntimeException(String.format("Could not properly cancel %s state future", 
pair.f1), e);
+                                       }
+                               });
+                       }
                }
+       }
 
-               if (exception != null) {
-                       throw exception;
-               }
+       public Future<?>[] getAllFutures() {
 
 Review comment:
   Why doesn't `getAllFutures()` also return `inputChannelStateFuture` and `resultSubpartitionStateFuture`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to