zhijiangW commented on a change in pull request #11515: [FLINK-16744][task] implement channel state persistence for unaligned checkpoints URL: https://github.com/apache/flink/pull/11515#discussion_r405256556
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +interface ChannelStateWriteRequest { + long getCheckpointId(); + + void cancel(Throwable cause); + + static CheckpointInProgressRequest completeInput(long checkpointId) { + return new CheckpointInProgressRequest("completeInput", checkpointId, ChannelStateCheckpointWriter::completeInput, false); + } + + static CheckpointInProgressRequest completeOutput(long checkpointId) { + return new CheckpointInProgressRequest("completeOutput", checkpointId, ChannelStateCheckpointWriter::completeOutput, false); + } + + static ChannelStateWriteRequest 
write(long checkpointId, InputChannelInfo info, Buffer... flinkBuffers) { + return new CheckpointInProgressRequest("writeInput", checkpointId, writer -> writer.writeInput(info, flinkBuffers), recycle(flinkBuffers), false); + } + + static ChannelStateWriteRequest write(long checkpointId, ResultSubpartitionInfo info, Buffer... flinkBuffers) { + return new CheckpointInProgressRequest("writeOutput", checkpointId, writer -> writer.writeOutput(info, flinkBuffers), recycle(flinkBuffers), false); + } + + static ChannelStateWriteRequest start(long checkpointId, ChannelStateWriteResult targetResult, CheckpointStorageLocationReference locationReference) { + return new CheckpointStartRequest(checkpointId, targetResult, locationReference); + } + + static ChannelStateWriteRequest abort(long checkpointId, Throwable cause) { + return new CheckpointInProgressRequest("abort", checkpointId, writer -> writer.fail(cause), true); + } + + static Consumer<Throwable> recycle(Buffer[] flinkBuffers) { + return unused -> { + for (Buffer b : flinkBuffers) { + b.recycleBuffer(); + } + }; + } +} + +final class CheckpointStartRequest implements ChannelStateWriteRequest { + private final ChannelStateWriteResult targetResult; + private final CheckpointStorageLocationReference locationReference; + private final long checkpointId; + + CheckpointStartRequest(long checkpointId, ChannelStateWriteResult targetResult, CheckpointStorageLocationReference locationReference) { + this.checkpointId = checkpointId; + this.targetResult = targetResult; + this.locationReference = locationReference; + } + + @Override + public long getCheckpointId() { + return checkpointId; + } + + ChannelStateWriteResult getTargetResult() { + return targetResult; + } + + public CheckpointStorageLocationReference getLocationReference() { + return locationReference; + } + + @Override + public void cancel(Throwable cause) { + targetResult.fail(cause); + } + + @Override + public String toString() { + return "start " + checkpointId; + } 
+} + +final class CheckpointInProgressRequest implements ChannelStateWriteRequest { + private final ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action; + private final Consumer<Throwable> discardAction; + private final long checkpointId; + private final String name; + private final boolean ignoreMissingWriter; + private final AtomicReference<CheckpointInProgressRequestState> state = new AtomicReference<>(CheckpointInProgressRequestState.NEW); + + CheckpointInProgressRequest(String name, long checkpointId, ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action, boolean ignoreMissingWriter) { + this(name, checkpointId, action, unused -> { + }, ignoreMissingWriter); + } + + CheckpointInProgressRequest(String name, long checkpointId, ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action, Consumer<Throwable> discardAction, boolean ignoreMissingWriter) { + this.checkpointId = checkpointId; + this.action = action; + this.discardAction = discardAction; + this.name = name; + this.ignoreMissingWriter = ignoreMissingWriter; + } + + @Override + public long getCheckpointId() { + return checkpointId; + } + + @Override + public void cancel(Throwable cause) { + if (state.compareAndSet(CheckpointInProgressRequestState.NEW, CheckpointInProgressRequestState.CANCELLED)) { Review comment: I have three concerns about `cancel`: - In `CheckpointStartRequest#cancel`, `targetResult.fail(cause)` is executed. For `CheckpointInProgressRequest`, do we also need to complete the future with a failure? - Can `#cancel` only be invoked before `#execute`? If an exception is thrown while running `#execute`, do we also need to cancel? If so, the cancellation cannot take effect, because the state is already `EXECUTING` after `#execute`. - I guess `state` was introduced mainly to avoid cancelling multiple times, but I do not think `#execute` can be called more than once in practice.
If so, it may not be necessary to introduce so many state values; a single `isCancelled` flag would be enough. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services