rpuch commented on code in PR #5672:
URL: https://github.com/apache/ignite-3/pull/5672#discussion_r2052205116


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -206,40 +214,59 @@ public CompletableFuture<Void> notifyWatches(long 
newRevision, List<Entry> updat
         return inBusyLockAsync(() -> notifyWatchesInternal(newRevision, 
updatedEntries, time));
     }
 
-    private CompletableFuture<Void> notifyWatchesInternal(long newRevision, 
List<Entry> updatedEntries, HybridTimestamp time) {
-        assert time != null;
+    /**
+     * Composes passed action with {@link #notificationFuture} and handles any 
exceptions that might have occurred.
+     *
+     * @param asyncAction Action to compose.
+     * @return Updated value of {@link #notificationFuture}.
+     */
+    @VisibleForTesting
+    CompletableFuture<Void> enqueue(Supplier<CompletableFuture<Void>> 
asyncAction) {
+        while (true) {
+            CompletableFuture<Void> chainingFuture = new CompletableFuture<>();
+
+            CompletableFuture<Void> newNotificationFuture = chainingFuture
+                    .thenComposeAsync(v -> inBusyLockAsync(asyncAction), 
watchExecutor)
+                    .whenComplete((unused, e) -> {
+                        if (e != null) {
+                            
notifyFailureHandlerOnFirstFailureInNotificationChain(e);
+                        }
+                    });
+
+            CompletableFuture<Void> oldNotificationFuture = notificationFuture;
+
+            if (NOTIFICATION_FUTURE_UPDATER.compareAndSet(this, 
oldNotificationFuture, newNotificationFuture)) {
+                
oldNotificationFuture.whenComplete(copyStateTo(chainingFuture));
 
-        CompletableFuture<Void> newFuture = notificationFuture
-                .thenComposeAsync(v -> inBusyLockAsync(() -> {
-                    List<Entry> filteredUpdatedEntries = 
updatedEntries.isEmpty() ? emptyList() : updatedEntries.stream()
-                            
.filter(WatchProcessor::isNotIdempotentCacheCommand)
-                            .collect(toList());
+                return newNotificationFuture;
+            }
+        }
+    }
 
-                    List<WatchAndEvents> watchAndEvents = 
collectWatchesAndEvents(filteredUpdatedEntries, newRevision);
+    private CompletableFuture<Void> notifyWatchesInternal(long newRevision, 
List<Entry> updatedEntries, HybridTimestamp time) {
+        assert time != null;
 
-                    long startTimeNanos = System.nanoTime();
+        return enqueue(() -> {
+            List<Entry> filteredUpdatedEntries = updatedEntries.isEmpty() ? 
emptyList() : updatedEntries.stream()
+                    .filter(WatchProcessor::isNotIdempotentCacheCommand)
+                    .collect(toList());
 
-                    CompletableFuture<Void> notifyWatchesFuture = 
performWatchesNotifications(watchAndEvents, newRevision, time);
+            List<WatchAndEvents> watchAndEvents = 
collectWatchesAndEvents(filteredUpdatedEntries, newRevision);
 
-                    // Revision update is triggered strictly after all watch 
listeners have been notified.
-                    CompletableFuture<Void> notifyUpdateRevisionFuture = 
notifyUpdateRevisionListeners(newRevision);
+            long startTimeNanos = System.nanoTime();
 
-                    CompletableFuture<Void> notificationFuture = 
allOf(notifyWatchesFuture, notifyUpdateRevisionFuture)
-                            .thenRunAsync(() -> inBusyLock(() -> 
invokeOnRevisionCallback(newRevision, time)), watchExecutor);
+            CompletableFuture<Void> notifyWatchesFuture = 
performWatchesNotifications(watchAndEvents, newRevision, time);
 
-                    notificationFuture.whenComplete((unused, e) -> 
maybeLogLongProcessing(filteredUpdatedEntries, startTimeNanos));
+            // Revision update is triggered strictly after all watch listeners 
have been notified.
+            CompletableFuture<Void> notifyUpdateRevisionFuture = 
notifyUpdateRevisionListeners(newRevision);
 
-                    return notificationFuture;
-                }), watchExecutor)
-                .whenComplete((unused, e) -> {
-                    if (e != null) {
-                        
notifyFailureHandlerOnFirstFailureInNotificationChain(e);
-                    }
-                });
+            CompletableFuture<Void> notificationFuture = 
allOf(notifyWatchesFuture, notifyUpdateRevisionFuture)

Review Comment:
   Would it make sense to give this variable another name (like 
`newNotificationFuture`) to make it more difficult to confuse it with 
`this.notificationFuture`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to