wpc009 commented on a change in pull request #18475: URL: https://github.com/apache/flink/pull/18475#discussion_r794451818
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java ########## @@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() { } } } + + /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */ + @VisibleForTesting + public static class MultipleInputAvailabilityHelper { + private final CompletableFuture<?>[] cachedAvailableFutures; + private final Consumer[] onCompletion; + private CompletableFuture<?> availableFuture; + + public CompletableFuture<?> getAvailableFuture() { + return availableFuture; + } + + public static MultipleInputAvailabilityHelper newInstance(int inputSize) { + MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize); + return obj; + } + + private MultipleInputAvailabilityHelper(int inputSize) { + this.cachedAvailableFutures = new CompletableFuture[inputSize]; + this.onCompletion = new Consumer[inputSize]; + } + + @VisibleForTesting + public void init() { + for (int i = 0; i < cachedAvailableFutures.length; i++) { + final int inputIdx = i; + onCompletion[i] = (Void) -> notifyCompletion(inputIdx); + } + } + + public boolean isInvalid() { + return availableFuture == null || availableFuture.isDone(); + } + + public void resetToUnavailable() { + availableFuture = new CompletableFuture<>(); + } + + private void notifyCompletion(int idx) { + if (availableFuture != null && !availableFuture.isDone()) { + availableFuture.complete(null); + } + cachedAvailableFutures[idx] = AVAILABLE; + } Review comment: In this scenario, we may not benefits a lot from the `volatile` field. Since, it only preventing dirty memory cache between CPU cores. > So I don't see any race condition in my version. Yours works quite similar after all, but your version's lack of AtomicReference creates an opportunity, for example in point 3., that if input becomes available, it will attempt to complete wrong, obsolete, old, already completed future. Even with `volatile`, the future completion can still happens inbetween the `maybeReset()` call (let's see, after the `isDone()` check, and before the `set` operation) and the completion callback will see the obsolete, old, already completed `anyAvailable` future, and try to complete it. It's no difference. The AtomicReference is not preventing this. AtomicReference's `set` and `get` are plain method on plain object. Without the `CAS` operation, the only difference here is the `volatile` field. But, since we can not preventing the race condition. Future completion still has the chance to see old future object. So, `volatile` is the least thing to concern here. But, anyway, there is no harm to make the `availableFuture` `volatile`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org