pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794368740



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side 
StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int 
inputSize) {
+            MultipleInputAvailabilityHelper obj = new 
MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       > In your fix, the AtomicReference can not prevent that either.
   
   It doesn't need to. Take a closer look on the order of execution in my 
version methods `maybeReset()` and `registerFuture()` and think about what will 
happen if one of the input's availability future will complete concurrently. 
   1. If input becomes available before we call `maybeReset()` it's future will 
be hooked up to the new combined future.
   2. If input becomes available just after `anyAvailable.get().isDone()` 
check, but before `anyAvailable.set(new CompletableFuture<>())`, the 
combined/returned future will be available anyway. We will clean up everything 
in the next `getAvailableFuture()` call.
   3. If input becomes available after `maybeReset()`, it will just complete 
the combined/returned regardless if we register it or not.
   4. `registerFuture()` calls will take care of setting up all of the inputs' 
futures.
   
   So I don't see any race condition in my version. Yours works quite similar 
after all, but your version's lack of `AtomicReference` creates an opportunity, 
for example in point 3., that if input becomes available, it will attempt to 
complete wrong, obsolete, old, already completed future.
   
   >  The isDone() check is still seperate with the reset.
   
   This `isDone()` check is irrelevant, as it can be executed on the 
wrong/old/obsolete instance of the `CompletableFuture` in your case.
   
   > Both ours fix is working.
   
   Maybe the deadlock is very rare, maybe you are not testing the right 
scenario (you would need to have a scenario where at first both inputs are 
flickering available, and then at the same time, while 2nd input is 
unavailable, 1st input becomes permanently unavailable, while second input 
becomes available (and 2nd input's availability is swallowed).
   
   Further more, it might be working perfectly fine now, but if JDK  optimises 
the code a bit differently, or JDK is upgraded it will stop working.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to