AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328132487
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##########
 @@ -209,41 +176,31 @@ else if (element.isLatencyMarker()) {
        }
 
        @Override
-       public void processElement(StreamRecord<IN> element) throws Exception {
-               final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
+       public void processElement(final StreamRecord<IN> element) throws Exception {
+               // add element first to the queue
+               final ResultFuture<OUT> entry = addToWorkQueue(element);
+
+               final ResultHandler resultHandler = new ResultHandler(element, entry);
 
+               // register a timeout for the entry if timeout is configured
                if (timeout > 0L) {
-                       // register a timeout for this AsyncStreamRecordBufferEntry
-                       long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
-
-                       final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
-                               timeoutTimestamp,
-                               new ProcessingTimeCallback() {
-                                       @Override
-                                       public void onProcessingTime(long timestamp) throws Exception {
-                                               userFunction.timeout(element.getValue(), streamRecordBufferEntry);
-                                       }
-                               });
-
-                       // Cancel the timer once we've completed the stream record buffer entry. This will remove
-                       // the register trigger task
-                       streamRecordBufferEntry.onComplete(
-                               (StreamElementQueueEntry<Collection<OUT>> value) -> {
-                                       timerFuture.cancel(true);
-                               },
-                               executor);
-               }
+                       final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
 
-               addAsyncBufferEntry(streamRecordBufferEntry);
+                       final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer(
+                                       timeoutTimestamp,
+                                       timestamp -> userFunction.timeout(element.getValue(), resultHandler));
 
-               userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
+                       resultHandler.setTimeoutTimer(timeoutTimer);
+               }
+
+               userFunction.asyncInvoke(element.getValue(), resultHandler);
        }
 
        @Override
        public void processWatermark(Watermark mark) throws Exception {
-               WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);
+               addToWorkQueue(mark);
 
-               addAsyncBufferEntry(watermarkBufferEntry);
+               outputCompletedElements();
 
 Review comment:
   Unfortunately, this call is necessary; it is not an improvement in the strict sense.
   If we don't "pump" elements here, a stream that consists only of watermarks will never emit anything. The only other place where elements are emitted is upon completion, which is imho the best place for normal records, but watermarks never take that code path. We could ofc trigger some kind of artificial completion for watermarks, but that would be more complicated imho.
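
   To illustrate the point, here is a minimal, self-contained sketch of an ordered work queue. It is a toy model, not the operator's actual code: `WatermarkPumpSketch`, `Entry`, `complete`, and the `pumpOnWatermark` flag are hypothetical names chosen for the example; only `processElement`, `processWatermark`, and `outputCompletedElements` mirror method names from the diff. It assumes records are emitted from the completion path and that watermarks are already "done" when they are enqueued.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of an ordered work queue; NOT Flink's actual classes. */
public class WatermarkPumpSketch {

    /** A queue entry: records complete later, watermarks are done on arrival. */
    static final class Entry {
        final String value;
        boolean done;

        Entry(String value, boolean done) {
            this.value = value;
            this.done = done;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();
    private final boolean pumpOnWatermark; // hypothetical switch, for comparison only

    WatermarkPumpSketch(boolean pumpOnWatermark) {
        this.pumpOnWatermark = pumpOnWatermark;
    }

    /** Enqueues a record; it stays pending until complete() is called. */
    Entry processElement(String record) {
        Entry entry = new Entry(record, false);
        queue.add(entry);
        return entry;
    }

    /** Completion path: the only place where records trigger emission. */
    void complete(Entry entry) {
        entry.done = true;
        outputCompletedElements();
    }

    /** Watermarks never pass through complete(), so they must pump themselves. */
    void processWatermark(long timestamp) {
        queue.add(new Entry("WM(" + timestamp + ")", true));
        if (pumpOnWatermark) {
            outputCompletedElements();
        }
    }

    /** Emits finished entries from the head of the queue, preserving order. */
    private void outputCompletedElements() {
        while (!queue.isEmpty() && queue.peek().done) {
            output.add(queue.poll().value);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        WatermarkPumpSketch withoutPump = new WatermarkPumpSketch(false);
        withoutPump.processWatermark(1);
        withoutPump.processWatermark(2);
        System.out.println(withoutPump.output()); // prints []

        WatermarkPumpSketch withPump = new WatermarkPumpSketch(true);
        withPump.processWatermark(1);
        withPump.processWatermark(2);
        System.out.println(withPump.output()); // prints [WM(1), WM(2)]
    }
}
```

   Without the pump, the watermark-only stream stays stuck in the queue because nothing ever calls the completion path. With the pump, a watermark behind a pending record still waits for that record, so ordering is preserved.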

