rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543676375



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -569,14 +569,23 @@ public void convertToPriorityEvent(int sequenceNumber) throws IOException {
                                "Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]",
                                toPrioritize,
                                numPriorityElementsBeforeRemoval);
+                       // set the priority flag (checked on poll)
+                       // don't convert the barrier itself (barrier controller might not have been switched yet)
+                       AbstractEvent e = EventSerializer.fromBuffer(toPrioritize.buffer, this.getClass().getClassLoader());
+                       toPrioritize.buffer.setReaderIndex(0);
+                       toPrioritize = new SequenceBuffer(EventSerializer.toBuffer(e, true), toPrioritize.sequenceNumber);
                        firstPriorityEvent = addPriorityBuffer(toPrioritize); // note that only position of the element is changed
                                                                              // converting the event itself would require switching the controller sooner
                }
                if (firstPriorityEvent) {
-                       notifyPriorityEvent(sequenceNumber);
+                       notifyPriorityEventForce(); // use force here because the barrier SQN might be seen by gate during the announcement

Review comment:
       Rephrased as:
   ```
   // forcibly notify about the priority event
   // instead of passing barrier SQN to be checked
   // because this SQN might have been seen by the input gate during the announcement
   ```
   





