zhijiangW commented on a change in pull request #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#discussion_r325477987
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -184,10 +184,6 @@ public boolean processInput() throws Exception {
 }

 checkFinished(status, lastReadInputIndex);
- if (status != InputStatus.MORE_AVAILABLE) {

Review comment:
Let me summarize the changes. There are three main ones:

1. Move `inputSelectionHandler.setUnavailableInput` into `selectNextReadingInputIndex#updateAvailability`. I think the behavior is the same as before. This only puts similar logic together so that it is easier to trace.

2. Return a status instead of a boolean from `StreamInputProcessor#processInput`. This replaces the `isFinished` method and keeps the return type consistent with that of `emitNext`. It also does not change behavior. The only difference is that `StreamTask#processInput` now checks two additional conditions instead of using the previous `while` loop. We could still use the `while` approach, or add a hot path to avoid the performance concern:

```
if (status == InputStatus.MORE_AVAILABLE) {
	return;
}
```

3. The value returned by `StreamTwoInputProcessor#processInput` differs from before, which might change behavior. Previously, the current call might return an invalid status, and in the next call `selectNextReadingInputIndex()` might then return -1 and trigger the `isAvailable()` access. Now the call always returns a valid status, determined by the next selected index and its status. So if the next selected input is not available, the `isAvailable()` access is triggered directly in the current call, with no need to wait for the next call. My earlier thinking was that this might save some unnecessary mailbox scheduling cost.

Anyway, these points are hard to verify because the benchmark is unstable, so we can only analyze the possible impact in theory.
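To make point 2 concrete, here is a minimal, self-contained sketch of a caller that branches on a returned status and checks the hot path first. It is an illustration only, not the code in this PR: `ProcessInputSketch`, `ScriptedProcessor`, `step`, and the returned action labels are hypothetical names, and the `InputStatus` enum here is a local stand-in that only mirrors the states discussed above.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Illustrative stand-ins only; none of these types are Flink's real classes. */
class ProcessInputSketch {

    /** Local stand-in for the status values discussed in the comment. */
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical processor that replays a scripted sequence of statuses. */
    static final class ScriptedProcessor {
        private final Deque<InputStatus> script = new ArrayDeque<>();

        ScriptedProcessor(InputStatus... statuses) {
            script.addAll(Arrays.asList(statuses));
        }

        /** Returns the next scripted status, or END_OF_INPUT once the script is exhausted. */
        InputStatus processInput() {
            InputStatus next = script.poll();
            return next == null ? InputStatus.END_OF_INPUT : next;
        }
    }

    /**
     * Hypothetical caller-side step: the common case (more data available)
     * returns after a single comparison; the other two states are only
     * examined when that check fails.
     */
    static String step(ScriptedProcessor processor) {
        InputStatus status = processor.processInput();
        if (status == InputStatus.MORE_AVAILABLE) {
            return "continue"; // hot path: nothing else to check
        }
        if (status == InputStatus.END_OF_INPUT) {
            return "finish"; // all input consumed
        }
        return "suspend"; // NOTHING_AVAILABLE: wait until input becomes available
    }

    public static void main(String[] args) {
        ScriptedProcessor processor = new ScriptedProcessor(
            InputStatus.MORE_AVAILABLE,
            InputStatus.NOTHING_AVAILABLE,
            InputStatus.END_OF_INPUT);
        for (int i = 0; i < 3; i++) {
            System.out.println(step(processor));
        }
    }
}
```

With the early return, the two extra condition checks are only paid when the status is not `MORE_AVAILABLE`, which is the trade-off point 2 weighs against keeping the `while` loop.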