Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171183475

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -553,6 +553,12 @@ public void requestPartitions() throws IOException, InterruptedException {
                 channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
                 if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
+                    // Because of race condition between:
+                    // 1. releasing inputChannelsWithData lock in this method and reaching this place
+                    // 2. empty data notification that re-enqueues a channel
+                    // we can end up with moreAvailable flag set to true, while we expect no more data.
+                    checkState(!moreAvailable || !pollNextBufferOrEvent().isPresent());
+                    moreAvailable = false;
--- End diff --

While this certainly fixes the `checkState(!bufferOrEvent.moreAvailable());` in the `UnionInputGate`, it does not improve the detection of additional data after the `EndOfPartitionEvent` too much. How about also adding `checkState(!pollNextBufferOrEvent().isPresent());` here:

```
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
    if (hasReceivedAllEndOfPartitionEvents) {
        checkState(!pollNextBufferOrEvent().isPresent());
        return Optional.empty();
    }
```

In that case, if we ever try to get more data (due to a data notification) there should be no actual data left and only empty buffers.
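
To illustrate the intent of that guard outside of Flink, here is a minimal standalone sketch. It is a toy model, not the actual `SingleInputGate`: the class `EndOfPartitionGuardSketch`, its queue of strings, and the helpers `enqueue`, `markAllPartitionsFinished` and the local `checkState` are all hypothetical stand-ins. It only assumes that a failed `checkState` surfaces as an `IllegalStateException`.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Toy stand-in for an input gate; NOT Flink's SingleInputGate. */
public class EndOfPartitionGuardSketch {

    // Hypothetical buffer of pending items (stands in for queued channel data).
    private final Queue<String> pending = new ArrayDeque<>();
    private boolean hasReceivedAllEndOfPartitionEvents;

    /** Hypothetical helper: simulates a data notification that enqueues an item. */
    public void enqueue(String item) {
        pending.add(item);
    }

    /** Hypothetical helper: simulates having seen the end of every partition. */
    public void markAllPartitionsFinished() {
        hasReceivedAllEndOfPartitionEvents = true;
    }

    /** Non-blocking poll; empty if nothing is queued. */
    public Optional<String> pollNext() {
        return Optional.ofNullable(pending.poll());
    }

    /** Mirrors the proposed guard: after the end of all partitions, a poll must find nothing. */
    public Optional<String> getNext() {
        if (hasReceivedAllEndOfPartitionEvents) {
            checkState(!pollNext().isPresent());
            return Optional.empty();
        }
        return pollNext();
    }

    // Local stand-in for a precondition check that throws IllegalStateException.
    private static void checkState(boolean condition) {
        if (!condition) {
            throw new IllegalStateException("data found after all end-of-partition events");
        }
    }

    public static void main(String[] args) {
        EndOfPartitionGuardSketch gate = new EndOfPartitionGuardSketch();
        gate.enqueue("record");
        System.out.println(gate.getNext().isPresent()); // regular data before the end
        gate.markAllPartitionsFinished();
        System.out.println(gate.getNext().isPresent()); // nothing left, guard passes
        gate.enqueue("late record");
        try {
            gate.getNext();
        } catch (IllegalStateException e) {
            System.out.println("guard tripped: " + e.getMessage());
        }
    }
}
```

In this model, a spurious wake-up after the end of all partitions is harmless as long as the poll comes back empty, while any real data arriving late trips the check immediately.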
---