wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794451818



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
   In this scenario, we may not benefit much from making the field `volatile`, since `volatile` only guarantees that a write becomes visible across CPU cores. It does not make the check-then-set sequence atomic.
   
   > So I don't see any race condition in my version. Yours works quite similar after all, but your version's lack of AtomicReference creates an opportunity, for example in point 3., that if input becomes available, it will attempt to complete wrong, obsolete, old, already completed future.
   
   Even with `volatile`, the future completion can still happen in the middle of the `maybeReset()` call (say, after the `isDone()` check and before the `set` operation). The completion callback will then see the obsolete, old, already completed `anyAvailable` future and try to complete it.
   So there is no difference: the AtomicReference does not prevent this. AtomicReference's `set` and `get` are plain methods on a plain object. Without a `CAS` operation, the only thing it adds here is `volatile` semantics. But since that cannot prevent the race condition, the future completion still has a chance to see the old future object. So `volatile` is the least of our concerns here.
   But anyway, there is no harm in making `availableFuture` `volatile`.
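
To make the interleaving concrete, here is a minimal sketch that replays it on a single thread. It is not the PR's code: the class `StaleFutureSketch` and the method `replayInterleaving` are hypothetical names, and the three steps stand in for what would be two threads in practice. It only assumes that the reset is an `isDone()` check followed by a plain `set`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; names are illustrative and not taken from the PR.
class StaleFutureSketch {

    /**
     * Replays the interleaving sequentially. Returns true if the callback's
     * completion landed on the old future and the newly installed future
     * was left incomplete.
     */
    static boolean replayInterleaving() {
        // State before the race: the current future is already completed.
        AtomicReference<CompletableFuture<Void>> anyAvailable =
                new AtomicReference<>(CompletableFuture.<Void>completedFuture(null));

        // Step 1 (processor thread): the isDone() check of the reset.
        boolean needsReset = anyAvailable.get().isDone();

        // Step 2 (callback thread): an input becomes available; the callback
        // loads the current future and completes it. complete() returns
        // false because that future was already done.
        boolean transitioned = anyAvailable.get().complete(null);

        // Step 3 (processor thread): the plain set of the reset.
        if (needsReset) {
            anyAvailable.set(new CompletableFuture<Void>());
        }

        return !transitioned && !anyAvailable.get().isDone();
    }

    public static void main(String[] args) {
        System.out.println("notification lost: " + replayInterleaving());
    }
}
```

Replacing the `AtomicReference` with a `volatile` field, or with a plain field, leaves these three steps unchanged, which is the point of the comment above.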



