[ https://issues.apache.org/jira/browse/FLINK-13767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhijiang updated FLINK-13767: ----------------------------- Description: StreamInputProcessor#processInput could return InputStatus instead of current boolean value to keep consistent with PushingAsyncDataInput#emitNext. For the implementation of StreamTwoInputProcessor#processInput, we could maintain and judge the two input status together with the next selected input index to determine the final precise status. To do so we could avoid invalid processInput call except for the first call. In addition, AvailabilityProvider#isFinished has the duplicated semantic with InputStatus#END_OF_INPUT for PushingAsyncDataInput, and it is only meaningful for PullingAsyncDataInput now. So we migrate the #isFinished method from AvailabilityProvider to PullingAsyncDataInput. was: AvailabilityListener is both used in AsyncDataInput and StreamTaskInput. We already introduced InputStatus for PushBasedAsyncDataInput#emitNext, and then InputStatus#END_OF_INPUT has the same semantic with AvailabilityListener#isFinished. But for the case of AsyncDataInput which is mainly used by InputGate layer, the isFinished() method is still needed at the moment. So we migrate this method from AvailabilityListener to AsyncDataInput, and refactor the StreamInputProcessor implementations by using InputStatus to judge the finished state. > Refactor StreamInputProcessor#processInput based on InputStatus > --------------------------------------------------------------- > > Key: FLINK-13767 > URL: https://issues.apache.org/jira/browse/FLINK-13767 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network, Runtime / Task > Reporter: zhijiang > Assignee: zhijiang > Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > StreamInputProcessor#processInput could return InputStatus instead of current > boolean value to keep consistent with PushingAsyncDataInput#emitNext. > For the implementation of StreamTwoInputProcessor#processInput, we could > maintain and judge the two input status together with the next selected input > index to determine the final precise status. To do so we could avoid invalid > processInput call except for the first call. > In addition, AvailabilityProvider#isFinished has the duplicated semantic > with InputStatus#END_OF_INPUT for PushingAsyncDataInput, and it is only > meaningful for PullingAsyncDataInput now. So we migrate the #isFinished > method from AvailabilityProvider to PullingAsyncDataInput. -- This message was sent by Atlassian Jira (v8.3.2#803003)