wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r792294286

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
   Maybe this shortcut is useless. I will remove it.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
   This issue is about excessive object creation and holding references to completed future objects, so testing this behavior is hard to achieve without Mockito. Any good suggestions for this? I'm OK with removing Mockito if we can verify that no extra future objects are created on an idle input.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
   I confirmed that the MultipleInputStream benchmark got blocked during the test. This may be related to a race between `notifyCompletion` calls and `resetToUnavailable` calls. I'm working on a solution.
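
One interleaving that could produce a blocked task is sketched below. It is only an illustration under stated assumptions, not a confirmed diagnosis of the benchmark hang: `Helper` is a hypothetical, stripped-down stand-in for the `MultipleInputAvailabilityHelper` quoted above, and the two calls are run sequentially on one thread to make the ordering explicit. If the notification arrives before `resetToUnavailable`, nothing is left to complete the fresh future.

```java
import java.util.concurrent.CompletableFuture;

public class LostWakeupSketch {

    /** Hypothetical, simplified stand-in for the helper in the quoted hunk. */
    static final class Helper {
        private CompletableFuture<Void> availableFuture;

        void resetToUnavailable() {
            availableFuture = new CompletableFuture<>();
        }

        void notifyCompletion() {
            // Same guard as in the hunk: only an existing, pending future is completed.
            if (availableFuture != null && !availableFuture.isDone()) {
                availableFuture.complete(null);
            }
        }

        CompletableFuture<Void> getAvailableFuture() {
            return availableFuture;
        }
    }

    /** Notification first, reset second: the signal is dropped. */
    static boolean notifyThenReset() {
        Helper helper = new Helper();
        helper.notifyCompletion();   // no pending future yet, so nothing is completed
        helper.resetToUnavailable(); // fresh future that no later call completes
        return helper.getAvailableFuture().isDone();
    }

    /** Reset first, notification second: the waiter is woken up. */
    static boolean resetThenNotify() {
        Helper helper = new Helper();
        helper.resetToUnavailable();
        helper.notifyCompletion();
        return helper.getAvailableFuture().isDone();
    }

    public static void main(String[] args) {
        System.out.println("notify then reset, future done: " + notifyThenReset());
        System.out.println("reset then notify, future done: " + resetThenNotify());
    }
}
```

In the first ordering a caller waiting on the returned future would never be released unless another notification arrives, which is the kind of symptom a stuck benchmark would show.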

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+        this.availabilityHelper.init();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished()) {
+            return true;
+        } else {
+            boolean isAvailable = false;
+            for (int i = 0; i < inputProcessors.length; i++) {
+                isAvailable =
+                        !inputSelectionHandler.isInputFinished(i)
+                                && inputSelectionHandler.isInputSelected(i)
+                                && inputProcessors[i].isAvailable();
+            }
+            return isAvailable;

Review comment:
   I see your point. The `availableInputsMask` inside `inputSelectionHandler` and the `future` both indicate whether the corresponding input is available. I will make some modifications.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
   I see. You mean I should just remove this unit test, since the private implementation details of `StreamMultipleInputProcessor` cannot be tested? Do I understand that correctly?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
   Yeah, you were right. The shortcut for-loop is redundant with `inputSelectionHandler.isAnyInputAvailable()`. I will make a change.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+        this.availabilityHelper.init();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished()) {
+            return true;
+        } else {
+            boolean isAvailable = false;
+            for (int i = 0; i < inputProcessors.length; i++) {
+                isAvailable =
+                        !inputSelectionHandler.isInputFinished(i)
+                                && inputSelectionHandler.isInputSelected(i)
+                                && inputProcessors[i].isAvailable();
+            }
+            return isAvailable;

Review comment:
   The original code does not override the `isAvailable` method from `AvailabilityProvider`, which calls `getAvailableFuture` first and determines the availability status from the status of the future. `getAvailableFuture` creates a new instance each time someone calls it. So I duplicated the code to avoid calling `getAvailableFuture`, in order to reduce the memory footprint.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
   I think the race will cause duplicate `complete` calls on the future, but I do not know how that would make it get stuck. I will check these benchmarks and try to find what causes these deadlocks.
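
On the point about duplicate `complete` calls: with the standard `java.util.concurrent.CompletableFuture`, a second `complete` is a no-op that returns `false`, so duplicate completion by itself should be harmless. The minimal sketch below shows only that JDK behavior; the class and method names are made up for illustration and are not part of the pull request.

```java
import java.util.concurrent.CompletableFuture;

public class DuplicateCompleteSketch {

    /** Completes the same future twice and reports {first result, second result, isDone}. */
    static boolean[] completeTwice() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        boolean first = future.complete(null);  // transitions the future to completed
        boolean second = future.complete(null); // already completed, so this call changes nothing
        return new boolean[] {first, second, future.isDone()};
    }

    public static void main(String[] args) {
        boolean[] result = completeTwice();
        System.out.println("first complete returned:  " + result[0]);
        System.out.println("second complete returned: " + result[1]);
        System.out.println("future is done:           " + result[2]);
    }
}
```

This suggests the place to look for a hang is a future that is never completed, rather than one that is completed twice.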

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+        this.availabilityHelper.init();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished()) {
+            return true;
+        } else {
+            boolean isAvailable = false;
+            for (int i = 0; i < inputProcessors.length; i++) {
+                isAvailable =
+                        !inputSelectionHandler.isInputFinished(i)
+                                && inputSelectionHandler.isInputSelected(i)
+                                && inputProcessors[i].isAvailable();
+            }
+            return isAvailable;

Review comment:
   This implementation may not be optimal. I may not fully understand the difference between `inputSelectionHandler.isAnyInputAvailable` and the `for` loop below. This block is borrowed from the original code, replacing the `thenRun` part with an aggregation of the results of `inputProcessors[i].isAvailable()`.
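
Read literally, the loop in the quoted hunk assigns to `isAvailable` on every iteration instead of accumulating, so the returned value reflects only the last input. Assuming the intent is "any unfinished, selected input is available" (an assumption, since the comment only says the results are aggregated), the difference can be sketched with plain boolean arrays standing in for `inputSelectionHandler` and `inputProcessors`. All names below are hypothetical.

```java
public class AnyAvailableSketch {

    /** Mirrors the quoted loop: each iteration overwrites the previous result. */
    static boolean lastInputWins(boolean[] finished, boolean[] selected, boolean[] available) {
        boolean result = false;
        for (int i = 0; i < available.length; i++) {
            result = !finished[i] && selected[i] && available[i];
        }
        return result;
    }

    /** Any-match aggregation: returns as soon as one input qualifies. */
    static boolean anyInputQualifies(boolean[] finished, boolean[] selected, boolean[] available) {
        for (int i = 0; i < available.length; i++) {
            if (!finished[i] && selected[i] && available[i]) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        boolean[] finished = {false, false};
        boolean[] selected = {true, true};
        boolean[] available = {true, false}; // only the first input has data

        System.out.println("overwrite loop: " + lastInputWins(finished, selected, available));
        System.out.println("any-match loop: " + anyInputQualifies(finished, selected, available));
    }
}
```

With the first input available and the second not, the overwrite loop reports unavailable while the any-match loop reports available.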