ibessonov commented on code in PR #5672: URL: https://github.com/apache/ignite-3/pull/5672#discussion_r2052208690
########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java: ########## @@ -206,40 +214,59 @@ public CompletableFuture<Void> notifyWatches(long newRevision, List<Entry> updat return inBusyLockAsync(() -> notifyWatchesInternal(newRevision, updatedEntries, time)); } - private CompletableFuture<Void> notifyWatchesInternal(long newRevision, List<Entry> updatedEntries, HybridTimestamp time) { - assert time != null; + /** + * Composes passed action with {@link #notificationFuture} and handles any exceptions that might have occurred. + * + * @param asyncAction Action to compose. + * @return Updated value of {@link #notificationFuture}. + */ + @VisibleForTesting + CompletableFuture<Void> enqueue(Supplier<CompletableFuture<Void>> asyncAction) { + while (true) { + CompletableFuture<Void> chainingFuture = new CompletableFuture<>(); + + CompletableFuture<Void> newNotificationFuture = chainingFuture + .thenComposeAsync(v -> inBusyLockAsync(asyncAction), watchExecutor) + .whenComplete((unused, e) -> { + if (e != null) { + notifyFailureHandlerOnFirstFailureInNotificationChain(e); + } + }); + + CompletableFuture<Void> oldNotificationFuture = notificationFuture; + + if (NOTIFICATION_FUTURE_UPDATER.compareAndSet(this, oldNotificationFuture, newNotificationFuture)) { + oldNotificationFuture.whenComplete(copyStateTo(chainingFuture)); - CompletableFuture<Void> newFuture = notificationFuture - .thenComposeAsync(v -> inBusyLockAsync(() -> { - List<Entry> filteredUpdatedEntries = updatedEntries.isEmpty() ? 
emptyList() : updatedEntries.stream() - .filter(WatchProcessor::isNotIdempotentCacheCommand) - .collect(toList()); + return newNotificationFuture; + } + } + } - List<WatchAndEvents> watchAndEvents = collectWatchesAndEvents(filteredUpdatedEntries, newRevision); + private CompletableFuture<Void> notifyWatchesInternal(long newRevision, List<Entry> updatedEntries, HybridTimestamp time) { + assert time != null; - long startTimeNanos = System.nanoTime(); + return enqueue(() -> { + List<Entry> filteredUpdatedEntries = updatedEntries.isEmpty() ? emptyList() : updatedEntries.stream() + .filter(WatchProcessor::isNotIdempotentCacheCommand) + .collect(toList()); - CompletableFuture<Void> notifyWatchesFuture = performWatchesNotifications(watchAndEvents, newRevision, time); + List<WatchAndEvents> watchAndEvents = collectWatchesAndEvents(filteredUpdatedEntries, newRevision); - // Revision update is triggered strictly after all watch listeners have been notified. - CompletableFuture<Void> notifyUpdateRevisionFuture = notifyUpdateRevisionListeners(newRevision); + long startTimeNanos = System.nanoTime(); - CompletableFuture<Void> notificationFuture = allOf(notifyWatchesFuture, notifyUpdateRevisionFuture) - .thenRunAsync(() -> inBusyLock(() -> invokeOnRevisionCallback(newRevision, time)), watchExecutor); + CompletableFuture<Void> notifyWatchesFuture = performWatchesNotifications(watchAndEvents, newRevision, time); - notificationFuture.whenComplete((unused, e) -> maybeLogLongProcessing(filteredUpdatedEntries, startTimeNanos)); + // Revision update is triggered strictly after all watch listeners have been notified. 
+ CompletableFuture<Void> notifyUpdateRevisionFuture = notifyUpdateRevisionListeners(newRevision); - return notificationFuture; - }), watchExecutor) - .whenComplete((unused, e) -> { - if (e != null) { - notifyFailureHandlerOnFirstFailureInNotificationChain(e); - } - }); + CompletableFuture<Void> notificationFuture = allOf(notifyWatchesFuture, notifyUpdateRevisionFuture) Review Comment: Yes, why not -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org