pnowojski commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r487038327
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ########## @@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) { assert Thread.holdsLock(buffers); - if (insertAsHead) { - checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + - "checkpoints"); + if (!insertAsHead) { + buffers.add(bufferConsumer); + return; + } + checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + + "checkpoints"); - // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later. - for (BufferConsumer buffer : buffers) { - try (BufferConsumer bc = buffer.copy()) { - if (bc.isBuffer()) { + final int pos = buffers.getNumPriorityElements(); + buffers.addPriorityElement(bufferConsumer); + + boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer); + if (unalignedCheckpoint) { + final Iterator<BufferConsumer> iterator = buffers.iterator(); + Iterators.advance(iterator, pos + 1); + while (iterator.hasNext()) { + BufferConsumer buffer = iterator.next(); + + if (buffer.isBuffer()) { + try (BufferConsumer bc = buffer.copy()) { inflightBufferSnapshot.add(bc.build()); } } } + } + } - buffers.addFirst(bufferConsumer); - } else { - buffers.add(bufferConsumer); + private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) { + boolean unalignedCheckpoint; + try (BufferConsumer bc = bufferConsumer.copy()) { + Buffer buffer = bc.build(); + try { + final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); + unalignedCheckpoint = event instanceof CheckpointBarrier; + } catch (IOException e) { + throw new IllegalStateException("Should always be able to deserialize in-memory event", e); + } finally { + 
buffer.recycleBuffer(); + } } + return unalignedCheckpoint; Review comment: this method is effectively just `isCheckpointBarrier`, and it doesn't seem to care whether it's aligned or not, right? Besides, do we really need to deserialise the event? Previously we were snapshotting in-flight data every time we inserted a buffer at the head. I think that was just as inelegant, but simpler. I guess this is currently dead code, but that would change if we ever wanted priority cancellation markers? If that's the sole motivation, I would revisit this problem in the future. Who knows if we will need this with the checkpoint abort RPC. And if we do, there is also another option: inserting a priority UC barrier could go through a separate method that would return the overtaken in-flight data: ``` Collection<Buffer> insertAsHeadAndGetInFlightData(checkpointBarrier) ``` which would also eliminate the currently existing assumption/hack that `requestInflightBufferSnapshot` always has to be called immediately after inserting at the head. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ########## @@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) { assert Thread.holdsLock(buffers); - if (insertAsHead) { - checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + - "checkpoints"); + if (!insertAsHead) { + buffers.add(bufferConsumer); + return; + } + checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + + "checkpoints"); - // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.
- for (BufferConsumer buffer : buffers) { - try (BufferConsumer bc = buffer.copy()) { - if (bc.isBuffer()) { + final int pos = buffers.getNumPriorityElements(); Review comment: nit: `pos` -> `numberOfPriorityEvents`? It would make the following line: ``` Iterators.advance(iterator, pos + 1); ``` more readable. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ########## @@ -124,23 +127,22 @@ public boolean isAvailable() { } /** - * Check whether this reader is available or not (internal use, in sync with - * {@link #isAvailable()}, but slightly faster). + * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line. * - * <p>Returns true only if the next buffer is an event or the reader has both available + * <p>Returns the next data type only if the next buffer is an event or the reader has both available * credits and buffers. * * @param bufferAndBacklog * current buffer and backlog including information about the next buffer + * @return the next data type if the next buffer can be pulled immediately or null */ - private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) { // BEWARE: this must be in sync with #isAvailable()! Review comment: It's now hard to say what keeping them in sync means, as they return very different things. On the other hand, this comment is valuable, so it would be nice to keep it in some form?
########## File path: flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ########## @@ -109,14 +113,14 @@ private void setupInputChannels() { bufferBuilder.finish(); // Call getCurrentBuffer to ensure size is set - return Optional.of(new BufferAndAvailability(bufferConsumer.build(), moreAvailable, 0)); + return Optional.of(new BufferAndAvailability(bufferConsumer.build(), nextType, 0)); } else if (input != null && input.isEvent()) { AbstractEvent event = input.getEvent(); if (event instanceof EndOfPartitionEvent) { inputChannels[channelIndex].setReleased(); } - return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event, false), moreAvailable, 0)); + return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event, false), nextType, 0)); Review comment: nit: whitespace? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java ########## @@ -34,11 +30,8 @@ void notifyDataAvailable(); /** - * Allows the listener to react to a priority event before it is added to the outgoing buffer queue. - * - * @return true if the event has been fully processed and should not be added to the buffer queue. + * Called when the first priority event is added to the head of the buffer queue. */ - default boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException { - return false; + default void notifyPriorityEvent() { Review comment: could you elaborate a bit more in the commit message on what has been simplified, why, and what the benefits are?
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ########## @@ -779,34 +820,56 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) { })); } - private void queueChannel(InputChannel channel) { - int availableChannels; + private void queueChannel(InputChannel channel, boolean priority) { + CompletableFuture<?> toNotifyPriority = null; Review comment: optional nit: extract the `toNotify` and `toNotifyPriority` pair into some simple inner class: ``` public static class DataNotification { @Nullable CompletableFuture<?> toNotifyPriority = null; @Nullable CompletableFuture<?> toNotify = null; // two setters setXYZ(...); void complete() { if (toNotifyPriority != null) { toNotifyPriority.complete(null); } if (toNotify != null) { toNotify.complete(null); } } } ``` and re-use it in `UnionInputGate` as well? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ########## @@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) { assert Thread.holdsLock(buffers); - if (insertAsHead) { - checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + - "checkpoints"); + if (!insertAsHead) { + buffers.add(bufferConsumer); + return; + } + checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + + "checkpoints"); - // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later. Review comment: nit: maybe it's worth keeping this comment?
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nullable; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.Objects; + +/** + * A deque-like data structure that supports prioritization of elements, such they will be polled before any + * non-priority elements. + * + * <p>{@implNote The current implementation deliberately does not implement the respective interface to minimize the maintenance + * effort. Furthermore, it's optimized for handling non-priority elements, such that all operations for adding priority + * elements are much slower than the non-priority counter-parts.} + * + * <p>Note that all element tests are performed by identity. + * + * @param <T> the element type. 
+ */ +@Internal +public final class PrioritizedDeque<T> implements Iterable<T> { + private final Deque<T> deque = new ArrayDeque<>(); + private int numPriorityElements; + + /** + * Adds a priority element to this deque, such that it will be polled after all existing priority elements but + * before any non-priority element. + * + * @param element the element to add + */ + public void addPriorityElement(T element) { + // priority elements are rather rare and short-lived, so most of there are none + if (numPriorityElements == 0) { + deque.addFirst(element); + } else if (numPriorityElements == deque.size()) { + // no non-priority elements + deque.add(element); + } else { + // remove all priority elements + final ArrayDeque<T> priorPriority = new ArrayDeque<>(numPriorityElements); + for (int index = 0; index < numPriorityElements; index++) { + priorPriority.addFirst(deque.poll()); + } + deque.addFirst(element); + // readd them before the newly added element + for (final T priorityEvent : priorPriority) { + deque.addFirst(priorityEvent); + } + } + numPriorityElements++; + } + + /** + * Adds a non-priority element to this deque, which will be polled last. + * + * @param element the element to add + */ + public void add(T element) { + deque.add(element); + } + + /** + * Convenience method for adding an element with optional priority and prior removal. + * + * @param element the element to add + * @param priority flag indicating if it's a priority or non-priority element + * @param alreadyContained flag that hints that the element is already in this deque, potentially as non-priority element. + */ + public void add(T element, boolean priority, boolean alreadyContained) { Review comment: is this method being used? 
I think it isn't, at least not in this commit. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ########## @@ -779,34 +820,56 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) { })); } - private void queueChannel(InputChannel channel) { - int availableChannels; + private void queueChannel(InputChannel channel, boolean priority) { + CompletableFuture<?> toNotifyPriority = null; CompletableFuture<?> toNotify = null; synchronized (inputChannelsWithData) { - if (enqueuedInputChannelsWithData.get(channel.getChannelIndex())) { + // do not enqueue if the channel is currently polled because priority event could have been polled already + // let #waitAndGetNextData re-enqueue the channel correctly instead + if (priority && selectedChannel == channel) { return; } - availableChannels = inputChannelsWithData.size(); - inputChannelsWithData.add(channel); - enqueuedInputChannelsWithData.set(channel.getChannelIndex()); + if (!queueChannelUnsafe(channel, priority)) { + return; + } - if (availableChannels == 0) { + if (priority && inputChannelsWithData.getNumPriorityElements() == 1) { + toNotifyPriority = priorityAvailabilityHelper.getUnavailableToResetAvailable(); + } + if (inputChannelsWithData.size() == 1) { inputChannelsWithData.notifyAll(); toNotify = availabilityHelper.getUnavailableToResetAvailable(); } } + if (toNotifyPriority != null) { + toNotifyPriority.complete(null); + } if (toNotify != null) { toNotify.complete(null); } } + private boolean queueChannelUnsafe(InputChannel channel, boolean priority) { Review comment: could you add Javadoc describing what it returns? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java ########## @@ -261,8 +260,7 @@ void registerSourceReader(ReaderInfo readerInfo) { * @param subtaskId the subtask id of the source reader.
*/ void unregisterSourceReader(int subtaskId) { - Preconditions.checkNotNull(registeredReaders.remove(subtaskId), String.format( - "Failed to unregister source reader of id %s because it is not registered.", subtaskId)); + registeredReaders.remove(subtaskId); Review comment: Is this a fix for an existing bug on the master branch? Or are you changing the behaviour/contract of this `unregisterSourceReader` call in some later commit? Would it be worthwhile/easy to add a test? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ########## @@ -124,23 +127,22 @@ public boolean isAvailable() { } /** - * Check whether this reader is available or not (internal use, in sync with - * {@link #isAvailable()}, but slightly faster). + * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line. * - * <p>Returns true only if the next buffer is an event or the reader has both available + * <p>Returns the next data type only if the next buffer is an event or the reader has both available * credits and buffers. * * @param bufferAndBacklog * current buffer and backlog including information about the next buffer + * @return the next data type if the next buffer can be pulled immediately or null */ - private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) { Review comment: nit: you've broken a comment reference (L125): ``` #isAvailable(BufferAndBacklog) ``` — maybe replace it with a Javadoc `{@link}`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ########## @@ -290,6 +294,7 @@ public int unsynchronizedGetNumberOfQueuedBuffers() { /** * Parses the buffer as an event and returns the {@link CheckpointBarrier} if the event is indeed a barrier or * returns null in all other cases.
+ * @return Review comment: good to know that it returns something :) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ########## @@ -124,23 +127,22 @@ public boolean isAvailable() { } /** - * Check whether this reader is available or not (internal use, in sync with - * {@link #isAvailable()}, but slightly faster). + * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line. * - * <p>Returns true only if the next buffer is an event or the reader has both available + * <p>Returns the next data type only if the next buffer is an event or the reader has both available * credits and buffers. * * @param bufferAndBacklog * current buffer and backlog including information about the next buffer + * @return the next data type if the next buffer can be pulled immediately or null Review comment: `or {@link DataType.NONE}` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ########## @@ -89,6 +92,14 @@ */ private final int[] inputGateChannelIndexOffsets; + /** + * The channel from which is currently polled, which allows interleaving of + * {@link #queueInputGate(IndexedInputGate, boolean)} and {@link #pollNext()} (FLINK-12510 (Deadlock when reading from InputGates)). + */ + @GuardedBy("inputGatesWithData") + @Nullable + private IndexedInputGate currentInputGate; + Review comment: I haven't fully understood this part yet. Maybe let's sync later off-line? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ########## @@ -186,34 +203,47 @@ public boolean isFinished() { private Optional<InputWithData<IndexedInputGate, BufferOrEvent>> waitAndGetNextData(boolean blocking) throws IOException, InterruptedException { while (true) { - Optional<IndexedInputGate> inputGate = getInputGate(blocking); - if (!inputGate.isPresent()) { + Optional<IndexedInputGate> inputGateOpt = getInputGate(blocking); + if (!inputGateOpt.isPresent()) { return Optional.empty(); } + final IndexedInputGate inputGate = inputGateOpt.get(); // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. // Do not poll the gate under inputGatesWithData lock, since this can trigger notifications // that could deadlock because of wrong locks taking order. - Optional<BufferOrEvent> bufferOrEvent = inputGate.get().pollNext(); + Optional<BufferOrEvent> nextOpt = inputGate.pollNext(); + if (!nextOpt.isPresent()) { + inputGate.getAvailableFuture().thenRun(() -> queueInputGate(inputGate, false)); + continue; + } + final BufferOrEvent bufferOrEvent = nextOpt.get(); Review comment: nit: how about extracting everything below into a separate method, so that it becomes: ``` return Optional.of(processBufferOrEvent(nextOpt.get())); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org