ibessonov commented on code in PR #2261: URL: https://github.com/apache/ignite-3/pull/2261#discussion_r1244882963
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -128,75 +131,94 @@ public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp time) {
             // Revision must be the same for all entries.
             long newRevision = updatedEntries.get(0).revision();
-            // Notify all watches in parallel, then aggregate the entries that they have processed.
-            CompletableFuture<List<EntryEvent>>[] notificationFutures = watches.stream()
-                    .map(watch -> notifyWatch(watch, updatedEntries, newRevision, time))
-                    .toArray(CompletableFuture[]::new);
+            // Collect all the events for each watch.
+            CompletableFuture<List<WatchAndEvents>> watchAndEventsFuture = collectWatchAndEvents(updatedEntries, newRevision);
-            return allOf(notificationFutures)
-                    .thenComposeAsync(ignored -> invokeOnRevisionCallback(notificationFutures, newRevision, time), watchExecutor);
+            return watchAndEventsFuture
+                    .thenComposeAsync(watchAndEvents -> allOf(
+                            notifyWatches(watchAndEvents, newRevision, time),
+                            notifyUpdateRevisionListeners(newRevision)
+                    ), watchExecutor)
+                    .thenComposeAsync(ignored -> invokeOnRevisionCallback(watchAndEventsFuture, newRevision, time), watchExecutor);
         }, watchExecutor);
     }
-    private CompletableFuture<List<EntryEvent>> notifyWatch(Watch watch, List<Entry> updatedEntries, long revision, HybridTimestamp time) {
-        CompletableFuture<List<EntryEvent>> eventFuture = supplyAsync(() -> {
-            List<EntryEvent> entryEvents = List.of();
-            for (Entry newEntry : updatedEntries) {
-                byte[] newKey = newEntry.key();
+    private static CompletableFuture<Void> notifyWatches(List<WatchAndEvents> watchAndEventsList, long revision, HybridTimestamp time) {
+        if (watchAndEventsList.isEmpty()) {
+            return completedFuture(null);
+        }
-                assert newEntry.revision() == revision;
+        CompletableFuture<?>[] notifyWatchFutures = new CompletableFuture[watchAndEventsList.size()];
-                if (watch.matches(newKey, revision)) {
-                    Entry oldEntry = entryReader.get(newKey, revision - 1);
+        int i = 0;
-                    if (entryEvents.isEmpty()) {
-                        entryEvents = new ArrayList<>();
-                    }
+        for (WatchAndEvents watchAndEvents : watchAndEventsList) {
+            CompletableFuture<Void> notifyWatchFuture;
-                    entryEvents.add(new EntryEvent(oldEntry, newEntry));
-                }
+            try {
+                notifyWatchFuture = watchAndEvents.events.isEmpty()
+                        ? watchAndEvents.watch.onRevisionUpdated(revision)
+                        : watchAndEvents.watch.onUpdate(new WatchEvent(watchAndEvents.events, revision, time));

Review Comment:
Yes, this is the contract

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org