AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328132487
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
##########
@@ -209,41 +176,31 @@ else if (element.isLatencyMarker()) {
     }

     @Override
-    public void processElement(StreamRecord<IN> element) throws Exception {
-        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
+    public void processElement(final StreamRecord<IN> element) throws Exception {
+        // add element first to the queue
+        final ResultFuture<OUT> entry = addToWorkQueue(element);
+
+        final ResultHandler resultHandler = new ResultHandler(element, entry);

+        // register a timeout for the entry if timeout is configured
         if (timeout > 0L) {
-            // register a timeout for this AsyncStreamRecordBufferEntry
-            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
-
-            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
-                timeoutTimestamp,
-                new ProcessingTimeCallback() {
-                    @Override
-                    public void onProcessingTime(long timestamp) throws Exception {
-                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
-                    }
-                });
-
-            // Cancel the timer once we've completed the stream record buffer entry. This will remove
-            // the register trigger task
-            streamRecordBufferEntry.onComplete(
-                (StreamElementQueueEntry<Collection<OUT>> value) -> {
-                    timerFuture.cancel(true);
-                },
-                executor);
-        }
+            final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

-        addAsyncBufferEntry(streamRecordBufferEntry);
+            final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer(
+                timeoutTimestamp,
+                timestamp -> userFunction.timeout(element.getValue(), resultHandler));

-        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
+            resultHandler.setTimeoutTimer(timeoutTimer);
+        }
+
+        userFunction.asyncInvoke(element.getValue(), resultHandler);
     }

     @Override
     public void processWatermark(Watermark mark) throws Exception {
-        WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);
+        addToWorkQueue(mark);

-        addAsyncBufferEntry(watermarkBufferEntry);
+        outputCompletedElements();

Review comment:
Unfortunately it's necessary, and not an improvement in the strict sense. If we don't "pump" elements here, a stream consisting only of watermarks will never emit anything. The only other place where elements are emitted is upon completion, which is imho the best place for normal records, but watermarks never take that code path. We could ofc trigger some kind of artificial completion for them, but that's more complicated imho.
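
To illustrate why the watermark path has to drain the queue itself, here is a minimal, self-contained sketch. It is not the actual AsyncWaitOperator code: the class `WatermarkPumpSketch`, its `Entry` type, the `pumpOnWatermark` flag, and the `complete` and `watermarkOnlyStream` helpers are hypothetical names made up for this example. Only the method names `processElement`, `processWatermark`, and `outputCompletedElements` mirror the diff. The sketch assumes an ordered queue in which records become emittable on async completion and watermarks are emittable as soon as they are queued.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for the operator's ordered work queue; not Flink code. */
class WatermarkPumpSketch {

    /** A queued element: records become done on completion, watermarks are done at once. */
    static final class Entry {
        final String label;
        boolean done;

        Entry(String label, boolean done) {
            this.label = label;
            this.done = done;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();
    private final boolean pumpOnWatermark;

    WatermarkPumpSketch(boolean pumpOnWatermark) {
        this.pumpOnWatermark = pumpOnWatermark;
    }

    /** Records are only enqueued here; emission waits for complete(). */
    Entry processElement(String value) {
        Entry entry = new Entry(value, false);
        queue.addLast(entry);
        return entry;
    }

    /** Watermarks have no completion callback, so the queue is drained here if enabled. */
    void processWatermark(long timestamp) {
        queue.addLast(new Entry("WM(" + timestamp + ")", true));
        if (pumpOnWatermark) {
            outputCompletedElements();
        }
    }

    /** Completion path for normal records: mark done, then drain. */
    void complete(Entry entry) {
        entry.done = true;
        outputCompletedElements();
    }

    /** Emits finished entries from the head of the queue, preserving order. */
    private void outputCompletedElements() {
        while (!queue.isEmpty() && queue.peekFirst().done) {
            emitted.add(queue.pollFirst().label);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    /** Feeds a stream made up only of watermarks and returns what was emitted. */
    static List<String> watermarkOnlyStream(boolean pump) {
        WatermarkPumpSketch op = new WatermarkPumpSketch(pump);
        op.processWatermark(1L);
        op.processWatermark(2L);
        return op.emitted();
    }

    public static void main(String[] args) {
        System.out.println("without pump: " + watermarkOnlyStream(false));
        System.out.println("with pump:    " + watermarkOnlyStream(true));
    }
}
```

Without the pump, the watermark-only stream emits nothing because no completion ever fires. With it, both watermarks are emitted in order. A watermark queued behind a still-pending record stays blocked until that record completes, so ordering is preserved either way.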