rpuch commented on code in PR #4663:
URL: https://github.com/apache/ignite-3/pull/4663#discussion_r1827531985


##########
modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/Revisions.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage;
+
+import org.apache.ignite.internal.tostring.S;
+
+/** Information about metastorage revisions. */
+public class Revisions {
+    private final long revision;
+
+    private final long compactionRevision;
+
+    /**
+     * Constructor.
+     *
+     * @param revision Metastorage revision.
+     * @param compactionRevision Metastorage compaction revision.
+     */
+    public Revisions(long revision, long compactionRevision) {
+        this.revision = revision;
+        this.compactionRevision = compactionRevision;
+    }
+
+    /** Returns metastorage revision. */
+    public long revision() {
+        return revision;
+    }
+
+    /** Returns metastorage compaction revision. */

Review Comment:
   Please explain what it means: this revision is the largest revision which 
keys could be removed due to compaction, or it's least revision which is NOT 
yet affected by compaction?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -237,11 +238,11 @@ public CompletableFuture<Boolean> 
saveSnapshot(SnapshotEntry update) {
     }
 
     private void recoveryStateFromMetastore(OnUpdateHandler handler) {

Review Comment:
   ```suggestion
       private void recoverStateFromMetastore(OnUpdateHandler handler) {
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -311,75 +316,37 @@ public void addElectionListener(ElectionListener 
listener) {
         electionListeners.add(listener);
     }
 
-    private CompletableFuture<Long> recover(MetaStorageService service) {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new NodeStoppingException());
-        }
+    private CompletableFuture<?> 
updateTargetRevisionsForRecovery(MetaStorageService service) {
+        return inBusyLockAsync(busyLock, () -> {
+            service.currentRevisions()
+                    .thenAccept(targetRevisions -> {
+                        assert targetRevisions != null;
 
-        try {
-            service.currentRevision().whenComplete((targetRevision, throwable) 
-> {
-                if (throwable != null) {
-                    recoveryFinishedFuture.completeExceptionally(throwable);
+                        LOG.info("Performing MetaStorage recovery: [from={}, 
to={}]", storage.revisions(), targetRevisions);
 
-                    return;
-                }
+                        
recoveryRevisionsListener.setTargetRevisions(targetRevisions.toRevisions());
+                    }).whenComplete((res, throwable) -> {
+                        if (throwable != null) {
+                            
recoveryFinishedFuture.completeExceptionally(throwable);
+                        }
+                    });
 
-                LOG.info("Performing MetaStorage recovery from revision {} to 
{}", storage.revision(), targetRevision);
+            return recoveryFinishedFuture.whenComplete((revisions, throwable) 
-> {
+                if (throwable != null) {
+                    LOG.info("Recovery failed", throwable);
+                } else {
+                    long recoveryRevision = revisions.revision();
 
-                assert targetRevision != null;
+                    appliedRevision = recoveryRevision;
 
-                listenForRecovery(targetRevision);
-            }).whenComplete((res, ex) -> {
-                if (ex != null) {
-                    LOG.info("Recovery failed", ex);
+                    if (recoveryRevision > 0) {
+                        
clusterTime.updateSafeTime(storage.timestampByRevision(recoveryRevision));
+                    }
 
-                    recoveryFinishedFuture.completeExceptionally(ex);
+                    LOG.info("Finished MetaStorage recovery");

Review Comment:
   Please add a try/catch around this block; otherwise, if something is thrown 
from it, we'll never know it, just the future will never be completed, and it 
will be difficult to see what happened



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -311,75 +316,37 @@ public void addElectionListener(ElectionListener 
listener) {
         electionListeners.add(listener);
     }
 
-    private CompletableFuture<Long> recover(MetaStorageService service) {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new NodeStoppingException());
-        }
+    private CompletableFuture<?> 
updateTargetRevisionsForRecovery(MetaStorageService service) {

Review Comment:
   Why was the method renamed?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/RecoveryRevisionsListenerImpl.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metastorage.Revisions;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.RecoveryRevisionsListener;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Implementation for {@link MetaStorageManagerImpl}. */

Review Comment:
   What does it mean: implementation for `MetaStorageManagerImpl`? I would 
understand an 'implementation of <interface>', but it would still be redundant 
as the code says this thing by itself...



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -430,4 +455,50 @@ private List<Entry> doGetAll(List<byte[]> keys, long 
revUpperBound) {
 
         return res;
     }
+
+    @Override
+    public void advanceSafeTime(KeyValueUpdateContext context) {
+        rwLock.writeLock().lock();
+
+        try {
+            setIndexAndTerm(context.index, context.term);
+
+            if (isWatchesStarted()) {
+                watchProcessor.advanceSafeTime(context.timestamp);
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public Revisions revisions() {
+        rwLock.readLock().lock();
+
+        try {
+            return createCurrentRevisions();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private Revisions createCurrentRevisions() {
+        return new Revisions(rev, compactionRevision);
+    }
+
+    protected void 
addToNotifyWatchProcessorEventsBeforeStartWatches(NotifyWatchProcessorEvent 
event) {

Review Comment:
   ```suggestion
       protected void 
addToNotifyWatchProcessorEventsBeforeStartingWatches(NotifyWatchProcessorEvent 
event) {
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -115,11 +125,20 @@ protected AbstractKeyValueStorage(
     protected abstract Value valueForOperation(byte[] key, long revision);
 
     /**
-     * Returns {@code true} if the storage is in the recovery state.
+     * Returns {@code true} if the metastorage is in the recovery state.
+     *
+     * <p>Method is expected to be invoked under {@link #rwLock}.</p>
+     */
+    private boolean isInRecoveryState() {
+        return recoveryRevisionListener != null;
+    }
+
+    /**
+     * Returns {@code true} if the watches have {@link #startWatches started}.
      *
      * <p>Method is expected to be invoked under {@link #rwLock}.</p>
      */
-    protected abstract boolean isInRecoveryState();
+    protected abstract boolean isWatchesStarted();

Review Comment:
   ```suggestion
       protected abstract boolean areWatchesStarted();
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -1090,21 +1097,16 @@ private void queueWatchEvent() {
                 updatedEntries.clear();
 
                 break;
-
             case IN_PROGRESS:
-                // Buffer the event while event replay is still in progress.
-                if (eventCache == null) {
-                    eventCache = new ArrayList<>();
-                }
+                UpdatedEntries copy = updatedEntries.transfer();
 
-                eventCache.add(updatedEntries.transfer());
+                var event = new UpdateEntriesEvent(copy.updatedEntries, 
copy.ts);
 
-                break;
+                addToNotifyWatchProcessorEventsBeforeStartWatches(event);
 
+                break;
             default:
                 notifyWatches();
-
-                break;

Review Comment:
   It seems consistent to have `break` even in the `default` branch. I would 
leave it here



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/RecoveryRevisionsListenerImpl.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metastorage.Revisions;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.RecoveryRevisionsListener;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Implementation for {@link MetaStorageManagerImpl}. */
+class RecoveryRevisionsListenerImpl implements RecoveryRevisionsListener {
+    private final IgniteSpinBusyLock busyLock;
+
+    private final CompletableFuture<Revisions> recoveryFinishFuture;
+
+    private final KeyValueStorage storage;
+
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /** Guarded by {@link #lock}. */
+    private Revisions targetRevisions;
+
+    /** Guarded by {@link #lock}. */
+    private Revisions currentRevisions;
+
+    RecoveryRevisionsListenerImpl(
+            IgniteSpinBusyLock busyLock,
+            CompletableFuture<Revisions> recoveryFinishFuture,
+            KeyValueStorage storage
+    ) {
+        this.busyLock = busyLock;
+        this.recoveryFinishFuture = recoveryFinishFuture;
+        this.storage = storage;
+    }
+
+    @Override
+    public void onUpdate(Revisions currentRevisions) {
+        lock.lock();
+
+        try {
+            this.currentRevisions = currentRevisions;
+
+            completeRecoveryFinishFutureIfPossible();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void setTargetRevisions(Revisions targetRevisions) {
+        lock.lock();
+
+        try {
+            this.targetRevisions = targetRevisions;
+
+            completeRecoveryFinishFutureIfPossible();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void completeRecoveryFinishFutureIfPossible() {
+        if (!busyLock.enterBusy()) {
+            recoveryFinishFuture.completeExceptionally(new 
NodeStoppingException());
+        }
+
+        try {
+            if (targetRevisions == null
+                    || currentRevisions == null
+                    || currentRevisions.revision() < targetRevisions.revision()
+                    || currentRevisions.compactionRevision() < 
targetRevisions.compactionRevision()) {

Review Comment:
   Is there a guarantee that no node can be ahead of the leader by 
compactionRevision? If yes, how is it established?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -311,75 +316,37 @@ public void addElectionListener(ElectionListener 
listener) {
         electionListeners.add(listener);
     }
 
-    private CompletableFuture<Long> recover(MetaStorageService service) {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new NodeStoppingException());
-        }
+    private CompletableFuture<?> 
updateTargetRevisionsForRecovery(MetaStorageService service) {
+        return inBusyLockAsync(busyLock, () -> {
+            service.currentRevisions()
+                    .thenAccept(targetRevisions -> {
+                        assert targetRevisions != null;
 
-        try {
-            service.currentRevision().whenComplete((targetRevision, throwable) 
-> {
-                if (throwable != null) {
-                    recoveryFinishedFuture.completeExceptionally(throwable);
+                        LOG.info("Performing MetaStorage recovery: [from={}, 
to={}]", storage.revisions(), targetRevisions);

Review Comment:
   Do we need information about compaction revisions in the log?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -115,11 +125,20 @@ protected AbstractKeyValueStorage(
     protected abstract Value valueForOperation(byte[] key, long revision);
 
     /**
-     * Returns {@code true} if the storage is in the recovery state.
+     * Returns {@code true} if the metastorage is in the recovery state.
+     *
+     * <p>Method is expected to be invoked under {@link #rwLock}.</p>

Review Comment:
   Should an assertion be added?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -430,4 +455,50 @@ private List<Entry> doGetAll(List<byte[]> keys, long 
revUpperBound) {
 
         return res;
     }
+
+    @Override
+    public void advanceSafeTime(KeyValueUpdateContext context) {
+        rwLock.writeLock().lock();
+
+        try {
+            setIndexAndTerm(context.index, context.term);
+
+            if (isWatchesStarted()) {
+                watchProcessor.advanceSafeTime(context.timestamp);
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public Revisions revisions() {
+        rwLock.readLock().lock();
+
+        try {
+            return createCurrentRevisions();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private Revisions createCurrentRevisions() {
+        return new Revisions(rev, compactionRevision);
+    }
+
+    protected void 
addToNotifyWatchProcessorEventsBeforeStartWatches(NotifyWatchProcessorEvent 
event) {
+        assert !isWatchesStarted();
+
+        boolean added = 
notifyWatchProcessorEventsBeforeStartWatches.add(event);
+
+        assert added : event;
+    }
+
+    protected void drainNotifyWatchProcessorEventsBeforeStartWatches() {

Review Comment:
   ```suggestion
       protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() {
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -88,6 +89,15 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     /** Tracks only cursors, since reading a single entry or a batch is done 
entirely under {@link #rwLock}. */
     protected final ReadOperationForCompactionTracker 
readOperationForCompactionTracker;
 
+    /**
+     * Events for notification of the {@link WatchProcessor} that were created 
before the {@link #startWatches start of watches}, after the
+     * start of watches there will be {@code null}. Events are sorted by 
{@link NotifyWatchProcessorEvent#timestamp} and are expected to
+     * have no duplicates.
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
+    protected @Nullable TreeSet<NotifyWatchProcessorEvent> 
notifyWatchProcessorEventsBeforeStartWatches = new TreeSet<>();

Review Comment:
   ```suggestion
       protected @Nullable TreeSet<NotifyWatchProcessorEvent> 
notifyWatchProcessorEventsBeforeStartingWatches = new TreeSet<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to