Myasuka commented on a change in pull request #11491:
[FLINK-16513][checkpointing] Unaligned checkpoints: checkpoint metadata
URL: https://github.com/apache/flink/pull/11491#discussion_r397583510
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
##########
@@ -105,41 +126,51 @@ public void setOperatorStateRawFuture(
this.operatorStateRawFuture = operatorStateRawFuture;
}
- public void cancel() throws Exception {
- Exception exception = null;
+ @Nonnull
+ public RunnableFuture<SnapshotResult<InputChannelStateHandle>>
getInputChannelStateFuture() {
+ return inputChannelStateFuture;
+ }
- try {
-
StateUtil.discardStateFuture(getKeyedStateManagedFuture());
- } catch (Exception e) {
- exception = new Exception("Could not properly cancel
managed keyed state future.", e);
- }
+ public void setInputChannelStateFuture(@Nonnull
RunnableFuture<SnapshotResult<InputChannelStateHandle>>
inputChannelStateFuture) {
+ this.inputChannelStateFuture = inputChannelStateFuture;
+ }
- try {
-
StateUtil.discardStateFuture(getOperatorStateManagedFuture());
- } catch (Exception e) {
- exception = ExceptionUtils.firstOrSuppressed(
- new Exception("Could not properly cancel
managed operator state future.", e),
- exception);
- }
+ @Nonnull
+ public RunnableFuture<SnapshotResult<ResultSubpartitionStateHandle>>
getResultSubpartitionStateFuture() {
+ return resultSubpartitionStateFuture;
+ }
- try {
- StateUtil.discardStateFuture(getKeyedStateRawFuture());
- } catch (Exception e) {
- exception = ExceptionUtils.firstOrSuppressed(
- new Exception("Could not properly cancel raw
keyed state future.", e),
- exception);
- }
+ public void setResultSubpartitionStateFuture(@Nonnull
RunnableFuture<SnapshotResult<ResultSubpartitionStateHandle>>
resultSubpartitionStateFuture) {
+ this.resultSubpartitionStateFuture =
resultSubpartitionStateFuture;
+ }
- try {
-
StateUtil.discardStateFuture(getOperatorStateRawFuture());
- } catch (Exception e) {
- exception = ExceptionUtils.firstOrSuppressed(
- new Exception("Could not properly cancel raw
operator state future.", e),
- exception);
+ public void cancel() throws Exception {
+ List<Tuple2<RunnableFuture<? extends StateObject>, String>>
pairs = new ArrayList<>();
+ pairs.add(new Tuple2<>(getKeyedStateManagedFuture(), "managed
keyed"));
+ pairs.add(new Tuple2<>(getKeyedStateRawFuture(), "managed
operator"));
+ pairs.add(new Tuple2<>(getOperatorStateManagedFuture(), "raw
keyed"));
+ pairs.add(new Tuple2<>(getOperatorStateRawFuture(), "raw
operator"));
+ pairs.add(new Tuple2<>(getInputChannelStateFuture(), "input
channel"));
+ pairs.add(new Tuple2<>(getResultSubpartitionStateFuture(),
"result subpartition"));
+ try (Closer closer = Closer.create()) {
+ for (Tuple2<RunnableFuture<? extends StateObject>,
String> pair : pairs) {
+ closer.register(() -> {
+ try {
+ discardStateFuture(pair.f0);
+ } catch (Exception e) {
+ throw new
RuntimeException(String.format("Could not properly cancel %s state future",
pair.f1), e);
+ }
+ });
+ }
}
+ }
- if (exception != null) {
- throw exception;
- }
+ public Future<?>[] getAllFutures() {
Review comment:
Why not return `inputChannelStateFuture` and `resultSubpartitionStateFuture`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services