rpuch commented on code in PR #4663:
URL: https://github.com/apache/ignite-3/pull/4663#discussion_r1824475478


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java:
##########
@@ -319,4 +322,33 @@ private void cancelLastScheduledFutureBusy() {
             lock.unlock();
         }
     }
+
+    private void startCompactionOnRecoveryAsync() {
+        assert metaStorageManager.recoveryFinishedFuture().isDone();
+
+        long latestCompactionRevisionLocally = 
metaStorageManager.getCompactionRevisionLocally();
+
+        if (latestCompactionRevisionLocally != -1) {
+            runAsync(() -> inBusyLockSafe(busyLock, () -> 
storage.compact(latestCompactionRevisionLocally)), compactionExecutor)

Review Comment:
   Is there a guarantee that this compaction will finish before subsequent 
compactions are started? If it's because `compactionExecutor` is 
single-threaded, could you please add a comment about this?



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java:
##########
@@ -55,4 +76,50 @@ public static boolean equals(Entry act, Entry exp) {
 
         return Arrays.equals(act.value(), exp.value());
     }
+
+    /** Creates a cluster configuration with metastorage compaction 
properties. */
+    public static String createClusterConfigWithCompactionProperties(String 
interval, String dataAvailabilityTime) {

Review Comment:
   Why are the parameters of type String and not just ints/longs?



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java:
##########
@@ -55,4 +76,50 @@ public static boolean equals(Entry act, Entry exp) {
 
         return Arrays.equals(act.value(), exp.value());
     }
+
+    /** Creates a cluster configuration with metastorage compaction 
properties. */
+    public static String createClusterConfigWithCompactionProperties(String 
interval, String dataAvailabilityTime) {
+        return String.format(
+                "ignite.system.properties: {"
+                        + "%s.propertyValue= \"%s\", "
+                        + "%s.propertyValue= \"%s\""
+                        + "}",
+                INTERVAL_SYSTEM_PROPERTY_NAME, interval, 
DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME, dataAvailabilityTime
+        );
+    }
+
+    /** Returns the latest revision for the key from the leader. */
+    public static long latestKeyRevision(MetaStorageManager 
metaStorageManager, ByteArray key) {
+        CompletableFuture<Entry> latestEntryFuture = 
metaStorageManager.get(key);
+        assertThat(latestEntryFuture.thenApply(Entry::empty), willBe(false));
+
+        return latestEntryFuture.join().revision();
+    }
+
+    /** Returns {@code true} if the metastorage key has only one revision in 
the cluster. */
+    public static boolean allNodesContainsSingleRevisionForKeyLocally(Cluster 
cluster, ByteArray key, long revision) {

Review Comment:
   ```suggestion
       public static boolean allNodesContainSingleRevisionForKeyLocally(Cluster 
cluster, ByteArray key, long revision) {
   ```



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerOneNodeTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.BAR_KEY;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.FOO_KEY;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.VALUE;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.allNodesContainsSingleRevisionForKeyLocally;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.createClusterConfigWithCompactionProperties;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.latestKeyRevision;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfiguration;
+import 
org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+import org.junit.jupiter.api.Test;
+
+/** For {@link MetaStorageCompactionTrigger} testing for single node case. */
+@WithSystemProperty(key = 
DistributionZoneRebalanceEngine.SKIP_REBALANCE_TRIGGERS_RECOVERY, value = 
"true")
+public class ItMetaStorageCompactionTriggerOneNodeTest extends 
ClusterPerTestIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Override
+    protected void customizeInitParameters(InitParametersBuilder builder) {
+        super.customizeInitParameters(builder);
+
+        
builder.clusterConfiguration(createClusterConfigWithCompactionProperties("10", 
"10"));
+    }
+
+    @Test
+    void testCompactionAfterRestartNode() throws Exception {
+        IgniteImpl node = aliveNode();
+
+        MetaStorageManager metaStorageManager = node.metaStorageManager();
+
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+        assertThat(metaStorageManager.put(BAR_KEY, VALUE), 
willCompleteSuccessfully());
+
+        // Let's wait until the compaction on revision of FOO_KEY creation 
happens.
+        long fooRevision = latestKeyRevision(metaStorageManager, FOO_KEY);
+        assertTrue(waitForCondition(() -> 
metaStorageManager.getCompactionRevisionLocally() >= fooRevision, 10, 1_000));
+
+        log.info("Latest revision for key: [key={}, revision={}]", FOO_KEY, 
fooRevision);
+
+        // Let's cancel new compactions to create a new version for the key 
and not compact it until we restart the node.
+        startDropCompactionCommand(node);
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+
+        long latestFooRevision = latestKeyRevision(metaStorageManager, 
FOO_KEY);
+
+        long latestCompactionRevision = 
metaStorageManager.getCompactionRevisionLocally();
+        // Let's change the properties before restarting so that a new 
scheduled compaction does not start after the node starts.
+        changeCompactionProperties(node, Long.toString(Long.MAX_VALUE), 
Long.toString(Long.MAX_VALUE));
+
+        IgniteImpl restartedNode = restartNode();
+
+        MetaStorageManager restartedMetaStorageManager = 
restartedNode.metaStorageManager();
+
+        // Let's make sure that after the restart the correct revision of the 
compression is restored and the compression itself will be at

Review Comment:
   What is 'compression'?



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerOneNodeTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.BAR_KEY;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.FOO_KEY;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.VALUE;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.allNodesContainsSingleRevisionForKeyLocally;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.createClusterConfigWithCompactionProperties;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.latestKeyRevision;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfiguration;
+import 
org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+import org.junit.jupiter.api.Test;
+
+/** For {@link MetaStorageCompactionTrigger} testing for single node case. */
+@WithSystemProperty(key = 
DistributionZoneRebalanceEngine.SKIP_REBALANCE_TRIGGERS_RECOVERY, value = 
"true")
+public class ItMetaStorageCompactionTriggerOneNodeTest extends 
ClusterPerTestIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Override
+    protected void customizeInitParameters(InitParametersBuilder builder) {
+        super.customizeInitParameters(builder);
+
+        
builder.clusterConfiguration(createClusterConfigWithCompactionProperties("10", 
"10"));
+    }
+
+    @Test
+    void testCompactionAfterRestartNode() throws Exception {
+        IgniteImpl node = aliveNode();
+
+        MetaStorageManager metaStorageManager = node.metaStorageManager();
+
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+        assertThat(metaStorageManager.put(BAR_KEY, VALUE), 
willCompleteSuccessfully());
+
+        // Let's wait until the compaction on revision of FOO_KEY creation 
happens.
+        long fooRevision = latestKeyRevision(metaStorageManager, FOO_KEY);
+        assertTrue(waitForCondition(() -> 
metaStorageManager.getCompactionRevisionLocally() >= fooRevision, 10, 1_000));
+
+        log.info("Latest revision for key: [key={}, revision={}]", FOO_KEY, 
fooRevision);
+
+        // Let's cancel new compactions to create a new version for the key 
and not compact it until we restart the node.
+        startDropCompactionCommand(node);
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+
+        long latestFooRevision = latestKeyRevision(metaStorageManager, 
FOO_KEY);
+
+        long latestCompactionRevision = 
metaStorageManager.getCompactionRevisionLocally();
+        // Let's change the properties before restarting so that a new 
scheduled compaction does not start after the node starts.
+        changeCompactionProperties(node, Long.toString(Long.MAX_VALUE), 
Long.toString(Long.MAX_VALUE));
+
+        IgniteImpl restartedNode = restartNode();
+
+        MetaStorageManager restartedMetaStorageManager = 
restartedNode.metaStorageManager();
+
+        // Let's make sure that after the restart the correct revision of the 
compression is restored and the compression itself will be at
+        // the latest compaction revision.
+        assertEquals(latestCompactionRevision, 
restartedMetaStorageManager.getCompactionRevisionLocally());
+        assertTrue(waitForCondition(() -> 
allNodesContainsSingleRevisionForKeyLocally(cluster, FOO_KEY, 
latestFooRevision), 10, 1_000));
+    }
+
+    private IgniteImpl aliveNode() {
+        return unwrapIgniteImpl(node(0));
+    }
+
+    private IgniteImpl restartNode() {
+        return unwrapIgniteImpl(restartNode(0));
+    }
+
+    private static void startDropCompactionCommand(IgniteImpl node) {
+        node.dropMessages((s, message) -> message instanceof WriteActionRequest
+                && ((WriteActionRequest) message).deserializedCommand() 
instanceof CompactionCommand);
+    }
+
+    private static void changeCompactionProperties(IgniteImpl node, String 
interval, String dataAvailabilityTime) {

Review Comment:
   Why are parameter types Strings and not longs?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java:
##########
@@ -319,4 +322,33 @@ private void cancelLastScheduledFutureBusy() {
             lock.unlock();
         }
     }
+
+    private void startCompactionOnRecoveryAsync() {

Review Comment:
   Why is it `Async`? It does not return a future; it just runs a compaction in 
the background.



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerOneNodeTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.BAR_KEY;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.FOO_KEY;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.VALUE;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.allNodesContainsSingleRevisionForKeyLocally;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.createClusterConfigWithCompactionProperties;
+import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.latestKeyRevision;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfiguration;
+import 
org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+import org.junit.jupiter.api.Test;
+
+/** For {@link MetaStorageCompactionTrigger} testing for single node case. */
+@WithSystemProperty(key = 
DistributionZoneRebalanceEngine.SKIP_REBALANCE_TRIGGERS_RECOVERY, value = 
"true")
+public class ItMetaStorageCompactionTriggerOneNodeTest extends 
ClusterPerTestIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Override
+    protected void customizeInitParameters(InitParametersBuilder builder) {
+        super.customizeInitParameters(builder);
+
+        
builder.clusterConfiguration(createClusterConfigWithCompactionProperties("10", 
"10"));
+    }
+
+    @Test
+    void testCompactionAfterRestartNode() throws Exception {
+        IgniteImpl node = aliveNode();
+
+        MetaStorageManager metaStorageManager = node.metaStorageManager();
+
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+        assertThat(metaStorageManager.put(BAR_KEY, VALUE), 
willCompleteSuccessfully());
+
+        // Let's wait until the compaction on revision of FOO_KEY creation 
happens.
+        long fooRevision = latestKeyRevision(metaStorageManager, FOO_KEY);
+        assertTrue(waitForCondition(() -> 
metaStorageManager.getCompactionRevisionLocally() >= fooRevision, 10, 1_000));
+
+        log.info("Latest revision for key: [key={}, revision={}]", FOO_KEY, 
fooRevision);
+
+        // Let's cancel new compactions to create a new version for the key 
and not compact it until we restart the node.
+        startDropCompactionCommand(node);
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+
+        long latestFooRevision = latestKeyRevision(metaStorageManager, 
FOO_KEY);
+
+        long latestCompactionRevision = 
metaStorageManager.getCompactionRevisionLocally();
+        // Let's change the properties before restarting so that a new 
scheduled compaction does not start after the node starts.
+        changeCompactionProperties(node, Long.toString(Long.MAX_VALUE), 
Long.toString(Long.MAX_VALUE));
+
+        IgniteImpl restartedNode = restartNode();
+
+        MetaStorageManager restartedMetaStorageManager = 
restartedNode.metaStorageManager();
+
+        // Let's make sure that after the restart the correct revision of the 
compression is restored and the compression itself will be at
+        // the latest compaction revision.
+        assertEquals(latestCompactionRevision, 
restartedMetaStorageManager.getCompactionRevisionLocally());
+        assertTrue(waitForCondition(() -> 
allNodesContainsSingleRevisionForKeyLocally(cluster, FOO_KEY, 
latestFooRevision), 10, 1_000));
+    }
+
+    private IgniteImpl aliveNode() {
+        return unwrapIgniteImpl(node(0));
+    }
+
+    private IgniteImpl restartNode() {
+        return unwrapIgniteImpl(restartNode(0));
+    }
+
+    private static void startDropCompactionCommand(IgniteImpl node) {

Review Comment:
   ```suggestion
       private static void startDroppingCompactionCommand(IgniteImpl node) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to