zhijiangW commented on a change in pull request #11515: [FLINK-16744][task] implement channel state persistence for unaligned checkpoints URL: https://github.com/apache/flink/pull/11515#discussion_r402417315
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java ########## @@ -46,60 +78,64 @@ /** * Initiate write of channel state for the given checkpoint id. */ - void start(long checkpointId); + void start(long checkpointId, CheckpointOptions checkpointOptions); /** * Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}. - * Must be called after {@link #start(long)} and before {@link #finish(long)}. + * Must be called after {@link #start} (long)} and before {@link #finishInput(long)}. + * Buffers are recycled after they are written. * @param startSeqNum sequence number of the 1st passed buffer. * It is intended to use for incremental snapshots. * If no data is passed it is ignored. - * @param data zero or more buffers ordered by their sequence numbers + * @param data zero or more <b>data</b> buffers ordered by their sequence numbers + * @throws IllegalArgumentException if one or more passed buffers {@link Buffer#isBuffer() isn't a buffer} * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN */ - void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data); + void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) throws IllegalArgumentException; /** * Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}. - * Must be called after {@link #start(long)} and before {@link #finish(long)}. + * Must be called after {@link #start} and before {@link #finishOutput(long)}. + * Buffers are recycled after they are written. * @param startSeqNum sequence number of the 1st passed buffer. * It is intended to use for incremental snapshots. * If no data is passed it is ignored. 
- * @param data zero or more buffers ordered by their sequence numbers + * @param data zero or more <b>data</b> buffers ordered by their sequence numbers + * @throws IllegalArgumentException if one or more passed buffers {@link Buffer#isBuffer() isn't a buffer} * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN */ - void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data); + void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) throws IllegalArgumentException; /** * Finalize write of channel state data for the given checkpoint id. * Must be called after {@link #start(long)} and all of the input data of the given checkpoint added. * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained - * using {@link #getWriteCompletionFuture} + * using {@link #getWriteResult} */ void finishInput(long checkpointId); /** * Finalize write of channel state data for the given checkpoint id. * Must be called after {@link #start(long)} and all of the output data of the given checkpoint added. * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained - * using {@link #getWriteCompletionFuture} + * using {@link #getWriteResult} */ void finishOutput(long checkpointId); /** - * Must be called after {@link #start(long)}. + * Must be called after {@link #start}. */ - Future<Collection<StateObject>> getWriteCompletionFuture(long checkpointId); + ChannelStateWriteResult getWriteResult(long checkpointId); @Override - void close() throws Exception; + void close(); Review comment: remove this method directly? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services