AHeise commented on a change in pull request #14052: URL: https://github.com/apache/flink/pull/14052#discussion_r523995285
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -481,7 +489,7 @@ private boolean addPriorityBuffer(SequenceBuffer sequenceBuffer) throws IOExcept
             if (channelStatePersister.checkForBarrier(sequenceBuffer.buffer)) {
                 // checkpoint was not yet started by task thread,
                 // so remember the numbers of buffers to spill for the time when it will be started
-                numBuffersOvertaken = receivedBuffers.getNumUnprioritizedElements();
+                lastOvertakenSequenceNumber = sequenceBuffer.sequenceNumber;

Review comment:
You wrote in the PR description that this PR will also avoid `numBuffersOvertaken` being overwritten, but I don't see a specific change in that regard here. I was assuming that one of the earlier PRs already helped with that by fixing `checkForBarrier`.

##########
File path: flink-streaming-java/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = ERROR

Review comment:
There is some reason why it's turned off by default and it should stay this way. If you want to know the reason, I'd have to ask Chesnay or Robert though.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -506,43 +514,75 @@ public void checkpointStarted(CheckpointBarrier barrier) {
         synchronized (receivedBuffers) {
             channelStatePersister.startPersisting(
                 barrier.getId(),
-                getInflightBuffers(numBuffersOvertaken == ALL ? receivedBuffers.getNumUnprioritizedElements() : numBuffersOvertaken));
+                getInflightBuffers());
         }
     }
     public void checkpointStopped(long checkpointId) {
         synchronized (receivedBuffers) {
             channelStatePersister.stopPersisting(checkpointId);
-            numBuffersOvertaken = ALL;
+            lastOvertakenSequenceNumber = null;
+        }
+    }
+
+    @VisibleForTesting
+    List<Buffer> getInflightBuffers() {
+        synchronized (receivedBuffers) {
+            return getInflightBuffersUnsafe();
         }
     }
     /**
      * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events.
      */
-    private List<Buffer> getInflightBuffers(int numBuffers) {
+    private List<Buffer> getInflightBuffersUnsafe() {
         assert Thread.holdsLock(receivedBuffers);
-        if (numBuffers == 0) {
-            return Collections.emptyList();
-        }
-
-        final List<Buffer> inflightBuffers = new ArrayList<>(numBuffers);
+        final List<Buffer> inflightBuffers = new ArrayList<>();
         Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
         // skip all priority events (only buffers are stored anyways)
         Iterators.advance(iterator, receivedBuffers.getNumPriorityElements());
-        // spill number of overtaken buffers or all of them if barrier has not been seen yet
-        for (int pos = 0; pos < numBuffers; pos++) {
-            Buffer buffer = iterator.next().buffer;
-            if (buffer.isBuffer()) {
-                inflightBuffers.add(buffer.retainBuffer());
+        while (iterator.hasNext()) {
+            SequenceBuffer sequenceBuffer = iterator.next();
+            if (sequenceBuffer.buffer.isBuffer() && shouldBeSpilled(sequenceBuffer.sequenceNumber)) {

Review comment:
It should be possible to avoid iterating over all buffers by aborting on the first `!shouldBeSpilled`.
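
To illustrate the early abort suggested in the comment above, here is a minimal, self-contained sketch. It is not the code from the PR: `EarlyAbortSketch`, `Entry`, and `collectInflight` are hypothetical stand-ins for the non-priority part of `receivedBuffers`, and the sketch assumes that entries are ordered by sequence number and that overflow can be ignored. Under those assumptions, everything after the first entry at or past the barrier would also be rejected, so the loop can stop there.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified, hypothetical model of the non-priority part of receivedBuffers. */
class EarlyAbortSketch {

    /** Hypothetical stand-in for SequenceBuffer. */
    static final class Entry {
        final int sequenceNumber;
        final boolean isBuffer; // false for events

        Entry(int sequenceNumber, boolean isBuffer) {
            this.sequenceNumber = sequenceNumber;
            this.isBuffer = isBuffer;
        }
    }

    /**
     * Collects the sequence numbers of data buffers that precede the barrier.
     * Assumes entries are ordered by sequence number and ignores overflow.
     */
    static List<Integer> collectInflight(List<Entry> received, int barrierSequenceNumber) {
        List<Integer> inflight = new ArrayList<>();
        for (Entry entry : received) {
            if (entry.sequenceNumber >= barrierSequenceNumber) {
                // First entry that should not be spilled: all later ones follow it, so stop.
                break;
            }
            if (entry.isBuffer) {
                // Events are skipped but do not end the scan.
                inflight.add(entry.sequenceNumber);
            }
        }
        return inflight;
    }

    public static void main(String[] args) {
        List<Entry> received = Arrays.asList(
            new Entry(0, true), new Entry(1, false), new Entry(2, true),
            new Entry(4, true), new Entry(5, true));
        System.out.println(collectInflight(received, 3)); // prints [0, 2]
    }
}
```

Note that the break has to be keyed on the sequence number check alone; an event (`isBuffer == false`) in front of the barrier must be skipped without ending the scan.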
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -98,14 +100,11 @@
     private final BufferManager bufferManager;
-    /**
-     * Indicates the last overtaken sequence number by the most recent {@link CheckpointBarrier}
-     * before task thread started checkpoint, or {@code null} if {@link CheckpointBarrier} hasn't
-     * arrived yet.
-     */
     @GuardedBy("receivedBuffers")
-    @Nullable
-    private Integer lastOvertakenSequenceNumber = null;
+    private int lastBarrierSequenceNumber = NONE;

Review comment:
Ah, that's what I was missing in the first main commit. I propose to squash the 2 main commits, or at least to already use the final name/type in the first main commit, so that the two commits don't touch every piece of code twice.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -506,43 +514,75 @@ public void checkpointStarted(CheckpointBarrier barrier) {
         synchronized (receivedBuffers) {
             channelStatePersister.startPersisting(
                 barrier.getId(),
-                getInflightBuffers(numBuffersOvertaken == ALL ? receivedBuffers.getNumUnprioritizedElements() : numBuffersOvertaken));
+                getInflightBuffers());
         }
     }
     public void checkpointStopped(long checkpointId) {
         synchronized (receivedBuffers) {
             channelStatePersister.stopPersisting(checkpointId);
-            numBuffersOvertaken = ALL;
+            lastOvertakenSequenceNumber = null;
+        }
+    }
+
+    @VisibleForTesting
+    List<Buffer> getInflightBuffers() {
+        synchronized (receivedBuffers) {
+            return getInflightBuffersUnsafe();
         }
     }
     /**
      * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events.
      */
-    private List<Buffer> getInflightBuffers(int numBuffers) {
+    private List<Buffer> getInflightBuffersUnsafe() {
         assert Thread.holdsLock(receivedBuffers);
-        if (numBuffers == 0) {
-            return Collections.emptyList();
-        }
-
-        final List<Buffer> inflightBuffers = new ArrayList<>(numBuffers);
+        final List<Buffer> inflightBuffers = new ArrayList<>();
         Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
         // skip all priority events (only buffers are stored anyways)
         Iterators.advance(iterator, receivedBuffers.getNumPriorityElements());
-        // spill number of overtaken buffers or all of them if barrier has not been seen yet
-        for (int pos = 0; pos < numBuffers; pos++) {
-            Buffer buffer = iterator.next().buffer;
-            if (buffer.isBuffer()) {
-                inflightBuffers.add(buffer.retainBuffer());
+        while (iterator.hasNext()) {
+            SequenceBuffer sequenceBuffer = iterator.next();
+            if (sequenceBuffer.buffer.isBuffer() && shouldBeSpilled(sequenceBuffer.sequenceNumber)) {
+                inflightBuffers.add(sequenceBuffer.buffer.retainBuffer());
             }
         }
+        lastOvertakenSequenceNumber = null;
+
         return inflightBuffers;
     }
+    /**
+     * @return if given {@param sequenceNumber} should be spilled given {@link #lastOvertakenSequenceNumber}.
+     * We might not have yet received {@link CheckpointBarrier} and we might need to spill everything.
+     * If we have already received it, there is a bit nasty corner case of {@link SequenceBuffer#sequenceNumber}
+     * overflowing that needs to be handled as well.
+     */
+    private boolean shouldBeSpilled(int sequenceNumber) {
+        if (lastOvertakenSequenceNumber == null) {
+            return true;
+        }
+        checkState(
+            receivedBuffers.size() < Integer.MAX_VALUE / 2,
+            "Too many buffers for sequenceNumber overflow detection code to work correctly");
+
+        boolean possibleOverflowAfterOvertaking = Integer.MAX_VALUE / 2 < lastOvertakenSequenceNumber;
+        boolean possibleOverflowBeforeOvertaking = lastOvertakenSequenceNumber < -Integer.MAX_VALUE / 2;
+
+        if (possibleOverflowAfterOvertaking) {
+            return sequenceNumber < lastOvertakenSequenceNumber && sequenceNumber > 0;
+        }
+        else if (possibleOverflowBeforeOvertaking) {
+            return sequenceNumber < lastOvertakenSequenceNumber || sequenceNumber > 0;
+        }
+        else {
+            return sequenceNumber < lastOvertakenSequenceNumber;
+        }

Review comment:
Do we need to use similar code later for timeout at a different place? If so, I'd pull it into some util class (even in flink-core?). Alternatively, we could also just switch to having long sequence numbers. I was originally writing this overflow check, but I'm now thinking that it might be easier to just add these 4 bytes to each buffer header. I guess the question is how often we actually reach `Integer.MAX_VALUE` in one execution attempt on one channel. With 32 KB buffers, that's ~70 TB of data on that channel, so rather unlikely in one attempt. On the other hand, if we ever persist the buffer id (for incremental state channel), we might reach it eventually.
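
For reference, a small sketch of the two points raised in the comment above. It is not the approach taken in the PR: `SequenceNumberSketch`, `isBefore`, and `bytesUntilOverflow` are hypothetical names. `isBefore` shows a different technique, an overflow-tolerant ordering based on wrapping `int` subtraction, which only holds as long as the two sequence numbers are fewer than 2^31 apart (a condition the existing `checkState` on `receivedBuffers.size()` would imply). `bytesUntilOverflow` redoes the back-of-the-envelope estimate, assuming sequence numbers start at 0 and increase by one per buffer.

```java
/** Hypothetical helper illustrating wraparound-safe ordering of int sequence numbers. */
class SequenceNumberSketch {

    /**
     * Returns true if a precedes b when int sequence numbers are treated as a ring.
     * Relies on wrapping subtraction; only valid while a and b are fewer than 2^31 apart.
     */
    static boolean isBefore(int a, int b) {
        return a - b < 0;
    }

    /**
     * Bytes sent on one channel before a sequence number that starts at 0
     * and grows by one per buffer passes Integer.MAX_VALUE.
     */
    static long bytesUntilOverflow(long bufferSizeBytes) {
        return (1L << 31) * bufferSizeBytes;
    }

    public static void main(String[] args) {
        // MAX_VALUE is followed by MIN_VALUE after the counter wraps around.
        System.out.println(isBefore(Integer.MAX_VALUE, Integer.MIN_VALUE)); // prints true
        System.out.println(isBefore(5, 3)); // prints false
        // 2^31 buffers of 32 KiB each: 2^46 bytes, i.e. 64 TiB or roughly 70 TB.
        System.out.println(bytesUntilOverflow(32 * 1024)); // prints 70368744177664
    }
}
```

With such a helper the three-way case distinction would collapse into a single comparison, at the cost of making the "fewer than 2^31 apart" precondition implicit. Switching to `long` sequence numbers, as proposed above, would avoid the question entirely.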
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##########
@@ -1125,70 +1127,137 @@ public void testUnblockReleasedChannel() throws Exception {
     @Test
     public void testPrioritySequenceNumbers() throws Exception {
-        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(4, 4096);
-        SingleInputGate inputGate = new SingleInputGateBuilder()
-            .setChannelFactory(InputChannelBuilder::buildRemoteChannel)
-            .setBufferPoolFactory(networkBufferPool.createBufferPool(1, 4))
-            .setSegmentProvider(networkBufferPool)
-            .build();
-        final RemoteInputChannel channel = (RemoteInputChannel) inputGate.getChannel(0);
-        inputGate.setup();
-        inputGate.requestPartitions();
+        final RemoteInputChannel channel = buildInputGateAndGetChannel();
+        sendBuffersAndBarrier(channel, 0);

Review comment:
I'm not sure that all of this particular refactoring makes the test more readable: how should I know which sequence of sequence numbers to expect? The assertion at the end is easy to understand though.

It would be much easier to understand with:
```
sendBuffer(0);
sendBuffer(1);
sendBarrier(2);
sendBuffer(3);
sendBuffer(4);
```
These building blocks should also be reusable for the other tests.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org