sashapolo commented on code in PR #4602:
URL: https://github.com/apache/ignite-3/pull/4602#discussion_r1808340736


##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java:
##########
@@ -68,7 +69,7 @@
 @ExtendWith(ConfigurationExtension.class)
 abstract class ItMetaStorageMultipleNodesVsStorageTest extends 
ItMetaStorageMultipleNodesAbstractTest {
     @Override
-    abstract KeyValueStorage createStorage(String nodeName, Path path);
+    abstract KeyValueStorage createStorage(String nodeName, Path path, 
ReadOperationForCompactionTracker readOperationForCompactionTracker);

Review Comment:
   Why are we overriding a method with another abstract method?



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java:
##########
@@ -49,7 +49,12 @@
 public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTest {
     @Override
     public KeyValueStorage createStorage() {
-        return new RocksDbKeyValueStorage(NODE_NAME, 
workDir.resolve("storage"), new NoOpFailureManager());
+        return new RocksDbKeyValueStorage(
+                NODE_NAME,
+                workDir.resolve("storage"),
+                new NoOpFailureManager(),
+                new ReadOperationForCompactionTracker()

Review Comment:
   Shall we introduce a no-op implementation that could be used for tests? We 
could also use a singleton for it. I'm not talking about this particular test, 
but most of the tests modified in this PR don't care about compaction.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -381,4 +381,17 @@ public CompletableFuture<Void> 
notifyUpdateRevisionListeners(long newRevision) {
 
         return futures.isEmpty() ? nullCompletedFuture() : 
allOf(futures.toArray(CompletableFuture[]::new));
     }
+
+    /**
+     * Returns a future that will complete when the task in the WatchEvent 
queue is complete.
+     *
+     * <p>This method is not thread-safe and must be performed under an 
exclusive lock in concurrent scenarios.</p>

Review Comment:
   Same about the closing tag



##########
modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java:
##########
@@ -423,4 +423,11 @@ public interface MetaStorageManager extends 
IgniteComponent {
 
     /** Unregisters a Meta Storage revision update listener. */
     void unregisterRevisionUpdateListener(RevisionUpdateListener listener);
+
+    /**
+     * Returns the local compaction revision that was set or restored from a 
metastorage snapshot, {@code -1} if not changed.

Review Comment:
   > {@code -1} if not changed
   
   What does this phrase mean?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -196,6 +223,41 @@ public long getCompactionRevision() {
         }
     }
 
+    @Override
+    public void startCompaction(long revision) {
+        assert revision >= 0 : revision;
+
+        rwLock.writeLock().lock();
+
+        try {
+            assertCompactionRevisionLessThanCurrent(revision, rev);
+
+            if (isRecoveryState()) {
+                compactionRevision = revision;
+            } else {
+                watchProcessor
+                        .addTaskToWatchEventQueue(() -> 
setCompactionRevision(revision))
+                        .thenComposeAsync(unused -> 
readOperationsFuture(revision), compactionExecutor)
+                        .thenRunAsync(() -> compact(revision), 
compactionExecutor)
+                        .whenComplete((unused, throwable) -> {
+                            if (throwable == null) {
+                                log.info("Metastore compaction completed 
successfully: [compactionRevision={}]", revision);
+                            } else {
+                                log.error(
+                                        "Metastore compaction completed 
unsuccessfully: [compactionRevision={}]",

Review Comment:
   ```suggestion
                                           "Metastore compaction failed: 
[compactionRevision={}]",
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -196,6 +223,41 @@ public long getCompactionRevision() {
         }
     }
 
+    @Override
+    public void startCompaction(long revision) {

Review Comment:
   can we rename the parameter to `targetRevision` or `compactionRevision`?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -104,6 +124,13 @@ protected AbstractKeyValueStorage(String nodeName, 
FailureManager failureManager
     /** Returns key values by revision for operation. */
     protected abstract Value valueForOperation(byte[] key, long revision);
 
+    /**
+     * Returns {@code true} if the storage is in recovery state and the 
watches have not {@link #startWatches started}.

Review Comment:
   This javadoc is confusing. For the Rocksdb storage, watches can be started, 
while we are still "in recovery". So I guess the javadoc should be:
   
   ```
   Returns {@code true} if the storage is in the recovery state.
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java:
##########
@@ -36,17 +36,23 @@
  * <p>Expected usage:</p>
  * <ul>
  *     <li>Before starting execution, the reading command invoke {@link 
#track} with its ID and the compaction revision that is currently
- *     set ({@link 
MetaStorageCompactionManager#setCompactionRevisionLocally}/{@link 
KeyValueStorage#setCompactionRevision}).</li>
+ *     set ({@link KeyValueStorage#setCompactionRevision}).</li>
  *     <li>After completion, the reading command will invoke {@link #untrack} 
with the same arguments as when calling {@link #track},
  *     regardless of whether the operation was successful or not.</li>
  *     <li>{@link #collect} will be invoked only after a new compaction 
revision has been set
- *     ({@link 
MetaStorageCompactionManager#setCompactionRevisionLocally}/{@link 
KeyValueStorage#setCompactionRevision}) for a new
- *     compaction revision.</li>
+ *     ({@link KeyValueStorage#setCompactionRevision}) for a new compaction 
revision.</li>
  * </ul>
  */
 public class ReadOperationForCompactionTracker {
     private final Map<ReadOperationKey, CompletableFuture<Void>> 
readOperationFutureByKey = new ConcurrentHashMap<>();
 
+    private final AtomicLong longOperationIdGenerator = new AtomicLong();
+
+    /** Generates the next read operation ID. Thread-safe. */
+    public long generateLongReadOperationId() {

Review Comment:
   what do you mean by "Long read operation ID"? Is it "long" because of the 
return type or because the read operation takes a long time?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -335,7 +335,7 @@ private void invokeOnRevisionCallback(long revision, 
HybridTimestamp time) {
     /**
      * Advances safe time without notifying watches (as there is no new 
revision).
      *
-     * <p>This method is not thread-safe and must be performed under an 
exclusive lock in concurrent scenarios.
+     * <p>This method is not thread-safe and must be performed under an 
exclusive lock in concurrent scenarios.</p>

Review Comment:
   What is this change for? There's no reason for using closing `</p>` tags, 
most javadocs (most notably, Java's standard library javadocs) don't use them



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java:
##########
@@ -69,6 +75,8 @@ public class ReadOperationForCompactionTracker {
      * @see #untrack(Object, long)
      */
     public void track(Object readOperationId, long compactionRevision) {
+        System.err.println(">>>");

Review Comment:
   You probably forgot to remote this



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -79,22 +91,30 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     protected final AtomicBoolean stopCompaction = new AtomicBoolean();
 
     /** Tracks only cursors, since reading a single entry or a batch is done 
entirely under {@link #rwLock}. */
-    protected final ReadOperationForCompactionTracker 
readOperationForCompactionTracker = new ReadOperationForCompactionTracker();
+    protected final ReadOperationForCompactionTracker 
readOperationForCompactionTracker;
 
-    /**
-     * Used to generate read operation ID for {@link 
#readOperationForCompactionTracker}.
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
-     */
-    protected long readOperationIdGeneratorForTracker;
+    protected final ExecutorService compactionExecutor;
+
+    private final List<CompactionListener> compactionListeners = new 
CopyOnWriteArrayList<>();

Review Comment:
   We already have the EventProducer interface, can it be used instead?



##########
modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java:
##########
@@ -423,4 +423,11 @@ public interface MetaStorageManager extends 
IgniteComponent {
 
     /** Unregisters a Meta Storage revision update listener. */
     void unregisterRevisionUpdateListener(RevisionUpdateListener listener);
+
+    /**
+     * Returns the local compaction revision that was set or restored from a 
metastorage snapshot, {@code -1} if not changed.
+     *
+     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if the node is in the process of stopping.

Review Comment:
   why aren't we throing `NodeStoppingException` directly?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -104,6 +124,13 @@ protected AbstractKeyValueStorage(String nodeName, 
FailureManager failureManager
     /** Returns key values by revision for operation. */
     protected abstract Value valueForOperation(byte[] key, long revision);
 
+    /**
+     * Returns {@code true} if the storage is in recovery state and the 
watches have not {@link #startWatches started}.
+     *
+     * <p>Method is expected to be invoked under {@link #rwLock}.</p>
+     */
+    protected abstract boolean isRecoveryState();

Review Comment:
   I think it would be better to call this method `inRecoveryState` or 
`isInRecoveryState`



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -196,6 +223,41 @@ public long getCompactionRevision() {
         }
     }
 
+    @Override
+    public void startCompaction(long revision) {
+        assert revision >= 0 : revision;
+
+        rwLock.writeLock().lock();
+
+        try {
+            assertCompactionRevisionLessThanCurrent(revision, rev);
+
+            if (isRecoveryState()) {
+                compactionRevision = revision;
+            } else {
+                watchProcessor
+                        .addTaskToWatchEventQueue(() -> 
setCompactionRevision(revision))
+                        .thenComposeAsync(unused -> 
readOperationsFuture(revision), compactionExecutor)
+                        .thenRunAsync(() -> compact(revision), 
compactionExecutor)
+                        .whenComplete((unused, throwable) -> {

Review Comment:
   I would use `whenCompleteAsync` here, just in case



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -79,22 +91,30 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     protected final AtomicBoolean stopCompaction = new AtomicBoolean();
 
     /** Tracks only cursors, since reading a single entry or a batch is done 
entirely under {@link #rwLock}. */
-    protected final ReadOperationForCompactionTracker 
readOperationForCompactionTracker = new ReadOperationForCompactionTracker();
+    protected final ReadOperationForCompactionTracker 
readOperationForCompactionTracker;
 
-    /**
-     * Used to generate read operation ID for {@link 
#readOperationForCompactionTracker}.
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
-     */
-    protected long readOperationIdGeneratorForTracker;
+    protected final ExecutorService compactionExecutor;
+
+    private final List<CompactionListener> compactionListeners = new 
CopyOnWriteArrayList<>();
 
     /**
      * Constructor.
      *
      * @param nodeName Node name.
      * @param failureManager Failure processor that is used to handle critical 
errors.
+     * @param readOperationForCompactionTracker Read operation tracker for 
metastorage compaction.
+     * @param compactionExecutor Metastorage compaction executor.
      */
-    protected AbstractKeyValueStorage(String nodeName, FailureManager 
failureManager) {
+    protected AbstractKeyValueStorage(
+            String nodeName,
+            FailureManager failureManager,
+            ReadOperationForCompactionTracker 
readOperationForCompactionTracker,
+            ExecutorService compactionExecutor
+    ) {
+        this.failureManager = failureManager;
+        this.readOperationForCompactionTracker = 
readOperationForCompactionTracker;
+        this.compactionExecutor = compactionExecutor;

Review Comment:
   Where do you stop this executor?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java:
##########
@@ -196,6 +223,41 @@ public long getCompactionRevision() {
         }
     }
 
+    @Override
+    public void startCompaction(long revision) {
+        assert revision >= 0 : revision;
+
+        rwLock.writeLock().lock();
+
+        try {
+            assertCompactionRevisionLessThanCurrent(revision, rev);
+
+            if (isRecoveryState()) {
+                compactionRevision = revision;
+            } else {
+                watchProcessor
+                        .addTaskToWatchEventQueue(() -> 
setCompactionRevision(revision))
+                        .thenComposeAsync(unused -> 
readOperationsFuture(revision), compactionExecutor)
+                        .thenRunAsync(() -> compact(revision), 
compactionExecutor)
+                        .whenComplete((unused, throwable) -> {
+                            if (throwable == null) {
+                                log.info("Metastore compaction completed 
successfully: [compactionRevision={}]", revision);
+                            } else {
+                                log.error(
+                                        "Metastore compaction completed 
unsuccessfully: [compactionRevision={}]",

Review Comment:
   `completed unsuccessfully` reads really strange and can be confused with the 
successful outcome



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/CompactionListener.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+/** Listener of the metastorage compaction. */
+@FunctionalInterface
+public interface CompactionListener {
+    /** Callback on completion of metastore compaction locally. */
+    void onCompleteLocally(long compactionRevision);

Review Comment:
   I think that `onCompactionComplete` is a better name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to