dawidwys commented on a change in pull request #13529:
URL: https://github.com/apache/flink/pull/13529#discussion_r501526829
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -23,16 +23,12 @@
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;

Review comment:
I had another look at the issue, and now that I better understand the assumptions behind `getAvailableFuture()`, I think the current code is OK, except that maybe we should change `StreamTwoInputProcessor#L206` to:

```
if (status == InputStatus.MORE_AVAILABLE
        || (status != InputStatus.END_OF_INPUT
            && (input.isApproximatelyAvailable() || input.isAvailable()))) {
```

My problem was that I did not use `AvailabilityProvider.AVAILABLE` at all. My code looked roughly like this:

```
class MyInput {
    CompletableFuture<?> availabilityMarker = ...;

    void executeSomeAsyncOperation() {
        // install a fresh, not yet completed marker before starting the async work
        availabilityMarker = new CompletableFuture<>();
        executeAsync(this::asyncOperation);
    }

    void asyncOperation() {
        // ...
        // completes the marker, but it is never the AvailabilityProvider.AVAILABLE instance
        availabilityMarker.complete(null);
    }

    CompletableFuture<?> getAvailableFuture() {
        return availabilityMarker;
    }
}
```

This does not work, because the default implementation of `isApproximatelyAvailable()` only returns true when the future is the `AvailabilityProvider.AVAILABLE` instance itself, so for my input it is never true. I would have to change it to:

```
void asyncOperation() {
    // ...
    CompletableFuture<?> tmp = availabilityMarker;
    // publish the shared AVAILABLE marker first, then wake up whoever waits on the old future
    availabilityMarker = AvailabilityProvider.AVAILABLE;
    tmp.complete(null);
}
```

This is an implicit, non-obvious contract that I was not aware of. The change I suggested for `StreamTwoInputProcessor#L206` would probably make my old version work, but I do not think it is strictly necessary. On the other hand, some additional documentation or a walkthrough example would be helpful here, especially since we are exposing this feature in user-facing APIs.

I will remove this commit. Sorry for the confusion.
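To make that contract concrete, here is a minimal, self-contained sketch that does not depend on Flink. `Availability` is a hypothetical, simplified re-declaration of what I understand the `AvailabilityProvider` defaults to be: `isApproximatelyAvailable()` compares the future by identity against the shared `AVAILABLE` instance, while `isAvailable()` additionally accepts any completed future. `MarkerInput` and `SwappingInput` are hypothetical stand-ins for the two versions of `MyInput` discussed above, with the async machinery left out and threading ignored.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for the default methods of Flink's AvailabilityProvider. */
interface Availability {
    /** Shared "already available" marker, compared by identity. */
    CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    CompletableFuture<?> getAvailableFuture();

    /** Cheap check: true only if the future IS the shared AVAILABLE instance. */
    default boolean isApproximatelyAvailable() {
        return getAvailableFuture() == AVAILABLE;
    }

    /** Precise check: also true for any other future that has already completed. */
    default boolean isAvailable() {
        CompletableFuture<?> future = getAvailableFuture();
        return future == AVAILABLE || future.isDone();
    }
}

/** First version: completes its own marker future but never swaps in AVAILABLE. */
class MarkerInput implements Availability {
    private CompletableFuture<?> availabilityMarker = new CompletableFuture<>();

    /** Stands in for the tail of asyncOperation(). */
    void finishAsyncOperation() {
        availabilityMarker.complete(null);
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return availabilityMarker;
    }
}

/** Fixed version: publishes AVAILABLE, then completes the old future for any waiters. */
class SwappingInput implements Availability {
    private CompletableFuture<?> availabilityMarker = new CompletableFuture<>();

    void finishAsyncOperation() {
        CompletableFuture<?> tmp = availabilityMarker;
        availabilityMarker = AVAILABLE;
        tmp.complete(null);
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return availabilityMarker;
    }
}
```

With `MarkerInput`, `isApproximatelyAvailable()` stays `false` after the operation finishes even though `isAvailable()` is `true`; that gap is exactly what the extra `|| input.isAvailable()` in the suggested L206 condition would cover. With `SwappingInput`, both checks report `true` once the operation finishes.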
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortedInputITCase.java
##########
@@ -52,13 +56,16 @@
 import static org.junit.Assert.assertThat;
 /**
- * Longer running IT tests for {@link SortingDataInputTest}. For quicker smoke tests see {@link SortingDataInputTest}.
+ * Longer running IT tests for {@link SortingDataInput} and {@link MultiInputSortingDataInputs}.
+ *
+ * @see SortingDataInputTest
+ * @see MultiInputSortingDataInputsTest
  */
-public class SortingDataInputITCase {
+public class SortedInputITCase {

Review comment:
Good idea.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -124,6 +120,7 @@ private int selectFirstReadingInputIndex() throws IOException {
     }
     private void checkFinished(InputStatus status) throws Exception {
+        updateAvailability();

Review comment:
The commit is unnecessary. I did not fully understand the availability handling.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -171,6 +168,10 @@ private int selectNextReadingInputIndex() throws IOException {
         updateAvailability();
         checkInputSelectionAgainstIsFinished();
+        if (inputSelectionHandler.isInputUnavailable(0) && inputSelectionHandler.isInputUnavailable(1)) {
+            fullCheckAndSetAvailable();
+        }

Review comment:
The commit is unnecessary. I did not fully understand the availability handling.
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/TwoInputSelectionHandler.java
##########
@@ -70,6 +70,10 @@ boolean areAllInputsSelected() {
         return inputSelection.areAllInputsSelected();
     }
+    boolean isInputUnavailable(int inputIndex) {

Review comment:
I removed the commit, so I am not adding either of the two methods ;)

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
##########
@@ -62,6 +62,16 @@ public StreamOneInputProcessor(
         return input.getAvailableFuture();
     }
+    @Override
+    public boolean isAvailable() {
+        return input.isAvailable();
+    }
+
+    @Override
+    public boolean isApproximatelyAvailable() {
+        return input.isApproximatelyAvailable();
+    }
+

Review comment:
The result is only the same if the input does not override the default implementations. Mine does.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
##########
@@ -45,16 +45,16 @@
     private final StreamTaskInput<IN> input;
     private final DataOutput<IN> output;
-    private final OperatorChain<?, ?> operatorChain;
+    private final BoundedMultiInput endOfInputAware;
     public StreamOneInputProcessor(
             StreamTaskInput<IN> input,
             DataOutput<IN> output,
-            OperatorChain<?, ?> operatorChain) {
+            BoundedMultiInput endOfInputAware) {

Review comment:
`StreamMultipleInputProcessor` needs the `OperatorChain` for extracting chained sources. I did update it in `StreamTwoInputProcessor`, though.
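To spell out why the `StreamOneInputProcessor` overrides of `isAvailable()` and `isApproximatelyAvailable()` matter, here is a minimal, self-contained sketch. It again uses a hypothetical `Availability` interface that mirrors my understanding of the `AvailabilityProvider` defaults; `OverridingInput`, `ForwardingOnlyProcessor` and `DelegatingProcessor` are hypothetical names, with the last one playing the role of the processor after the change.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for the default methods of Flink's AvailabilityProvider. */
interface Availability {
    CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    CompletableFuture<?> getAvailableFuture();

    default boolean isApproximatelyAvailable() {
        return getAvailableFuture() == AVAILABLE;
    }

    default boolean isAvailable() {
        CompletableFuture<?> future = getAvailableFuture();
        return future == AVAILABLE || future.isDone();
    }
}

/** Hypothetical input that overrides the default check with its own bookkeeping. */
class OverridingInput implements Availability {
    private final CompletableFuture<?> future = new CompletableFuture<>();
    private volatile boolean hasBufferedRecords;

    void bufferRecord() {
        hasBufferedRecords = true;
        future.complete(null);
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return future; // completed eventually, but never the AVAILABLE instance
    }

    @Override
    public boolean isApproximatelyAvailable() {
        return hasBufferedRecords;
    }
}

/** Processor that only forwards the future and inherits the default checks. */
class ForwardingOnlyProcessor implements Availability {
    private final Availability input;

    ForwardingOnlyProcessor(Availability input) {
        this.input = input;
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return input.getAvailableFuture();
    }
}

/** Processor that also delegates both checks to the input. */
class DelegatingProcessor implements Availability {
    private final Availability input;

    DelegatingProcessor(Availability input) {
        this.input = input;
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return input.getAvailableFuture();
    }

    @Override
    public boolean isAvailable() {
        return input.isAvailable();
    }

    @Override
    public boolean isApproximatelyAvailable() {
        return input.isApproximatelyAvailable();
    }
}
```

For an input that keeps the defaults (e.g. `Availability plain = () -> Availability.AVAILABLE;`) both processors give the same answers. For `OverridingInput`, after `bufferRecord()` the forwarding-only processor still reports `isApproximatelyAvailable() == false`, because its inherited default only compares the future against `AVAILABLE`, while the delegating processor reports `true`.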