rpuch commented on code in PR #4465:
URL: https://github.com/apache/ignite-3/pull/4465#discussion_r1778381727


##########
modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java:
##########
@@ -272,4 +274,31 @@ public interface MetaStorageManager extends 
IgniteComponent {
 
     /** Unregisters a Meta Storage revision update listener. */
     void unregisterRevisionUpdateListener(RevisionUpdateListener listener);
+
+    /**
+     * Compacts outdated key versions and removes tombstones of metastorage 
locally.
+     *
+     * <p>We do not compact the only and last version of the key unless it is 
a tombstone.</p>
+     *
+     * <p>Let's look at some examples, let's say we have the following keys 
with their versions:</p>
+     * <ul>
+     *     <li>Key "foo" with versions that have revisions (1, 3, 5) - "foo" 
[1, 3, 5].</li>
+     *     <li>Key "bar" with versions that have revisions (1, 2, 5) the last 
revision is a tombstone - "bar" [1, 2, 5 tomb].</li>
+     * </ul>
+     *
+     * <p>Let's look at examples of invoking the current method and what will 
be in the storage after:</p>
+     * <ul>
+     *     <li>Compaction revision is {@code 1}: "foo" [3, 5], "bar" [2, 5 
tomb].</li>
+     *     <li>Compaction revision is {@code 2}: "foo" [3, 5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 3}: "foo" [5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 4}: "foo" [5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 5}: "foo" [5].</li>
+     *     <li>Compaction revision is {@code 6}: "foo" [5].</li>
+     * </ul>
+     *
+     * @param revision Revision up to which the metastorage keys will be 
compacted.

Review Comment:
   ```suggestion
        * @param revision Revision up to which (including) the metastorage keys 
will be compacted.
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java:
##########
@@ -17,186 +17,150 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /** Compaction tests. */
 public abstract class AbstractCompactionKeyValueStorageTest extends 
AbstractKeyValueStorageTest {
-    private final HybridClock clock = new HybridClockImpl();
-
-    @Test
-    public void testCompactionAfterLastRevision() {
-        byte[] key = key(0);
-        byte[] value1 = keyValue(0, 0);
-        byte[] value2 = keyValue(0, 1);
+    private static final byte[] FOO_KEY = fromString("foo");
 
-        storage.put(key, value1, clock.now());
-        storage.put(key, value2, clock.now());
+    private static final byte[] BAR_KEY = fromString("bar");
 
-        long lastRevision = storage.revision();
+    private static final byte[] SOME_KEY = fromString("someKey");
 
-        storage.compact(clock.now());
+    private static final byte[] SOME_VALUE = fromString("someValue");
 
-        // Latest value, must exist.
-        Entry entry2 = storage.get(key, lastRevision);
-        assertEquals(lastRevision, entry2.revision());
-        assertArrayEquals(value2, entry2.value());
+    private final HybridClock clock = new HybridClockImpl();
 
-        // Previous value, must be removed due to compaction.
-        Entry entry1 = storage.get(key, lastRevision - 1);
-        assertTrue(entry1.empty());
-    }
+    @Override
+    @BeforeEach
+    void setUp() {
+        super.setUp();
 
-    @Test
-    public void testCompactionAfterTombstone() {
-        byte[] key = key(0);
-        byte[] value = keyValue(0, 0);
+        storage.putAll(List.of(FOO_KEY, BAR_KEY), List.of(SOME_VALUE, 
SOME_VALUE), clock.now());
+        storage.put(BAR_KEY, SOME_VALUE, clock.now());
+        storage.put(FOO_KEY, SOME_VALUE, clock.now());
+        storage.put(SOME_KEY, SOME_VALUE, clock.now());
 
-        storage.put(key, value, clock.now());
-        storage.remove(key, clock.now());
+        var fooKey = new ByteArray(FOO_KEY);
+        var barKey = new ByteArray(BAR_KEY);
 
-        long lastRevision = storage.revision();
+        var iif = new If(
+                new AndCondition(new ExistenceCondition(Type.EXISTS, FOO_KEY), 
new ExistenceCondition(Type.EXISTS, BAR_KEY)),
+                new Statement(ops(put(fooKey, SOME_VALUE), 
remove(barKey)).yield()),
+                new Statement(ops(noop()).yield())

Review Comment:
   Is it possible to use the DSL instead of direct instantiation (using 
constructors)? Code using the DSL seems to be more readable.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.function.LongPredicate;
+
+/** Helper class with useful methods and constants for {@link KeyValueStorage} 
implementations. */
+public class KeyValueStorageUtils {
+    /** Special value indicating that there are no key revisions that need to 
be compacted. */
+    public static final int NOTHING_TO_COMPACT_INDEX = -1;
+
+    /**
+     * Calculates the revision index in key revisions up to which compaction 
is needed, {@link #NOTHING_TO_COMPACT_INDEX} if nothing needs
+     * to be compacted.
+     *
+     * <p>If the returned index points to the last revision and if the last 
revision is <b>not</b> a tombstone, then the returned index is
+     * decremented by 1.</p>
+     *
+     * @param keyRevisions Metastorage key revisions in ascending order.
+     * @param compactionRevisionInclusive Revision up to which you need to 
compact (inclusive).
+     * @param tombstoneKeyRevisionPredicate Predicate to test whether a key 
revision is a tombstone.
+     */
+    public static int indexToCompact(long[] keyRevisions, long 
compactionRevisionInclusive, LongPredicate tombstoneKeyRevisionPredicate) {

Review Comment:
   ```suggestion
       public static int indexToCompact(long[] keyRevisions, long 
compactionRevisionInclusive, LongPredicate isTombstone) {
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.function.LongPredicate;
+
+/** Helper class with useful methods and constants for {@link KeyValueStorage} 
implementations. */
+public class KeyValueStorageUtils {
+    /** Special value indicating that there are no key revisions that need to 
be compacted. */
+    public static final int NOTHING_TO_COMPACT_INDEX = -1;
+
+    /**
+     * Calculates the revision index in key revisions up to which compaction 
is needed, {@link #NOTHING_TO_COMPACT_INDEX} if nothing needs
+     * to be compacted.
+     *
+     * <p>If the returned index points to the last revision and if the last 
revision is <b>not</b> a tombstone, then the returned index is
+     * decremented by 1.</p>
+     *
+     * @param keyRevisions Metastorage key revisions in ascending order.
+     * @param compactionRevisionInclusive Revision up to which you need to 
compact (inclusive).
+     * @param tombstoneKeyRevisionPredicate Predicate to test whether a key 
revision is a tombstone.
+     */
+    public static int indexToCompact(long[] keyRevisions, long 
compactionRevisionInclusive, LongPredicate tombstoneKeyRevisionPredicate) {
+        int i = indexToCompact(keyRevisions, compactionRevisionInclusive);
+
+        if (i != NOTHING_TO_COMPACT_INDEX && i == keyRevisions.length - 1 && 
!tombstoneKeyRevisionPredicate.test(keyRevisions[i])) {
+            i--;
+        }
+
+        return i;
+    }
+
+    private static int indexToCompact(long[] keyRevisions, long 
compactionRevisionInclusive) {
+        int index = NOTHING_TO_COMPACT_INDEX;
+
+        for (int i = 0; i < keyRevisions.length; i++) {

Review Comment:
   How about replacing this with binary search?



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtilsTest.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
+import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+/** For {@link KeyValueStorageUtils} testing. */

Review Comment:
   Do we need this Javadoc? There is a convention that `FooTest` is for testing 
`Foo`, so the comment seems obvious and redundant.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java:
##########
@@ -233,13 +234,31 @@ boolean invoke(
     void removeWatch(WatchListener listener);
 
     /**
-     * Compacts storage (removes tombstones).
+     * Compacts outdated key versions and removes tombstones of metastorage 
locally.
      *
-     * @param lowWatermark A time threshold for the entry. Only entries that 
have revisions with timestamp higher or equal to the
-     *     watermark can be removed.
+     * <p>We do not compact the only and last version of the key unless it is 
a tombstone.</p>
+     *
+     * <p>Let's look at some examples, let's say we have the following keys 
with their versions:</p>
+     * <ul>
+     *     <li>Key "foo" with versions that have revisions (1, 3, 5) - "foo" 
[1, 3, 5].</li>
+     *     <li>Key "bar" with versions that have revisions (1, 2, 5) the last 
revision is a tombstone - "bar" [1, 2, 5 tomb].</li>
+     * </ul>
+     *
+     * <p>Let's look at examples of invoking the current method and what will 
be in the storage after:</p>
+     * <ul>
+     *     <li>Compaction revision is {@code 1}: "foo" [3, 5], "bar" [2, 5 
tomb].</li>
+     *     <li>Compaction revision is {@code 2}: "foo" [3, 5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 3}: "foo" [5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 4}: "foo" [5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 5}: "foo" [5].</li>
+     *     <li>Compaction revision is {@code 6}: "foo" [5].</li>
+     * </ul>
+     *
+     * @param revision Revision up to which the metastorage keys will be 
compacted.

Review Comment:
   ```suggestion
        * @param revision Revision up to which (including) the metastorage keys 
will be compacted.
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java:
##########
@@ -17,186 +17,150 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /** Compaction tests. */
 public abstract class AbstractCompactionKeyValueStorageTest extends 
AbstractKeyValueStorageTest {
-    private final HybridClock clock = new HybridClockImpl();
-
-    @Test
-    public void testCompactionAfterLastRevision() {
-        byte[] key = key(0);
-        byte[] value1 = keyValue(0, 0);
-        byte[] value2 = keyValue(0, 1);
+    private static final byte[] FOO_KEY = fromString("foo");
 
-        storage.put(key, value1, clock.now());
-        storage.put(key, value2, clock.now());
+    private static final byte[] BAR_KEY = fromString("bar");
 
-        long lastRevision = storage.revision();
+    private static final byte[] SOME_KEY = fromString("someKey");
 
-        storage.compact(clock.now());
+    private static final byte[] SOME_VALUE = fromString("someValue");
 
-        // Latest value, must exist.
-        Entry entry2 = storage.get(key, lastRevision);
-        assertEquals(lastRevision, entry2.revision());
-        assertArrayEquals(value2, entry2.value());
+    private final HybridClock clock = new HybridClockImpl();
 
-        // Previous value, must be removed due to compaction.
-        Entry entry1 = storage.get(key, lastRevision - 1);
-        assertTrue(entry1.empty());
-    }
+    @Override
+    @BeforeEach
+    void setUp() {
+        super.setUp();
 
-    @Test
-    public void testCompactionAfterTombstone() {
-        byte[] key = key(0);
-        byte[] value = keyValue(0, 0);
+        storage.putAll(List.of(FOO_KEY, BAR_KEY), List.of(SOME_VALUE, 
SOME_VALUE), clock.now());
+        storage.put(BAR_KEY, SOME_VALUE, clock.now());
+        storage.put(FOO_KEY, SOME_VALUE, clock.now());
+        storage.put(SOME_KEY, SOME_VALUE, clock.now());
 
-        storage.put(key, value, clock.now());
-        storage.remove(key, clock.now());
+        var fooKey = new ByteArray(FOO_KEY);
+        var barKey = new ByteArray(BAR_KEY);
 
-        long lastRevision = storage.revision();
+        var iif = new If(
+                new AndCondition(new ExistenceCondition(Type.EXISTS, FOO_KEY), 
new ExistenceCondition(Type.EXISTS, BAR_KEY)),
+                new Statement(ops(put(fooKey, SOME_VALUE), 
remove(barKey)).yield()),
+                new Statement(ops(noop()).yield())
+        );
 
-        storage.compact(clock.now());
+        storage.invoke(iif, clock.now(), new CommandIdGenerator(() -> 
UUID.randomUUID().toString()).newId());
 
-        // Current value, must be removed due to being a tombstone.
-        Entry entry2 = storage.get(key, lastRevision);
-        assertTrue(entry2.empty());
+        storage.remove(SOME_KEY, clock.now());
 
-        // Previous value, must be removed due to compaction.
-        Entry entry1 = storage.get(key, lastRevision - 1);
-        assertTrue(entry1.empty());
+        assertEquals(List.of(1, 3, 5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(1, 2, 5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
     @Test
-    public void testCompactionBetweenMultipleWrites() {
-        byte[] key = key(0);
-        byte[] value1 = keyValue(0, 0);
-        byte[] value2 = keyValue(0, 1);
-        byte[] value3 = keyValue(0, 2);
-        byte[] value4 = keyValue(0, 3);
-
-        storage.put(key, value1, clock.now());
-        storage.put(key, value2, clock.now());
-
-        HybridTimestamp compactTs = clock.now();
-
-        storage.put(key, value3, clock.now());
-        storage.put(key, value4, clock.now());
-
-        long lastRevision = storage.revision();
+    void testCompactRevision1() {
+        storage.compact(1);
 
-        storage.compact(compactTs);
-
-        Entry entry4 = storage.get(key, lastRevision);
-        assertArrayEquals(value4, entry4.value());
-
-        Entry entry3 = storage.get(key, lastRevision - 1);
-        assertArrayEquals(value3, entry3.value());
-
-        Entry entry2 = storage.get(key, lastRevision - 2);
-        assertArrayEquals(value2, entry2.value());
-
-        // Previous value, must be removed due to compaction.
-        Entry entry1 = storage.get(key, lastRevision - 3);
-        assertTrue(entry1.empty());
+        assertEquals(List.of(3, 5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(2, 5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
     @Test
-    public void testCompactionAfterTombstoneRemovesTombstone() {
-        byte[] key = key(0);
-        byte[] value1 = keyValue(0, 0);
-        byte[] value2 = keyValue(0, 1);
-
-        storage.put(key, value1, clock.now());
-
-        storage.remove(key, clock.now());
-
-        HybridTimestamp compactTs = clock.now();
-
-        storage.put(key, value2, clock.now());
-
-        storage.remove(key, clock.now());
+    void testCompactRevision2() {
+        storage.compact(2);
 
-        long lastRevision = storage.revision();
-
-        storage.compact(compactTs);
-
-        // Last operation was remove, so this is a tombstone.
-        Entry entry4 = storage.get(key, lastRevision);
-        assertTrue(entry4.tombstone());
-
-        Entry entry3 = storage.get(key, lastRevision - 1);
-        assertArrayEquals(value2, entry3.value());
-
-        // Previous value, must be removed due to compaction.
-        Entry entry2 = storage.get(key, lastRevision - 2);
-        assertTrue(entry2.empty());
-
-        Entry entry1 = storage.get(key, lastRevision - 3);
-        assertTrue(entry1.empty());
+        assertEquals(List.of(3, 5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
     @Test
-    public void testCompactEmptyStorage() {
-        storage.compact(clock.now());
+    void testCompactRevision3() {
+        storage.compact(3);
+
+        assertEquals(List.of(5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
     @Test
-    public void testCompactionBetweenRevisionsOfOneKey() {
-        byte[] key = key(0);
-        byte[] value11 = keyValue(0, 0);
-        byte[] value12 = keyValue(0, 1);
-
-        storage.put(key, value11, clock.now());
-
-        byte[] key2 = key(1);
-        byte[] value2 = keyValue(1, 0);
-        storage.put(key2, value2, clock.now());
+    void testCompactRevision4() {
+        storage.compact(4);
 
-        HybridTimestamp compactTs = clock.now();
+        assertEquals(List.of(5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(6), collectRevisions(SOME_KEY));
+    }
 
-        storage.put(key, value12, clock.now());
+    @Test
+    void testCompactRevision5() {
+        storage.compact(5);
 
-        storage.compact(compactTs);
+        assertEquals(List.of(5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(), collectRevisions(BAR_KEY));
+        assertEquals(List.of(6), collectRevisions(SOME_KEY));
+    }
 
-        // Both keys should exist, as low watermark's revision is higher than 
entry11's, but lesser than entry12's,
-        // this means that entry1 is still needed.
-        Entry entry12 = storage.get(key, storage.revision());
-        assertArrayEquals(value12, entry12.value());
+    @Test
+    void testCompactRevision6() {
+        storage.compact(6);
 
-        Entry entry11 = storage.get(key, storage.revision() - 1);
-        assertArrayEquals(value11, entry11.value());
+        assertEquals(List.of(5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(), collectRevisions(BAR_KEY));
+        assertEquals(List.of(), collectRevisions(SOME_KEY));
+    }
 
-        Entry entry2 = storage.get(key2, storage.revision());
-        assertArrayEquals(value2, entry2.value());
+    @Test
+    void testCompactRevisionSequentially() {
+        testCompactRevision1();
+        testCompactRevision2();
+        testCompactRevision3();
+        testCompactRevision4();
+        testCompactRevision5();
+        testCompactRevision6();
     }
 
     @Test
-    public void testInvokeCompactionBeforeAnyEntry() {
-        byte[] key = key(0);
-        byte[] value1 = keyValue(0, 0);
-        byte[] value2 = keyValue(0, 1);
+    public void testCompactEmptyStorage() {

Review Comment:
   Why is this test named for an empty storage when the storage seems to contain 
something (namely, the data added in the 'before each' method)?



##########
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java:
##########
@@ -547,78 +557,42 @@ private boolean doRemove(byte[] key, long curRev) {
     }
 
     /**
-     * Compacts all entries by the given key, removing revision that are no 
longer needed.
-     * Last entry with a revision lesser or equal to the {@code 
minRevisionToKeep} and all consecutive entries will be preserved.
-     * If the first entry to keep is a tombstone, it will be removed.
+     * Compacts the key, see the documentation of {@link 
KeyValueStorage#compact} for examples.
      *
-     * @param key A key.
-     * @param revs All revisions of a key.
-     * @param compactedKeysIdx Out parameter, revisions that need to be kept 
must be put here.
-     * @param compactedRevsIdx Out parameter, values that need to be kept must 
be put here.
-     * @param minRevisionToKeep Minimum revision that should be kept.
+     * @param key Target key.
+     * @param revs Key revisions.
+     * @param compactionRevision Revision up to which the key will be 
compacted.

Review Comment:
   ```suggestion
        * @param compactionRevision Revision up to which (inclusively) the key 
will be compacted.
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -1026,104 +1027,41 @@ private boolean addToBatchForRemoval(WriteBatch batch, 
byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing revision that are no 
longer needed.
-     * Last entry with a revision lesser or equal to the {@code 
minRevisionToKeep} and all consecutive entries will be preserved.
-     * If the first entry to keep is a tombstone, it will be removed.
-     * Example:
-     * <pre>
-     * Example 1:
-     *     put entry1: revision 5
-     *     put entry2: revision 7
-     *
-     *     do compaction: revision 6
-     *
-     *     entry1: exists
-     *     entry2: exists
-     *
-     * Example 2:
-     *     put entry1: revision 5
-     *     put entry2: revision 7
-     *
-     *     do compaction: revision 7
-     *
-     *     entry1: doesn't exist
-     *     entry2: exists
-     * </pre>
+     * Compacts the key, see the documentation of {@link 
KeyValueStorage#compact} for examples.
      *
      * @param batch Write batch.
-     * @param key   Target key.
-     * @param revs  Revisions.
-     * @param minRevisionToKeep Minimum revision that should be kept.
-     * @throws RocksDBException If failed.
+     * @param key Target key.
+     * @param revs Key revisions.
+     * @param compactionRevision Revision up to which the key will be 
compacted.

Review Comment:
   ```suggestion
        * @param compactionRevision Revision up to which (inclusively) the key 
will be compacted.
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -1026,104 +1027,41 @@ private boolean addToBatchForRemoval(WriteBatch batch, 
byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing revision that are no 
longer needed.
-     * Last entry with a revision lesser or equal to the {@code 
minRevisionToKeep} and all consecutive entries will be preserved.
-     * If the first entry to keep is a tombstone, it will be removed.
-     * Example:
-     * <pre>
-     * Example 1:
-     *     put entry1: revision 5
-     *     put entry2: revision 7
-     *
-     *     do compaction: revision 6
-     *
-     *     entry1: exists
-     *     entry2: exists
-     *
-     * Example 2:
-     *     put entry1: revision 5
-     *     put entry2: revision 7
-     *
-     *     do compaction: revision 7
-     *
-     *     entry1: doesn't exist
-     *     entry2: exists
-     * </pre>
+     * Compacts the key, see the documentation of {@link 
KeyValueStorage#compact} for examples.
      *
      * @param batch Write batch.
-     * @param key   Target key.
-     * @param revs  Revisions.
-     * @param minRevisionToKeep Minimum revision that should be kept.
-     * @throws RocksDBException If failed.
+     * @param key Target key.
+     * @param revs Key revisions.
+     * @param compactionRevision Revision up to which the key will be 
compacted.
+     * @throws MetaStorageException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long 
minRevisionToKeep) throws RocksDBException {
-        if (revs.length < 2) {
-            // If we have less than two revisions, there is no point in 
compaction.
-            return;
-        }
-
-        // Index of the first revision we will be keeping in the array of 
revisions.
-        int idxToKeepFrom = 0;
-
-        // Whether there is an entry with the minRevisionToKeep.
-        boolean hasMinRevision = false;
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long 
compactionRevision) {
+        try {
+            int indexToCompact = indexToCompact(revs, compactionRevision, 
revision -> isTombstoneForCompaction(key, revision));
 
-        // Traverse revisions, looking for the first revision that needs to be 
kept.
-        for (long rev : revs) {
-            if (rev >= minRevisionToKeep) {
-                if (rev == minRevisionToKeep) {
-                    hasMinRevision = true;
-                }
-                break;
+            if (NOTHING_TO_COMPACT_INDEX == indexToCompact) {
+                return;
             }
 
-            idxToKeepFrom++;
-        }
-
-        if (!hasMinRevision) {
-            // Minimal revision was not encountered, that mean that we are 
between revisions of a key, so previous revision
-            // must be preserved.
-            idxToKeepFrom--;
-        }
-
-        if (idxToKeepFrom <= 0) {
-            // All revisions are still in use.
-            return;
-        }
-
-        for (int i = 0; i < idxToKeepFrom; i++) {
-            // This revision is not needed anymore, remove data.
-            data.delete(batch, keyToRocksKey(revs[i], key));
-        }
-
-        // Whether we only have last revision (even if it's lesser or equal to 
watermark).
-        boolean onlyLastRevisionLeft = idxToKeepFrom == (revs.length - 1);
-
-        // Get the number of the first revision that will be kept.
-        long rev = onlyLastRevisionLeft ? lastRevision(revs) : 
revs[idxToKeepFrom];
-
-        byte[] rocksKey = keyToRocksKey(rev, key);
-
-        Value value = bytesToValue(data.get(rocksKey));
-
-        if (value.tombstone()) {
-            // The first revision we are going to keep is a tombstone, we may 
delete it.
-            data.delete(batch, rocksKey);
-
-            if (!onlyLastRevisionLeft) {
-                // First revision was a tombstone, but there are other 
revisions, that need to be kept,
-                // so advance index of the first revision we need to keep.
-                idxToKeepFrom++;
+            for (int revisionIndex = 0; revisionIndex <= indexToCompact; 
revisionIndex++) {
+                // This revision is not needed anymore, remove data.
+                data.delete(batch, keyToRocksKey(revs[revisionIndex], key));
             }
-        }
 
-        if (onlyLastRevisionLeft && value.tombstone()) {
-            // We don't have any previous revisions for this entry and the 
single existing is a tombstone,
-            // so we can remove it from index.
-            index.delete(batch, key);
-        } else {
-            // Keeps revisions starting with idxToKeepFrom.
-            index.put(batch, key, longsToBytes(revs, idxToKeepFrom));
+            if (indexToCompact == revs.length - 1) {
+                index.delete(batch, key);

Review Comment:
   A compacted revision gets removed from the `data` and `index` column families, 
but there are more. For instance, there are mappings from revisions to 
timestamps and from timestamps to revisions. Should we remove the revision from 
them as well?



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtilsTest.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
+import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+/** For {@link KeyValueStorageUtils} testing. */
+public class KeyValueStorageUtilsTest {
+    @Test
+    void testIndexToCompactNoRevisions() {
+        assertEquals(NOTHING_TO_COMPACT_INDEX, 
indexToCompact(LONG_EMPTY_ARRAY, 0, revision -> false));
+        assertEquals(NOTHING_TO_COMPACT_INDEX, 
indexToCompact(LONG_EMPTY_ARRAY, 0, revision -> true));
+    }
+
+    @Test
+    void testIndexToCompactSingleRevision() {
+        long[] keyRevisions = {2};
+
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1, 
revision -> false));
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1, 
revision -> true));
+
+        assertEquals(0, indexToCompact(keyRevisions, 2, revision -> true));
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 2, 
revision -> false));
+
+        assertEquals(0, indexToCompact(keyRevisions, 3, revision -> true));
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 3, 
revision -> false));
+    }
+
+    @Test
+    void testIndexToCompactMultipleRevision() {

Review Comment:
   ```suggestion
       void testIndexToCompactMultipleRevisions() {
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.function.LongPredicate;
+
+/** Helper class with useful methods and constants for {@link KeyValueStorage} 
implementations. */
+public class KeyValueStorageUtils {
+    /** Special value indicating that there are no key revisions that need to 
be compacted. */
+    public static final int NOTHING_TO_COMPACT_INDEX = -1;
+
+    /**
+     * Calculates the revision index in key revisions up to which compaction 
is needed, {@link #NOTHING_TO_COMPACT_INDEX} if nothing needs
+     * to be compacted.
+     *
+     * <p>If the returned index points to the last revision and if the last 
revision is <b>not</b> a tombstone, then the returned index is
+     * decremented by 1.</p>
+     *
+     * @param keyRevisions Metastorage key revisions in ascending order.
+     * @param compactionRevisionInclusive Revision up to which you need to 
compact (inclusive).
+     * @param tombstoneKeyRevisionPredicate Predicate to test whether a key 
revision is a tombstone.
+     */
+    public static int indexToCompact(long[] keyRevisions, long 
compactionRevisionInclusive, LongPredicate tombstoneKeyRevisionPredicate) {
+        int i = indexToCompact(keyRevisions, compactionRevisionInclusive);
+
+        if (i != NOTHING_TO_COMPACT_INDEX && i == keyRevisions.length - 1 && 
!tombstoneKeyRevisionPredicate.test(keyRevisions[i])) {
+            i--;
+        }
+
+        return i;

Review Comment:
   This might return `-1`, but this is not obvious from the javadoc. Please 
indicate this and explain what a negative return value means.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.function.LongPredicate;
+
+/** Helper class with useful methods and constants for {@link KeyValueStorage} 
implementations. */
+public class KeyValueStorageUtils {
+    /** Special value indicating that there are no key revisions that need to 
be compacted. */
+    public static final int NOTHING_TO_COMPACT_INDEX = -1;
+
+    /**
+     * Calculates the revision index in key revisions up to which compaction 
is needed, {@link #NOTHING_TO_COMPACT_INDEX} if nothing needs
+     * to be compacted.
+     *
+     * <p>If the returned index points to the last revision and if the last 
revision is <b>not</b> a tombstone, then the returned index is
+     * decremented by 1.</p>
+     *
+     * @param keyRevisions Metastorage key revisions in ascending order.
+     * @param compactionRevisionInclusive Revision up to which you need to 
compact (inclusive).
+     * @param tombstoneKeyRevisionPredicate Predicate to test whether a key 
revision is a tombstone.
+     */
+    public static int indexToCompact(long[] keyRevisions, long 
compactionRevisionInclusive, LongPredicate tombstoneKeyRevisionPredicate) {
+        int i = indexToCompact(keyRevisions, compactionRevisionInclusive);
+
+        if (i != NOTHING_TO_COMPACT_INDEX && i == keyRevisions.length - 1 && 
!tombstoneKeyRevisionPredicate.test(keyRevisions[i])) {
+            i--;

Review Comment:
   If the method was given `keyRevisions` consisting of just 1 element, and 
`i` is 0, and it's not a tombstone, this will make `i` be -1. Will this be 
valid?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to