zhijiangW commented on a change in pull request #9483: [FLINK-13767][task] 
Refactor StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#discussion_r325477987
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 ##########
 @@ -184,10 +184,6 @@ public boolean processInput() throws Exception {
                }
                checkFinished(status, lastReadInputIndex);
 
-               if (status != InputStatus.MORE_AVAILABLE) {
 
 Review comment:
   Let me summarize the changes a bit. There are three main changes:
   
   1. Move `inputSelectionHandler.setUnavailableInput` into 
`selectNextReadingInputIndex#updateAvailability`. I think this does not change 
behavior compared with before; it just puts similar logic together so it is 
easier to trace.
   
   2. Return a status instead of a boolean from 
`StreamInputProcessor#processInput`. This replaces the `isFinished` method and 
keeps the return type consistent with that of `emitNext`. It also causes no 
behavior change; the only difference is that `StreamTask#processInput` now 
performs two additional condition checks instead of using the previous `while` 
loop. We could also keep the `while` loop, or add the following hot path, to 
avoid the performance concern.
   ```
   if (status == InputStatus.MORE_AVAILABLE) {
        return;
   }
   ```
   
   3. The value returned by `StreamTwoInputProcessor#processInput` differs 
from before, which might change behavior. Previously it might return an 
invalid status from the current call, and then in the next call 
`selectNextReadingInputIndex()` might return -1, which triggers the 
`isAvailable()` access. Now it always returns a valid status, determined from 
the next selected index and its status. So if the next selected input is not 
available, the `isAvailable()` access is triggered directly in the current 
call rather than in the next one. On this basis, my earlier thinking was that 
it might save some unnecessary cost on the mailbox scheduling side.
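   
   To make point 2 concrete, here is a minimal sketch of how a caller could 
branch on the returned status with the hot path first. It is not the actual 
`StreamTask` code: `ProcessInputSketch`, `ScriptedProcessor`, `Action` and 
`step` are hypothetical names, and the local `InputStatus` enum is a stand-in 
whose constants other than `MORE_AVAILABLE` are assumptions.

   ```java
   import java.util.ArrayDeque;
   import java.util.Arrays;
   import java.util.Queue;

   class ProcessInputSketch {
       // Stand-in for Flink's InputStatus. Only MORE_AVAILABLE appears in the
       // review comment; the other two constants are assumptions.
       enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

       // Hypothetical summary of what the caller does after one call.
       enum Action { CONTINUE, WAIT_FOR_INPUT, FINISH }

       // Hypothetical processor that replays a scripted sequence of statuses.
       static class ScriptedProcessor {
           private final Queue<InputStatus> script;

           ScriptedProcessor(InputStatus... statuses) {
               this.script = new ArrayDeque<>(Arrays.asList(statuses));
           }

           InputStatus processInput() {
               return script.remove();
           }
       }

       // One caller iteration: the common case is checked first and returns
       // early, so the two extra checks are only reached on the slow path.
       static Action step(ScriptedProcessor processor) {
           InputStatus status = processor.processInput();
           if (status == InputStatus.MORE_AVAILABLE) {
               return Action.CONTINUE; // hot path
           }
           if (status == InputStatus.END_OF_INPUT) {
               return Action.FINISH;
           }
           return Action.WAIT_FOR_INPUT;
       }

       public static void main(String[] args) {
           ScriptedProcessor processor = new ScriptedProcessor(
               InputStatus.MORE_AVAILABLE,
               InputStatus.NOTHING_AVAILABLE,
               InputStatus.END_OF_INPUT);
           for (int i = 0; i < 3; i++) {
               System.out.println(step(processor));
           }
       }
   }
   ```

   With the early return, the common `MORE_AVAILABLE` case costs a single 
comparison, which is the intent of the hot path mentioned in point 2.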
   
   Anyway, it is hard to verify the points above because the benchmark is 
unstable, so we can only analyze the possible impact in theory.
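   
   As a rough aid to that theoretical analysis, the toy model below counts how 
many `processInput`-style calls pass before the caller learns it has to wait, 
under the two return conventions described in point 3. It is a sketch under 
strong assumptions, not Flink's implementation: `NextStatusSketch`, `statusOf` 
and `callsUntilWait` are hypothetical, the scenario (input 0 has data, input 1 
is selected next and has none) is made up for illustration, and 
`NOTHING_AVAILABLE` is an assumed constant name.

   ```java
   class NextStatusSketch {
       // Stand-in for Flink's InputStatus; NOTHING_AVAILABLE is an assumption.
       enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE }

       static InputStatus statusOf(boolean hasData) {
           return hasData ? InputStatus.MORE_AVAILABLE : InputStatus.NOTHING_AVAILABLE;
       }

       // Counts calls until the caller is told to wait for availability.
       // reportNextSelected = false: report the status of the input just read.
       // reportNextSelected = true:  report the status of the next selected input.
       static int callsUntilWait(boolean reportNextSelected) {
           boolean[] hasData = {true, false}; // made-up scenario
           int justRead = 0;
           int nextSelected = 1;

           int calls = 1; // the current call
           InputStatus returned =
               statusOf(hasData[reportNextSelected ? nextSelected : justRead]);
           if (returned == InputStatus.MORE_AVAILABLE) {
               // The caller invokes the processor again. In this model that
               // second call finds no readable input (index -1 in the comment's
               // terms) and only then falls back to waiting.
               calls++;
           }
           return calls;
       }

       public static void main(String[] args) {
           System.out.println("status of input just read: " + callsUntilWait(false));
           System.out.println("status of next selected input: " + callsUntilWait(true));
       }
   }
   ```

   In this model the extra call is the only difference between the two 
conventions. Whether that translates into a measurable saving in mailbox 
scheduling is exactly what the benchmark could not confirm.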
