AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328089758
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##########
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue<OUT> {
 
        /**
-        * Put the given element in the queue if capacity is left. If not, then 
block until this is
-        * the case.
+        * Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+        * the queue is full.
         *
-        * @param streamElementQueueEntry to be put into the queue
-        * @param <T> Type of the entries future value
-        * @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-        *      insert the given element
-        */
-       <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws 
InterruptedException;
-
-       /**
-        * Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-        * left and fails if the queue is full.
+        * <p>This method returns a handle to the inserted element that allows 
to set the result of the computation.</p>
         *
-        * @param streamElementQueueEntry to be inserted
-        * @param <T> Type of the entries future value
-        * @return True if the entry could be inserted; otherwise false
-        * @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-        *      insert the given element
+        * @param streamElement the element to be inserted.
+        * @return A handle to the element if successful or {@link 
Optional#empty()} otherwise.
         */
-       <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) 
throws InterruptedException;
+       Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);
 
        /**
-        * Peek at the head of the queue and return the first completed {@link 
AsyncResult}. This
-        * operation is a blocking operation and only returns once a completed 
async result has been
-        * found.
+        * Emits one completed element from the head of this queue into the 
given output.
         *
-        * @return Completed {@link AsyncResult}
-        * @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-        *      completed async result.
+        * <p>Will not emit any element if no element has been completed (check 
{@link #hasCompleted()} before entering
+        * any critical section).</p>
+        *
+        * @param output the output into which to emit
         */
-       AsyncResult peekBlockingly() throws InterruptedException;
+       void emitCompleted(TimestampedCollector<OUT> output);
 
        /**
-        * Poll the first completed {@link AsyncResult} from the head of this 
queue. This operation is
-        * blocking and only returns once a completed async result has been 
found.
+        * Checks if there is at least one completed element to be emitted.
 
 Review comment:
   You are right. I added `hasCompleted()` mostly to avoid acquiring the 
checkpoint lock unnecessarily, but the interface should abstract away from 
that specific use case.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to