sashapolo commented on code in PR #4602: URL: https://github.com/apache/ignite-3/pull/4602#discussion_r1808340736
########## modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java: ########## @@ -68,7 +69,7 @@ @ExtendWith(ConfigurationExtension.class) abstract class ItMetaStorageMultipleNodesVsStorageTest extends ItMetaStorageMultipleNodesAbstractTest { @Override - abstract KeyValueStorage createStorage(String nodeName, Path path); + abstract KeyValueStorage createStorage(String nodeName, Path path, ReadOperationForCompactionTracker readOperationForCompactionTracker); Review Comment: Why are we overriding a method with another abstract method? ########## modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java: ########## @@ -49,7 +49,12 @@ public class RocksDbKeyValueStorageTest extends BasicOperationsKeyValueStorageTest { @Override public KeyValueStorage createStorage() { - return new RocksDbKeyValueStorage(NODE_NAME, workDir.resolve("storage"), new NoOpFailureManager()); + return new RocksDbKeyValueStorage( + NODE_NAME, + workDir.resolve("storage"), + new NoOpFailureManager(), + new ReadOperationForCompactionTracker() Review Comment: Shall we introduce a no-op implementation that could be used for tests? We could also use a singleton for it. I'm not talking about this particular test, but most of the tests modified in this PR don't care about compaction. ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java: ########## @@ -381,4 +381,17 @@ public CompletableFuture<Void> notifyUpdateRevisionListeners(long newRevision) { return futures.isEmpty() ? nullCompletedFuture() : allOf(futures.toArray(CompletableFuture[]::new)); } + + /** + * Returns a future that will complete when the task in the WatchEvent queue is complete. 
+ * + * <p>This method is not thread-safe and must be performed under an exclusive lock in concurrent scenarios.</p> Review Comment: Same about the closing tag ########## modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java: ########## @@ -423,4 +423,11 @@ public interface MetaStorageManager extends IgniteComponent { /** Unregisters a Meta Storage revision update listener. */ void unregisterRevisionUpdateListener(RevisionUpdateListener listener); + + /** + * Returns the local compaction revision that was set or restored from a metastorage snapshot, {@code -1} if not changed. Review Comment: > {@code -1} if not changed What does this phrase mean? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -196,6 +223,41 @@ public long getCompactionRevision() { } } + @Override + public void startCompaction(long revision) { + assert revision >= 0 : revision; + + rwLock.writeLock().lock(); + + try { + assertCompactionRevisionLessThanCurrent(revision, rev); + + if (isRecoveryState()) { + compactionRevision = revision; + } else { + watchProcessor + .addTaskToWatchEventQueue(() -> setCompactionRevision(revision)) + .thenComposeAsync(unused -> readOperationsFuture(revision), compactionExecutor) + .thenRunAsync(() -> compact(revision), compactionExecutor) + .whenComplete((unused, throwable) -> { + if (throwable == null) { + log.info("Metastore compaction completed successfully: [compactionRevision={}]", revision); + } else { + log.error( + "Metastore compaction completed unsuccessfully: [compactionRevision={}]", Review Comment: ```suggestion "Metastore compaction failed: [compactionRevision={}]", ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -196,6 +223,41 @@ public long getCompactionRevision() { } } + @Override + public void startCompaction(long revision) { 
Review Comment: Can we rename the parameter to `targetRevision` or `compactionRevision`? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -104,6 +124,13 @@ protected AbstractKeyValueStorage(String nodeName, FailureManager failureManager /** Returns key values by revision for operation. */ protected abstract Value valueForOperation(byte[] key, long revision); + /** + * Returns {@code true} if the storage is in recovery state and the watches have not {@link #startWatches started}. Review Comment: This javadoc is confusing. For the RocksDB storage, watches can be started while we are still "in recovery". So I guess the javadoc should be: ``` Returns {@code true} if the storage is in the recovery state. ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java: ########## @@ -36,17 +36,23 @@ * <p>Expected usage:</p> * <ul> * <li>Before starting execution, the reading command invoke {@link #track} with its ID and the compaction revision that is currently - * set ({@link MetaStorageCompactionManager#setCompactionRevisionLocally}/{@link KeyValueStorage#setCompactionRevision}).</li> + * set ({@link KeyValueStorage#setCompactionRevision}).</li> * <li>After completion, the reading command will invoke {@link #untrack} with the same arguments as when calling {@link #track}, * regardless of whether the operation was successful or not.</li> * <li>{@link #collect} will be invoked only after a new compaction revision has been set - * ({@link MetaStorageCompactionManager#setCompactionRevisionLocally}/{@link KeyValueStorage#setCompactionRevision}) for a new - * compaction revision.</li> + * ({@link KeyValueStorage#setCompactionRevision}) for a new compaction revision.</li> * </ul> */ public class ReadOperationForCompactionTracker { private final Map<ReadOperationKey, CompletableFuture<Void>> readOperationFutureByKey = new
ConcurrentHashMap<>(); + private final AtomicLong longOperationIdGenerator = new AtomicLong(); + + /** Generates the next read operation ID. Thread-safe. */ + public long generateLongReadOperationId() { Review Comment: What do you mean by "Long read operation ID"? Is it "long" because of the return type or because the read operation takes a long time? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java: ########## @@ -335,7 +335,7 @@ private void invokeOnRevisionCallback(long revision, HybridTimestamp time) { /** * Advances safe time without notifying watches (as there is no new revision). * - * <p>This method is not thread-safe and must be performed under an exclusive lock in concurrent scenarios. + * <p>This method is not thread-safe and must be performed under an exclusive lock in concurrent scenarios.</p> Review Comment: What is this change for? There's no reason for using closing `</p>` tags; most javadocs (most notably, Java's standard library javadocs) don't use them ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java: ########## @@ -69,6 +75,8 @@ public class ReadOperationForCompactionTracker { * @see #untrack(Object, long) */ public void track(Object readOperationId, long compactionRevision) { + System.err.println(">>>"); Review Comment: You probably forgot to remove this ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -79,22 +91,30 @@ public abstract class AbstractKeyValueStorage implements KeyValueStorage { protected final AtomicBoolean stopCompaction = new AtomicBoolean(); /** Tracks only cursors, since reading a single entry or a batch is done entirely under {@link #rwLock}. 
*/ - protected final ReadOperationForCompactionTracker readOperationForCompactionTracker = new ReadOperationForCompactionTracker(); + protected final ReadOperationForCompactionTracker readOperationForCompactionTracker; - /** - * Used to generate read operation ID for {@link #readOperationForCompactionTracker}. - * - * <p>Multi-threaded access is guarded by {@link #rwLock}.</p> - */ - protected long readOperationIdGeneratorForTracker; + protected final ExecutorService compactionExecutor; + + private final List<CompactionListener> compactionListeners = new CopyOnWriteArrayList<>(); Review Comment: We already have the EventProducer interface; can it be used instead? ########## modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java: ########## @@ -423,4 +423,11 @@ public interface MetaStorageManager extends IgniteComponent { /** Unregisters a Meta Storage revision update listener. */ void unregisterRevisionUpdateListener(RevisionUpdateListener listener); + + /** + * Returns the local compaction revision that was set or restored from a metastorage snapshot, {@code -1} if not changed. + * + * @throws IgniteInternalException with cause {@link NodeStoppingException} if the node is in the process of stopping. Review Comment: Why aren't we throwing `NodeStoppingException` directly? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -104,6 +124,13 @@ protected AbstractKeyValueStorage(String nodeName, FailureManager failureManager /** Returns key values by revision for operation. */ protected abstract Value valueForOperation(byte[] key, long revision); + /** + * Returns {@code true} if the storage is in recovery state and the watches have not {@link #startWatches started}. 
+ * + * <p>Method is expected to be invoked under {@link #rwLock}.</p> + */ + protected abstract boolean isRecoveryState(); Review Comment: I think it would be better to call this method `inRecoveryState` or `isInRecoveryState` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -196,6 +223,41 @@ public long getCompactionRevision() { } } + @Override + public void startCompaction(long revision) { + assert revision >= 0 : revision; + + rwLock.writeLock().lock(); + + try { + assertCompactionRevisionLessThanCurrent(revision, rev); + + if (isRecoveryState()) { + compactionRevision = revision; + } else { + watchProcessor + .addTaskToWatchEventQueue(() -> setCompactionRevision(revision)) + .thenComposeAsync(unused -> readOperationsFuture(revision), compactionExecutor) + .thenRunAsync(() -> compact(revision), compactionExecutor) + .whenComplete((unused, throwable) -> { Review Comment: I would use `whenCompleteAsync` here, just in case ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -79,22 +91,30 @@ public abstract class AbstractKeyValueStorage implements KeyValueStorage { protected final AtomicBoolean stopCompaction = new AtomicBoolean(); /** Tracks only cursors, since reading a single entry or a batch is done entirely under {@link #rwLock}. */ - protected final ReadOperationForCompactionTracker readOperationForCompactionTracker = new ReadOperationForCompactionTracker(); + protected final ReadOperationForCompactionTracker readOperationForCompactionTracker; - /** - * Used to generate read operation ID for {@link #readOperationForCompactionTracker}. 
- * - * <p>Multi-threaded access is guarded by {@link #rwLock}.</p> - */ - protected long readOperationIdGeneratorForTracker; + protected final ExecutorService compactionExecutor; + + private final List<CompactionListener> compactionListeners = new CopyOnWriteArrayList<>(); /** * Constructor. * * @param nodeName Node name. * @param failureManager Failure processor that is used to handle critical errors. + * @param readOperationForCompactionTracker Read operation tracker for metastorage compaction. + * @param compactionExecutor Metastorage compaction executor. */ - protected AbstractKeyValueStorage(String nodeName, FailureManager failureManager) { + protected AbstractKeyValueStorage( + String nodeName, + FailureManager failureManager, + ReadOperationForCompactionTracker readOperationForCompactionTracker, + ExecutorService compactionExecutor + ) { + this.failureManager = failureManager; + this.readOperationForCompactionTracker = readOperationForCompactionTracker; + this.compactionExecutor = compactionExecutor; Review Comment: Where do you stop this executor? 
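To make the executor question above concrete: if the storage is meant to own `compactionExecutor`, stopping it on close could look roughly like the sketch below. This is only an illustration under that assumption. `StorageWithOwnedExecutor`, its `close()` method, and the 10-second timeout are hypothetical and not taken from the PR. If the caller that creates the executor is responsible for its lifecycle, nothing like this is needed in the storage itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a storage that owns its compaction executor; not the PR's class. */
class StorageWithOwnedExecutor implements AutoCloseable {
    private final ExecutorService compactionExecutor;

    StorageWithOwnedExecutor(ExecutorService compactionExecutor) {
        this.compactionExecutor = compactionExecutor;
    }

    @Override
    public void close() {
        // Stop accepting new tasks; tasks that were already submitted keep running.
        compactionExecutor.shutdown();

        try {
            // Assumed timeout: give in-flight compaction a bounded time to finish.
            if (!compactionExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                // Still busy after the timeout: interrupt the remaining tasks.
                compactionExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and force the shutdown.
            Thread.currentThread().interrupt();
            compactionExecutor.shutdownNow();
        }
    }
}
```

Calling `close()` on a storage built with `Executors.newSingleThreadExecutor()` leaves the executor both shut down and terminated once no tasks are pending.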
########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -196,6 +223,41 @@ public long getCompactionRevision() { } } + @Override + public void startCompaction(long revision) { + assert revision >= 0 : revision; + + rwLock.writeLock().lock(); + + try { + assertCompactionRevisionLessThanCurrent(revision, rev); + + if (isRecoveryState()) { + compactionRevision = revision; + } else { + watchProcessor + .addTaskToWatchEventQueue(() -> setCompactionRevision(revision)) + .thenComposeAsync(unused -> readOperationsFuture(revision), compactionExecutor) + .thenRunAsync(() -> compact(revision), compactionExecutor) + .whenComplete((unused, throwable) -> { + if (throwable == null) { + log.info("Metastore compaction completed successfully: [compactionRevision={}]", revision); + } else { + log.error( + "Metastore compaction completed unsuccessfully: [compactionRevision={}]", Review Comment: `completed unsuccessfully` reads really strange and can be confused with the successful outcome ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/CompactionListener.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server; + +/** Listener of the metastorage compaction. */ +@FunctionalInterface +public interface CompactionListener { + /** Callback on completion of metastore compaction locally. */ + void onCompleteLocally(long compactionRevision); Review Comment: I think that `onCompactionComplete` is a better name
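As a rough illustration of the rename suggested for the listener callback, the sketch below shows how `onCompactionComplete` might read at a registration and notification site. Everything here is hypothetical: `RenamedCompactionListener` only mirrors the shape of `CompactionListener` from the diff, and `CompactionListenersSketch` exists purely to show the call sites, using the same `CopyOnWriteArrayList` approach as the PR.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical renamed listener; mirrors the shape of CompactionListener from the diff. */
@FunctionalInterface
interface RenamedCompactionListener {
    /** Callback on completion of metastorage compaction on the local node. */
    void onCompactionComplete(long compactionRevision);
}

/** Hypothetical holder, only to show how registration and notification would read. */
class CompactionListenersSketch {
    // Copy-on-write list, so listeners can be registered while a notification is in progress.
    private final List<RenamedCompactionListener> listeners = new CopyOnWriteArrayList<>();

    void register(RenamedCompactionListener listener) {
        listeners.add(listener);
    }

    void notifyCompactionComplete(long compactionRevision) {
        for (RenamedCompactionListener listener : listeners) {
            listener.onCompactionComplete(compactionRevision);
        }
    }
}
```

With this name the call site states what completed, whereas `onCompleteLocally` only makes sense together with the interface name.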