wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r791934091



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       With out mockito it's not easy to test the internal input processors of 
the **StreamMultipleInputProcessor**. 
   I didn't came up with other solution to this.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new 
CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> 
anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       the for loop through L80 to L86 is try to do some short cut if any of 
the inputProcessor's avaiableFuture is **AVAILABLE**, avoid the following new 
instance creation.
   The `if` condition is identical with the original.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new 
CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> 
anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       the for loop through L80 to L86 tries to do some short cut if any of the 
inputProcessor's avaiableFuture is **AVAILABLE**, avoid the following new 
instance creation.
   The `if` condition is identical with the original.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to