AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r326870334
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ########## @@ -293,140 +246,120 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; + /** + * Add the given stream element to the operator's stream element queue. This operation blocks until the element + * has been added. + * + * <p>Between two insertion attempts, this method yields the execution to the mailbox, such that events as well + * as asynchronous results can be processed. + * + * @param streamElement to add to the operator's queue + * @throws InterruptedException if the current thread has been interrupted while yielding to mailbox + * @return a handle that allows to set the result of the async computation for the given element. 
+ */ + private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + pendingStreamElement = streamElement; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + Optional<ResultFuture<OUT>> queueEntry; + while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { + mailboxExecutor.yield(); } - try { - stopResources(false); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); + pendingStreamElement = null; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + return queueEntry.get(); + } - if (exception != null) { - throw exception; + private void waitInFlightInputsFinished() throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); + + while (!queue.isEmpty()) { + mailboxExecutor.yield(); } } /** - * Close the operator's resources. They include the emitter thread and the executor to run - * the queue's complete operation. + * Batch output of all completed elements. Watermarks are always completed if it's their turn to be processed. * - * @param waitForShutdown is true if the method should wait for the resources to be freed; - * otherwise false. 
- * @throws InterruptedException if current thread has been interrupted + * <p>This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result + * of an async function call.</p> */ - private void stopResources(boolean waitForShutdown) throws InterruptedException { - emitter.stop(); - emitterThread.interrupt(); - - executor.shutdown(); - - if (waitForShutdown) { - try { - if (!executor.awaitTermination(365L, TimeUnit.DAYS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - - /* - * FLINK-5638: If we have the checkpoint lock we might have to free it for a while so - * that the emitter thread can complete/react to the interrupt signal. - */ - if (Thread.holdsLock(checkpointingLock)) { - while (emitterThread.isAlive()) { - checkpointingLock.wait(100L); + private void outputCompletedElements() { + Collection<StreamElement> completed = queue.popCompleted(); Review comment: I am not sure what the benefit of the old code was, but I have improved the code even further. The check is no longer needed, so please review the code one more time. `TimestampedCollector` seems to be too limited at the moment (it should actually delegate all `Output` methods). I could extend it in that way and use it (basically, it is missing `emitWatermark` for our use case). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services