zhijiangW commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340423539
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ##########
@@ -281,10 +289,41 @@ protected void processInput(DefaultActionContext context) throws Exception {
 		if (status == InputStatus.END_OF_INPUT) {
 			context.allActionsCompleted();
 		}
-		else if (status == InputStatus.NOTHING_AVAILABLE) {
+
+		CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
+		if (jointFuture != null) {
 			SuspendedMailboxDefaultAction suspendedDefaultAction = context.suspendDefaultAction();
-			inputProcessor.getAvailableFuture().thenRun(suspendedDefaultAction::resume);
+			jointFuture.thenRun(suspendedDefaultAction::resume);
+		}
+	}
+
+	/**
+	 * @return a combination of input and output futures if at-least one future of them is not
+	 * completed, otherwise return null if all input and outputs are available.
+	 */
+	private CompletableFuture<?> getInputOutputJointFuture(InputStatus status) {
+		if (status == InputStatus.MORE_AVAILABLE && isOutputAvailable()) {
+			return null;
+		}
+
+		int length = recordWriters.size();
+		for (int i = 0; i < length; i++) {
+			inputOutputFutures[i] = recordWriters.get(i).getAvailableFuture();
+		}
+		inputOutputFutures[length] = inputProcessor.getAvailableFuture();
+		return CompletableFuture.allOf(inputOutputFutures);
+	}
+
+	/**
+	 * @return true if all the record writers are available.
+	 */
+	private boolean isOutputAvailable() {
+		for (RecordWriter recordWriter : recordWriters) {
+			if (!recordWriter.isAvailable()) {

Review comment:
I had considered dropping the `isBlocking` condition before. However, since `LocalBufferPool#isAvailable` is not actually used on the input side, I kept the condition so as not to affect the input side. Thinking it through now, this is not very clean, because in `LocalBufferPool#recycle` we cannot distinguish whether the pool is being used for input or for output.
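The idea in the diff, suspending the default action until the input and every record writer are available, can be sketched in plain Java. This is a minimal sketch, not Flink code: `Availability`, `FutureAvailability` and `jointFuture` are hypothetical names standing in for `RecordWriter`, the input processor and `getInputOutputJointFuture`, and the boolean `inputHasMoreData` stands in for `InputStatus.MORE_AVAILABLE`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Minimal sketch of the joint input/output future; every name in it is hypothetical. */
final class JointFutureSketch {

    /** Hypothetical stand-in for a record writer or input processor that reports availability. */
    interface Availability {
        boolean isAvailable();

        CompletableFuture<?> getAvailableFuture();
    }

    /** Hypothetical availability source backed by a single, fixed future. */
    static final class FutureAvailability implements Availability {
        final CompletableFuture<Void> future;

        FutureAvailability(boolean available) {
            future = available ? CompletableFuture.<Void>completedFuture(null) : new CompletableFuture<Void>();
        }

        @Override
        public boolean isAvailable() {
            return future.isDone();
        }

        @Override
        public CompletableFuture<?> getAvailableFuture() {
            return future;
        }
    }

    /**
     * Returns null when the input reports more data and every output is available (keep processing);
     * otherwise returns a future that completes once the input and all outputs are available.
     */
    static CompletableFuture<?> jointFuture(
            boolean inputHasMoreData, Availability input, List<? extends Availability> outputs) {
        if (inputHasMoreData && outputs.stream().allMatch(Availability::isAvailable)) {
            return null;
        }
        // One slot per output plus one for the input, mirroring inputOutputFutures in the diff.
        CompletableFuture<?>[] futures = new CompletableFuture<?>[outputs.size() + 1];
        for (int i = 0; i < outputs.size(); i++) {
            futures[i] = outputs.get(i).getAvailableFuture();
        }
        futures[outputs.size()] = input.getAvailableFuture();
        return CompletableFuture.allOf(futures);
    }
}
```

In the task, a non-null result would be wired up the way the diff does it, `jointFuture.thenRun(suspendedDefaultAction::resume)`. The diff reuses a preallocated `inputOutputFutures` array across calls, presumably to avoid an allocation per invocation; the sketch allocates a fresh array for simplicity.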
We can remove this condition here and also change the interaction between `RemoteInputChannel` and `LocalBufferPool` to use the `isAvailable` approach instead of the `BufferListener` interface. WDYT?

Regarding the synchronization of `recordWriter.isAvailable()`: this is the same situation as in the previous usages. The unavailable state is always visible to the mailbox/task thread, because the `LocalBufferPool` can only become unavailable after the task thread itself requests buffers. The transition from unavailable to available is triggered by other threads, but it is still visible to the task thread through the volatile variable written in `CompletableFuture.complete`, as we confirmed before.
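The visibility argument can be illustrated with a deliberately simplified pool. `SketchBufferPool` and its methods are hypothetical and are not Flink's `LocalBufferPool`; the sketch assumes the pool is created on the task thread, that only the task thread calls `requestBuffer()`, `isAvailable()` and `getAvailableFuture()`, and that `recycle()` may run on any thread. Only the task thread makes the pool unavailable (by replacing the future), while other threads only complete the current future. The task thread polls `isAvailable()` or waits on `getAvailableFuture()` rather than registering a callback, roughly the alternative to `BufferListener` suggested in the comment.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, simplified pool used only to illustrate the visibility argument (not Flink's LocalBufferPool).
 * Assumptions: created on the task thread; requestBuffer(), isAvailable() and getAvailableFuture()
 * are called only by the task thread; recycle() may be called from any thread.
 */
final class SketchBufferPool {
    private final Object lock = new Object();

    // Guarded by lock.
    private int freeBuffers;

    // Replaced only by the task thread (under lock); other threads read it only under lock.
    // The task thread therefore always sees its own latest write without extra synchronization.
    private CompletableFuture<Void> availableFuture;

    SketchBufferPool(int buffers) {
        freeBuffers = buffers;
        availableFuture = buffers > 0
                ? CompletableFuture.<Void>completedFuture(null)
                : new CompletableFuture<Void>();
    }

    /** Task thread only: takes a buffer, returning false if none is free. */
    boolean requestBuffer() {
        synchronized (lock) {
            if (freeBuffers == 0) {
                return false;
            }
            freeBuffers--;
            if (freeBuffers == 0) {
                // Available -> unavailable happens on the task thread itself, so it is visible to it.
                availableFuture = new CompletableFuture<>();
            }
            return true;
        }
    }

    /** Any thread: returns a buffer to the pool. */
    void recycle() {
        CompletableFuture<Void> toComplete = null;
        synchronized (lock) {
            freeBuffers++;
            if (freeBuffers == 1) {
                toComplete = availableFuture; // the pool just went from empty to non-empty
            }
        }
        if (toComplete != null) {
            // Unavailable -> available happens on another thread. In OpenJDK, complete() writes a
            // volatile field inside CompletableFuture that the task thread's isDone() call reads.
            toComplete.complete(null);
        }
    }

    /** Task thread only: unsynchronized availability check, no listener callback required. */
    boolean isAvailable() {
        return availableFuture.isDone();
    }

    /** Task thread only: a future to wait on (for example via thenRun) while the pool is empty. */
    CompletableFuture<?> getAvailableFuture() {
        return availableFuture;
    }
}
```

A benign race remains: between a recycler releasing the lock and calling `complete()`, the task thread may briefly see the pool as unavailable; it then simply waits on the future, which completes shortly afterwards.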