wpc009 commented on a change in pull request #18475: URL: https://github.com/apache/flink/pull/18475#discussion_r791934091
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java ########## @@ -101,10 +101,15 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.rule.PowerMockRule; Review comment: With out mockito it's not easy to test the internal input processors of the **StreamMultipleInputProcessor**. I didn't came up with other solution to this. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java ########## @@ -55,17 +77,24 @@ public StreamMultipleInputProcessor( || inputSelectionHandler.areAllInputsFinished()) { return AVAILABLE; } - final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>(); for (int i = 0; i < inputProcessors.length; i++) { if (!inputSelectionHandler.isInputFinished(i) - && inputSelectionHandler.isInputSelected(i)) { - assertNoException( - inputProcessors[i] - .getAvailableFuture() - .thenRun(() -> anyInputAvailable.complete(null))); + && inputSelectionHandler.isInputSelected(i) + && inputProcessors[i].getAvailableFuture() == AVAILABLE) { + return AVAILABLE; Review comment: the for loop through L80 to L86 is try to do some short cut if any of the inputProcessor's avaiableFuture is **AVAILABLE**, avoid the following new instance creation. The `if` condition is identical with the original. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java ########## @@ -55,17 +77,24 @@ public StreamMultipleInputProcessor( || inputSelectionHandler.areAllInputsFinished()) { return AVAILABLE; } - final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>(); for (int i = 0; i < inputProcessors.length; i++) { if (!inputSelectionHandler.isInputFinished(i) - && inputSelectionHandler.isInputSelected(i)) { - assertNoException( - inputProcessors[i] - .getAvailableFuture() - .thenRun(() -> anyInputAvailable.complete(null))); + && inputSelectionHandler.isInputSelected(i) + && inputProcessors[i].getAvailableFuture() == AVAILABLE) { + return AVAILABLE; Review comment: the for loop through L80 to L86 tries to do some short cut if any of the inputProcessor's avaiableFuture is **AVAILABLE**, avoid the following new instance creation. The `if` condition is identical with the original. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org