AHeise commented on a change in pull request #13741:
URL: https://github.com/apache/flink/pull/13741#discussion_r511925770

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java ##########
@@ -35,7 +35,6 @@
     private final AlignedController alignedController;
     private final UnalignedController unalignedController;
     private CheckpointBarrierBehaviourController activeController;
-    private long lastSeenBarrierId;

Review comment:
Typo in commit msg.

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java ##########
@@ -35,7 +35,6 @@
     private final AlignedController alignedController;
     private final UnalignedController unalignedController;
     private CheckpointBarrierBehaviourController activeController;

Review comment:
Can you fix the double-spacing here as well?

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java ##########
@@ -47,29 +50,56 @@
     private final boolean isUnalignedCheckpoint;
+    private final long alignmentTimeout;

Review comment:
Could you motivate (in the commit msg) why this is a property of the checkpoint vs. a config of the task? I don't mind this solution, but I want to understand the reason. Intuitively, I'd put it into the task config, which would ultimately allow a fine-grained checkpoint configuration for each operator. Do you plan to adjust the `alignmentTimeout` when the barrier travels downstream?
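For reference while weighing "property of the checkpoint" against "task config", here is a minimal sketch of how this PR interprets the `alignmentTimeout` carried in `CheckpointOptions`. A value of 0 means start unaligned. A positive value means start aligned and allow timing out to unaligned. Savepoints, or runs with unaligned checkpoints disabled, get no timeout. The sketch assumes a sentinel `NO_ALIGNMENT_TIME_OUT = -1`, under which `isTimeoutable()` reduces to `alignmentTimeout > 0`. `AlignmentTimeoutSketch`, `SketchOptions` and the two-value `CheckpointType` are hypothetical stand-ins, not Flink's actual classes:

```java
// Hypothetical, simplified model of the alignment-timeout semantics discussed in this PR.
// CheckpointType and SketchOptions are stand-ins, not Flink's real classes.
public class AlignmentTimeoutSketch {

    enum CheckpointType { CHECKPOINT, SAVEPOINT }

    // Assumed sentinel meaning "no alignment timeout applies".
    static final long NO_ALIGNMENT_TIME_OUT = -1;

    static final class SketchOptions {
        final boolean isUnalignedCheckpoint;
        final long alignmentTimeout;

        SketchOptions(boolean isUnalignedCheckpoint, long alignmentTimeout) {
            this.isUnalignedCheckpoint = isUnalignedCheckpoint;
            this.alignmentTimeout = alignmentTimeout;
        }

        // With a negative sentinel, a separate "!= NO_ALIGNMENT_TIME_OUT" check is redundant.
        boolean isTimeoutable() {
            return alignmentTimeout > 0;
        }
    }

    // Mirrors the decision logic of CheckpointOptions.create(...) shown in the diff.
    static SketchOptions create(
            CheckpointType type, boolean unalignedCheckpointsEnabled, long alignmentTimeout) {
        boolean canBeUnaligned = type == CheckpointType.CHECKPOINT && unalignedCheckpointsEnabled;
        return new SketchOptions(
                canBeUnaligned && alignmentTimeout == 0,
                canBeUnaligned ? alignmentTimeout : NO_ALIGNMENT_TIME_OUT);
    }

    public static void main(String[] args) {
        SketchOptions immediate = create(CheckpointType.CHECKPOINT, true, 0);
        SketchOptions timeoutable = create(CheckpointType.CHECKPOINT, true, 30_000);
        SketchOptions savepoint = create(CheckpointType.SAVEPOINT, true, 30_000);
        System.out.println(immediate.isUnalignedCheckpoint + " " + immediate.isTimeoutable());
        System.out.println(timeoutable.isUnalignedCheckpoint + " " + timeoutable.isTimeoutable());
        System.out.println(savepoint.isUnalignedCheckpoint + " " + savepoint.isTimeoutable());
    }
}
```

Because the decision is made once when the options are created, every task that receives the barrier sees the same timeout. A per-task config would instead let each operator decide locally.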
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java ##########
@@ -143,4 +143,20 @@
                 TextElement.code(CheckpointingMode.EXACTLY_ONCE.toString()),
                 TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key()))
             .build());
+
+    public static final ConfigOption<Duration> ALIGNMENT_TIMEOUT =
+        ConfigOptions.key("execution.checkpointing.alignment-timeout")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(30))
+            .withDescription(Description.builder()
+                .text("Only relevant if %s is enabled.", TextElement.code(ENABLE_UNALIGNED.key()))
+                .linebreak()
+                .linebreak()
+                .text("If timeout has value equal to 0, checkpoints will always start unaligned.")

Review comment:
Quite technical compared to the other descriptions (= too precise).

> "If timeout is (set to) 0, checkpoints will always start unaligned."

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java ##########
@@ -47,29 +50,56 @@
     private final boolean isUnalignedCheckpoint;
+    private final long alignmentTimeout;
+
+    public static CheckpointOptions create(
+            CheckpointType checkpointType,
+            CheckpointStorageLocationReference locationReference,
+            boolean isExactlyOnceMode,
+            boolean unalignedCheckpointsEnabled,
+            long alignmentTimeout) {
+        boolean canBeUnaligned = checkpointType == CheckpointType.CHECKPOINT && unalignedCheckpointsEnabled;
+        return new CheckpointOptions(
+            checkpointType,
+            locationReference,
+            isExactlyOnceMode,
+            canBeUnaligned && alignmentTimeout == 0,
+            canBeUnaligned ? alignmentTimeout : NO_ALIGNMENT_TIME_OUT);
+    }
+
     @VisibleForTesting
     public CheckpointOptions(
             CheckpointType checkpointType,
             CheckpointStorageLocationReference targetLocation) {
-        this(checkpointType, targetLocation, true, false);
+        this(checkpointType, targetLocation, true, false, NO_ALIGNMENT_TIME_OUT);
     }

     public CheckpointOptions(
             CheckpointType checkpointType,
             CheckpointStorageLocationReference targetLocation,
             boolean isExactlyOnceMode,
-            boolean isUnalignedCheckpoint) {
+            boolean isUnalignedCheckpoint,
+            long alignmentTimeout) {
         this.checkpointType = checkNotNull(checkpointType);
         this.targetLocation = checkNotNull(targetLocation);
         this.isExactlyOnceMode = isExactlyOnceMode;
         this.isUnalignedCheckpoint = isUnalignedCheckpoint;
+        this.alignmentTimeout = alignmentTimeout;
     }

     public boolean needsAlignment() {
         return isExactlyOnceMode() && (getCheckpointType().isSavepoint() || !isUnalignedCheckpoint());
     }

+    public long getAlignmentTimeout() {
+        return alignmentTimeout;
+    }
+
+    public boolean isTimeoutable() {
+        return alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT;

Review comment:
Have `NO_ALIGNMENT_TIME_OUT = -1`?

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java ##########
@@ -143,4 +143,20 @@
                 TextElement.code(CheckpointingMode.EXACTLY_ONCE.toString()),
                 TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key()))
             .build());
+
+    public static final ConfigOption<Duration> ALIGNMENT_TIMEOUT =
+        ConfigOptions.key("execution.checkpointing.alignment-timeout")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(30))
+            .withDescription(Description.builder()
+                .text("Only relevant if %s is enabled.", TextElement.code(ENABLE_UNALIGNED.key()))
+                .linebreak()
+                .linebreak()
+                .text("If timeout has value equal to 0, checkpoints will always start unaligned.")
+                .linebreak()
+                .linebreak()
+                .text("If time has value greater then 0, checkpoints will start aligned. " +

Review comment:
> "If timeout is (set to) non-zero/positive value, checkpoints will..."

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##########
@@ -773,11 +776,12 @@ private void snapshotTaskState(
             Execution[] executions,
             boolean advanceToEndOfTime) {

-        final CheckpointOptions checkpointOptions = new CheckpointOptions(
-            props.getCheckpointType(),
-            checkpointStorageLocation.getLocationReference(),
-            isExactlyOnceMode,
-            props.getCheckpointType() == CheckpointType.CHECKPOINT && unalignedCheckpointsEnabled);
+        final CheckpointOptions checkpointOptions = CheckpointOptions.create(
+                props.getCheckpointType(),

Review comment:
nit: double indent. Also in various other places in this commit. Ignore if intended.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##########
@@ -437,19 +440,20 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx

             wasEmpty = receivedBuffers.isEmpty();

-            if (buffer.getDataType().hasPriority()) {
-                receivedBuffers.addPriorityElement(new SequenceBuffer(buffer, sequenceNumber));
-                if (channelStatePersister.checkForBarrier(buffer)) {
-                    // checkpoint was not yet started by task thread,
-                    // so remember the numbers of buffers to spill for the time when it will be started
-                    numBuffersOvertaken = receivedBuffers.getNumUnprioritizedElements();
-                }
-                firstPriorityEvent = receivedBuffers.getNumPriorityElements() == 1;
+            SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber);
+            DataType dataType = buffer.getDataType();
+            if (dataType.hasPriority()) {
+                checkPriorityXorAnnouncement(buffer);
+                firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
             } else {
-                receivedBuffers.add(new SequenceBuffer(buffer, sequenceNumber));
+                receivedBuffers.add(sequenceBuffer);
                 channelStatePersister.maybePersist(buffer);
-            }
+                if (dataType.requiresAnnouncement()) {
+                    checkPriorityXorAnnouncement(buffer);
+                    firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
+                }

Review comment:
```
if (dataType.hasPriority() || dataType.requiresAnnouncement()) {
    firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
}
if (!dataType.hasPriority()) {
    receivedBuffers.add(sequenceBuffer);
    channelStatePersister.maybePersist(buffer);
}
```

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierAnnouncement.java ##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * {@link CheckpointBarrierAnnouncement} is announcing presence or receiving of a {@link CheckpointBarrier}.
+ * That {@link #announcedBarrier} is identified by it's sequence number.
+ */
+public class CheckpointBarrierAnnouncement extends RuntimeEvent {

Review comment:
I was hoping that we could make the announcement mechanism a bit more independent of checkpoints. I don't see how we can generalize `DataType`, but I could imagine having a generic `AnnouncementEvent`.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##########
@@ -471,6 +475,40 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
         }
     }

+    private void checkPriorityXorAnnouncement(Buffer buffer) {

Review comment:
Instead of checking it during deserialization on every buffer, why not simply check it once in the `DataType` enum? In particular, I don't see the need to ever announce priority events.

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java ##########
@@ -147,6 +148,13 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
             }
         }

+    @Override
+    public void processBarrierAnnouncement(
+            CheckpointBarrierAnnouncement barrierAnnouncement,
+            InputChannelInfo channelInfo) throws IOException {
+        // Ignore for now.

Review comment:
Add a TODO instead?

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java ##########
@@ -292,21 +313,37 @@ public boolean isEvent() {
             return isEvent;
         }

+        public boolean isBlockingUpstream() {

Review comment:
Why did you move this method?

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java ##########
@@ -247,41 +247,62 @@
         /**
          * {@link #NONE} indicates that there is no buffer.
          */
-        NONE(false, false, false, false),
+        NONE(false, false, false, false, false),

         /**
          * {@link #DATA_BUFFER} indicates that this buffer represents a non-event data buffer.
          */
-        DATA_BUFFER(true, false, false, false),
+        DATA_BUFFER(true, false, false, false, false),

         /**
          * {@link #EVENT_BUFFER} indicates that this buffer represents serialized data of an event.
          * Note that this type can be further divided into more fine-grained event types
          * like {@link #ALIGNED_CHECKPOINT_BARRIER} and etc.
          */
-        EVENT_BUFFER(false, true, false, false),
+        EVENT_BUFFER(false, true, false, false, false),

         /**
          * Same as EVENT_BUFFER, but the event has been prioritized (e.g. it skipped buffers).
          */
-        PRIORITIZED_EVENT_BUFFER(false, true, false, true),
+        PRIORITIZED_EVENT_BUFFER(false, true, false, true, false),

         /**
          * {@link #ALIGNED_CHECKPOINT_BARRIER} indicates that this buffer represents a
          * serialized checkpoint barrier of aligned exactly-once checkpoint mode.
          */
-        ALIGNED_CHECKPOINT_BARRIER(false, true, true, false);
+        ALIGNED_CHECKPOINT_BARRIER(false, true, true, false, false),
+
+        /**
+         * {@link #TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER} indicates that this buffer represents a
+         * serialized checkpoint barrier of aligned exactly-once checkpoint mode, that can be time-out'ed
+         * to an unaligned checkpoint barrier.
+         */
+        TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER(false, true, true, false, true);

Review comment:
As I said before, I'm not a huge fan of these very specific `DataType`s. (Why don't we have a `DataType` just for `EndOfPartitionEvent`s as well?) I also think that `ALIGNED_CHECKPOINT_BARRIER` is on a different level than all other types. One option that I could see is to just have `ANNOUNCED_EVENT_BUFFER(false, true, true, false, true)`.
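The review suggests two things: validating the priority/announcement combination once in `DataType` rather than per buffer, and replacing the checkpoint-specific constant with a generic `ANNOUNCED_EVENT_BUFFER`. Below is a minimal sketch of what that could look like. It uses a hypothetical stand-in enum `DataTypeSketch`, not Flink's actual `Buffer.DataType`. The order of the boolean flags (isBuffer, isEvent, isBlockingUpstream, hasPriority, requiresAnnouncement) is inferred from the diff and may not match the real code.

```java
// Hypothetical sketch, not Flink's Buffer.DataType: validates the
// "priority XOR announcement" invariant once, when each constant is constructed,
// instead of on every received buffer. Flag order is inferred from the diff.
public enum DataTypeSketch {
    NONE(false, false, false, false, false),
    DATA_BUFFER(true, false, false, false, false),
    EVENT_BUFFER(false, true, false, false, false),
    PRIORITIZED_EVENT_BUFFER(false, true, false, true, false),
    ALIGNED_CHECKPOINT_BARRIER(false, true, true, false, false),
    // Generic constant suggested in the review instead of a checkpoint-specific one.
    ANNOUNCED_EVENT_BUFFER(false, true, true, false, true);

    private final boolean isBuffer;
    private final boolean isEvent;
    private final boolean isBlockingUpstream;
    private final boolean hasPriority;
    private final boolean requiresAnnouncement;

    DataTypeSketch(
            boolean isBuffer,
            boolean isEvent,
            boolean isBlockingUpstream,
            boolean hasPriority,
            boolean requiresAnnouncement) {
        // Runs once per constant during enum class initialization; an invalid
        // combination makes class initialization fail instead of a per-buffer check.
        if (hasPriority && requiresAnnouncement) {
            throw new IllegalStateException(
                    "A data type cannot both have priority and require an announcement");
        }
        this.isBuffer = isBuffer;
        this.isEvent = isEvent;
        this.isBlockingUpstream = isBlockingUpstream;
        this.hasPriority = hasPriority;
        this.requiresAnnouncement = requiresAnnouncement;
    }

    public boolean isBuffer() {
        return isBuffer;
    }

    public boolean isEvent() {
        return isEvent;
    }

    public boolean isBlockingUpstream() {
        return isBlockingUpstream;
    }

    public boolean hasPriority() {
        return hasPriority;
    }

    public boolean requiresAnnouncement() {
        return requiresAnnouncement;
    }
}
```

With the invariant enforced in the constructor, a receiver such as `RemoteInputChannel` could branch on `hasPriority()` and `requiresAnnouncement()` without a separate per-buffer `checkPriorityXorAnnouncement` call. Whether that fits the rest of the PR is left open here.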