ibessonov commented on code in PR #2261:
URL: https://github.com/apache/ignite-3/pull/2261#discussion_r1244882963


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -128,75 +131,94 @@ public void notifyWatches(List<Entry> updatedEntries, 
HybridTimestamp time) {
                     // Revision must be the same for all entries.
                     long newRevision = updatedEntries.get(0).revision();
 
-                    // Notify all watches in parallel, then aggregate the 
entries that they have processed.
-                    CompletableFuture<List<EntryEvent>>[] notificationFutures 
= watches.stream()
-                            .map(watch -> notifyWatch(watch, updatedEntries, 
newRevision, time))
-                            .toArray(CompletableFuture[]::new);
+                    // Collect all the events for each watch.
+                    CompletableFuture<List<WatchAndEvents>> 
watchAndEventsFuture = collectWatchAndEvents(updatedEntries, newRevision);
 
-                    return allOf(notificationFutures)
-                            .thenComposeAsync(ignored -> 
invokeOnRevisionCallback(notificationFutures, newRevision, time), 
watchExecutor);
+                    return watchAndEventsFuture
+                            .thenComposeAsync(watchAndEvents -> allOf(
+                                    notifyWatches(watchAndEvents, newRevision, 
time),
+                                    notifyUpdateRevisionListeners(newRevision)
+                            ), watchExecutor)
+                            .thenComposeAsync(ignored -> 
invokeOnRevisionCallback(watchAndEventsFuture, newRevision, time), 
watchExecutor);
                 }, watchExecutor);
     }
 
-    private CompletableFuture<List<EntryEvent>> notifyWatch(Watch watch, 
List<Entry> updatedEntries, long revision, HybridTimestamp time) {
-        CompletableFuture<List<EntryEvent>> eventFuture = supplyAsync(() -> {
-            List<EntryEvent> entryEvents = List.of();
 
-            for (Entry newEntry : updatedEntries) {
-                byte[] newKey = newEntry.key();
+    private static CompletableFuture<Void> notifyWatches(List<WatchAndEvents> 
watchAndEventsList, long revision, HybridTimestamp time) {
+        if (watchAndEventsList.isEmpty()) {
+            return completedFuture(null);
+        }
 
-                assert newEntry.revision() == revision;
+        CompletableFuture<?>[] notifyWatchFutures = new 
CompletableFuture[watchAndEventsList.size()];
 
-                if (watch.matches(newKey, revision)) {
-                    Entry oldEntry = entryReader.get(newKey, revision - 1);
+        int i = 0;
 
-                    if (entryEvents.isEmpty()) {
-                        entryEvents = new ArrayList<>();
-                    }
+        for (WatchAndEvents watchAndEvents : watchAndEventsList) {
+            CompletableFuture<Void> notifyWatchFuture;
 
-                    entryEvents.add(new EntryEvent(oldEntry, newEntry));
-                }
+            try {
+                notifyWatchFuture = watchAndEvents.events.isEmpty()
+                        ? watchAndEvents.watch.onRevisionUpdated(revision)
+                        : watchAndEvents.watch.onUpdate(new 
WatchEvent(watchAndEvents.events, revision, time));

Review Comment:
   Yes, this is the contract



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to