rkhachatryan commented on a change in pull request #14057: URL: https://github.com/apache/flink/pull/14057#discussion_r543676375
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ########## @@ -569,14 +569,23 @@ public void convertToPriorityEvent(int sequenceNumber) throws IOException { "Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]", toPrioritize, numPriorityElementsBeforeRemoval); + // set the priority flag (checked on poll) + // don't convert the barrier itself (barrier controller might not have been switched yet) + AbstractEvent e = EventSerializer.fromBuffer(toPrioritize.buffer, this.getClass().getClassLoader()); + toPrioritize.buffer.setReaderIndex(0); + toPrioritize = new SequenceBuffer(EventSerializer.toBuffer(e, true), toPrioritize.sequenceNumber); firstPriorityEvent = addPriorityBuffer(toPrioritize); // note that only position of the element is changed // converting the event itself would require switching the controller sooner } if (firstPriorityEvent) { - notifyPriorityEvent(sequenceNumber); + notifyPriorityEventForce(); // use force here because the barrier SQN might be seen by gate during the announcement Review comment: Rephrased as: ``` // forcibly notify about the priority event // instead of passing barrier SQN to be checked // because this SQN might have be seen by the input gate during the announcement ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org