AHeise commented on a change in pull request #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint URL: https://github.com/apache/flink/pull/11507#discussion_r406327420
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java ########## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; +import javax.annotation.concurrent.ThreadSafe; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link CheckpointBarrierUnaligner} is used for triggering checkpoint while reading the first barrier + * and keeping track of the number of received barriers and consumed barriers. + */ +@Internal +@NotThreadSafe +public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { + + private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierUnaligner.class); + + private final String taskName; + + /** + * Tag the state of which input channel has read the barrier. If one channel has read the barrier by task, + * the respective in-flight input buffers should be empty when triggering unaligned checkpoint. + */ + private final boolean[] barrierConsumedChannels; + + private int numBarrierConsumed; + + /** + * Contains the offsets of the channel indices for each gate when flattening the channels of all gates. + * + * <p>For example, consider 3 gates with 4 channels, {@code gateChannelOffsets = [0, 4, 8]}. + */ + private final int[] gateChannelOffsets; + + private final InputChannelInfo[] channelInfos; + + /** + * The checkpoint id to guarantee that we would trigger only one checkpoint when reading the same barrier from + * different channels. + * + * <p>Note: this checkpoint is valid in respect to <b>consumed</b> barriers in contrast to + * {@link ThreadSafeUnaligner#currentReceivedCheckpointId}. + */ + private long currentConsumedCheckpointId = -1L; + + /** Encapsulates state that is shared between netty threads and task thread. */ + private final ThreadSafeUnaligner threadSafeUnaligner; + + CheckpointBarrierUnaligner( + int[] numberOfInputChannelsPerGate, + ChannelStateWriter channelStateWriter, + String taskName, + AbstractInvokable toNotifyOnCheckpoint) { + super(toNotifyOnCheckpoint); + + this.taskName = taskName; + + final int numGates = numberOfInputChannelsPerGate.length; + + gateChannelOffsets = new int[numGates]; + for (int index = 1; index < numGates; index++) { + gateChannelOffsets[index] = gateChannelOffsets[index - 1] + numberOfInputChannelsPerGate[index - 1]; + } + + final int totalNumChannels = gateChannelOffsets[numGates - 1] + numberOfInputChannelsPerGate[numGates - 1]; + barrierConsumedChannels = new boolean[totalNumChannels]; + Arrays.fill(barrierConsumedChannels, true); + + channelInfos = IntStream.range(0, numGates) + .mapToObj(gateIndex -> IntStream.range(0, numberOfInputChannelsPerGate[gateIndex]) + .mapToObj(channelIndex -> new InputChannelInfo(gateIndex, channelIndex))) + .flatMap(Function.identity()) + .toArray(InputChannelInfo[]::new); + + threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(channelStateWriter), this); + } + + @Override + public void releaseBlocksAndResetBarriers() { + // make sure no additional data is persisted + Arrays.fill(barrierConsumedChannels, true); + // the next barrier that comes must assume it is the first + numBarrierConsumed = 0; + threadSafeUnaligner.releaseReceivedBarriers(); + } + + /** + * For unaligned checkpoint, it never blocks processing from the task aspect. + * + * <p>For PoC, we do not consider the possibility that the unaligned checkpoint would + * not perform due to the max configured unaligned checkpoint size. + */ + @Override + public boolean isBlocked(int channelIndex) { + return false; + } + + /** + * We still need to trigger checkpoint while reading the first barrier from one channel, because this might happen + * earlier than the previous async trigger via mailbox by netty thread. And the {@link AbstractInvokable} has the + * deduplication logic to guarantee trigger checkpoint only once finally. + * + * <p>Note this is also suitable for the trigger case of local input channel. + */ + @Override + public boolean processBarrier( + CheckpointBarrier receivedBarrier, + int channelIndex, + long bufferedBytes) { + long barrierId = receivedBarrier.getId(); + if (currentConsumedCheckpointId < barrierId) { + currentConsumedCheckpointId = barrierId; + numBarrierConsumed = 0; + Arrays.fill(barrierConsumedChannels, false); + } + if (currentConsumedCheckpointId == barrierId) { + barrierConsumedChannels[channelIndex] = true; + numBarrierConsumed++; + } + // processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty + // buffer queues + // to avoid replicating any logic, we simply call notifyBarrierReceived here as well + threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfos[channelIndex]); + return false; + } + + @Override + public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { + final long barrierId = cancelBarrier.getCheckpointId(); + + if (numBarrierConsumed > 0) { + // this is only true if some alignment is in progress and nothing was canceled + + if (barrierId == currentConsumedCheckpointId) { + // cancel this alignment + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, barrierId); + } + + releaseBlocksAndResetBarriers(); + notifyAbortOnCancellationBarrier(barrierId); + } + else if (barrierId > currentConsumedCheckpointId) { + // we canceled the next which also cancels the current + LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " + + "Skipping current checkpoint.", + taskName, + barrierId, + currentConsumedCheckpointId); + + // this stops the current alignment + releaseBlocksAndResetBarriers(); Review comment: I added the checkpoint id as a parameter to make sure they talk about the same thing. The current method name is `resetReceivedBarriers`. I think it's now quite precise but feel free to add if it's still confusing. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services