zhijiangW commented on a change in pull request #10029:  [FLINK-14553][runtime] 
Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r340423539
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -281,10 +289,41 @@ protected void processInput(DefaultActionContext 
context) throws Exception {
                if (status == InputStatus.END_OF_INPUT) {
                        context.allActionsCompleted();
                }
-               else if (status == InputStatus.NOTHING_AVAILABLE) {
+
+               CompletableFuture<?> jointFuture = 
getInputOutputJointFuture(status);
+               if (jointFuture != null) {
                        SuspendedMailboxDefaultAction suspendedDefaultAction = 
context.suspendDefaultAction();
-                       
inputProcessor.getAvailableFuture().thenRun(suspendedDefaultAction::resume);
+                       jointFuture.thenRun(suspendedDefaultAction::resume);
+               }
+       }
+
+       /**
+        * @return a combination of input and output futures if at-least one 
future of them is not
+        * completed, otherwise return null if all input and outputs are 
available.
+        */
+       private CompletableFuture<?> getInputOutputJointFuture(InputStatus 
status) {
+               if (status == InputStatus.MORE_AVAILABLE && 
isOutputAvailable()) {
+                       return null;
+               }
+
+               int length = recordWriters.size();
+               for (int i = 0; i < length; i++) {
+                       inputOutputFutures[i] = 
recordWriters.get(i).getAvailableFuture();
+               }
+               inputOutputFutures[length] = 
inputProcessor.getAvailableFuture();
+               return CompletableFuture.allOf(inputOutputFutures);
+       }
+
+       /**
+        * @return true if all the record writers are available.
+        */
+       private boolean isOutputAvailable() {
+               for (RecordWriter recordWriter : recordWriters) {
+                       if (!recordWriter.isAvailable()) {
 
 Review comment:
   I ever considered dropping the condition of `isBlocking` before. But 
considering that `LocalBufferPool#isAvailable` is actually not used in input 
side, so I retain this condition to not impact the input side. 
   
   After thinking through it now, it is not very clean because during 
`LocalBufferPool#recycle` we could not distinguish whether it is for input or 
output usages. 
   
   We can remove this condition here and also adjust the interaction between 
`RemoteInputChannel` and `LocalBufferPool` to make use of `isAvailable` way 
instead of `BufferListener` interface. WDYT?
   
   Concerning of the synchronised `recordWriter.isAvailable()`, it is actually 
the same case as other previous usages. The unavailable state is always visible 
for mailbox/task thread because the `LocalBufferPool` might become unavailable 
only after task thread requesting. For the visibility from unavailable to 
available, it is caused by other threads, then it can be still visible for task 
thread via touching volatile variable in `CompleteFuture.complete` as we 
confirmed before.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to