AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328136330
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
##########
@@ -150,43 +139,26 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
         this.inStreamElementSerializer = new StreamElementSerializer<>(
             getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
 
-        // create the operators executor for the complete operations of the queue entries
-        this.executor = Executors.newSingleThreadExecutor();
-
         switch (outputMode) {
             case ORDERED:
-                queue = new OrderedStreamElementQueue(
-                    capacity,
-                    executor,
-                    this);
+                queue = new OrderedStreamElementQueue<>(capacity);
                 break;
             case UNORDERED:
-                queue = new UnorderedStreamElementQueue(
-                    capacity,
-                    executor,
-                    this);
+                queue = new UnorderedStreamElementQueue<>(capacity);
                 break;
             default:
                 throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
         }
+
+        this.timestampedCollector = new TimestampedCollector<>(output);
     }
 
     @Override
     public void open() throws Exception {
         super.open();
 
-        // create the emitter
-        this.emitter = new Emitter<>(checkpointingLock, mailboxExecutor, output, queue, this);
-
-        // start the emitter thread
-        this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
-        emitterThread.setDaemon(true);
-        emitterThread.start();
-
-        // process stream elements from state, since the Emit thread will start as soon as all
-        // elements from previous state are in the StreamElementQueue, we have to make sure that the
-        // order to open all operators in the operator chain proceeds from the tail operator to the
-        // head operator.
+        // Process stream elements from state. We have to make sure that the order to open all operators in the
+        // operator chain proceeds from the tail operator to the head operator.

Review comment:
Yes, sounds like a good hotfix.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services