pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794416248



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side 
StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int 
inputSize) {
+            MultipleInputAvailabilityHelper obj = new 
MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       > The AtomicReference without CAS operations is no difference with plain 
field reference.
   
   I'm afraid you are wrong in this regard. All atomic classes are wrappers 
around `volatile` field + indeed some extra atomic CAS operations and such 
[they provide happens before 
relationship:](https://www.tutorialspoint.com/java_concurrency/concurrency_atomicreference.htm)
   
   > That is, a set has a happens-before relationship with any subsequent get 
on the same variable. The atomic compareAndSet method also has these memory 
consistency features.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to