wpc009 commented on a change in pull request #18475: URL: https://github.com/apache/flink/pull/18475#discussion_r794493752
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java ########## @@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() { } } } + + /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */ + @VisibleForTesting + public static class MultipleInputAvailabilityHelper { + private final CompletableFuture<?>[] cachedAvailableFutures; + private final Consumer[] onCompletion; + private CompletableFuture<?> availableFuture; + + public CompletableFuture<?> getAvailableFuture() { + return availableFuture; + } + + public static MultipleInputAvailabilityHelper newInstance(int inputSize) { + MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize); + return obj; + } + + private MultipleInputAvailabilityHelper(int inputSize) { + this.cachedAvailableFutures = new CompletableFuture[inputSize]; + this.onCompletion = new Consumer[inputSize]; + } + + @VisibleForTesting + public void init() { + for (int i = 0; i < cachedAvailableFutures.length; i++) { + final int inputIdx = i; + onCompletion[i] = (Void) -> notifyCompletion(inputIdx); + } + } + + public boolean isInvalid() { + return availableFuture == null || availableFuture.isDone(); + } + + public void resetToUnavailable() { + availableFuture = new CompletableFuture<>(); + } + + private void notifyCompletion(int idx) { + if (availableFuture != null && !availableFuture.isDone()) { + availableFuture.complete(null); + } + cachedAvailableFutures[idx] = AVAILABLE; + } Review comment: The only divergence here is the on the use of `AtomicReference`. My option is that `AtomicReference` is not crucial in this scenario. Yours seems to be that the `AtomicReference` is the missing piece of my fix which prevents getting a correct result. I think both of our fix are functioning. And I will make some last modification to the concern you have about the `volatile` feature. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org