tkalkirill commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1231976667


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -803,30 +802,25 @@ void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
     }
 
     /**
-     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read and that it's returned from
-     * {@link MvPartitionStorage#persistedIndex()} after the {@link MvPartitionStorage#flush()}.
+     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read back.
      */
     @Test
     void testAppliedIndex() {
         storage.runConsistently(locker -> {
             assertEquals(0, storage.lastAppliedIndex());
             assertEquals(0, storage.lastAppliedTerm());
-            assertEquals(0, storage.persistedIndex());

Review Comment:
   Are you sure you need to remove these checks? You have only deprecated this method for now, so let it stay covered by tests until it is removed.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -803,30 +802,25 @@ void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
     }
 
     /**
-     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read and that it's returned from
-     * {@link MvPartitionStorage#persistedIndex()} after the {@link MvPartitionStorage#flush()}.
+     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read back.
      */
     @Test
     void testAppliedIndex() {
         storage.runConsistently(locker -> {
             assertEquals(0, storage.lastAppliedIndex());
             assertEquals(0, storage.lastAppliedTerm());
-            assertEquals(0, storage.persistedIndex());
 
             storage.lastApplied(1, 1);
 
             assertEquals(1, storage.lastAppliedIndex());
             assertEquals(1, storage.lastAppliedTerm());
-            assertThat(storage.persistedIndex(), is(lessThanOrEqualTo(1L)));
 
             return null;
         });
 
         CompletableFuture<Void> flushFuture = storage.flush();
 
         assertThat(flushFuture, willCompleteSuccessfully());
-
-        assertEquals(1, storage.persistedIndex());

Review Comment:
   Same



##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java:
##########
@@ -521,7 +521,6 @@ protected static void checkLastApplied(
             long expLastAppliedTerm
     ) {
         assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
-        assertEquals(expPersistentIndex, storage.persistedIndex());

Review Comment:
   same



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -1054,7 +1053,6 @@ private static void checkLastApplied(
             long expLastAppliedTerm
     ) {
         assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
-        assertEquals(expPersistentIndex, storage.persistedIndex());

Review Comment:
   Same



##########
modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java:
##########
@@ -151,7 +150,9 @@ void testRestart() {
 
         tableStorage.start();
 
-        assertThat(tableStorage.getMvPartition(PARTITION_ID), is(notNullValue()));
+        assertThat(tableStorage.getMvPartition(PARTITION_ID), is(nullValue()));
+
+        tableStorage.createMvPartition(PARTITION_ID);

Review Comment:
   The method returns a future; please check that it completes successfully.
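   For reference, this PR already checks a flush future in `AbstractMvPartitionStorageTest` with `assertThat(flushFuture, willCompleteSuccessfully())`, so the analogous check here would presumably be `assertThat(tableStorage.createMvPartition(PARTITION_ID), willCompleteSuccessfully());`. The plain-JDK sketch below only illustrates what such a check does; `FutureChecks` and its `createMvPartition` are hypothetical stand-ins, not the real table-storage API or Ignite's test matchers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Plain-JDK sketch of "assert that a future completes successfully"; not Ignite's test API. */
class FutureChecks {
    /** Hypothetical stand-in for an asynchronous createMvPartition(partitionId) call. */
    static CompletableFuture<String> createMvPartition(int partitionId) {
        return CompletableFuture.supplyAsync(() -> "partition-" + partitionId);
    }

    /** Waits up to timeoutSeconds and fails unless the future completed normally. */
    static <T> T awaitSuccess(CompletableFuture<T> future, long timeoutSeconds) {
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt status for the caller.
            throw new AssertionError("Interrupted while waiting for the future", e);
        } catch (ExecutionException | TimeoutException e) {
            // Exceptional completion and a hang both count as a test failure.
            throw new AssertionError("Future did not complete successfully", e);
        }
    }
}
```

   Ignoring the returned future, as the current diff does, would let an asynchronous creation failure go unnoticed by the test.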



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java:
##########
@@ -34,7 +34,7 @@
  * Snapshot storage factory for {@link MvPartitionStorage}. Utilizes the fact that every partition already stores its latest applied index
  * and thus can itself be used as its own snapshot.
  *
- * <p>Uses {@link MvPartitionStorage#persistedIndex()} and configuration, passed into constructor, to create a {@link SnapshotMeta} object

Review Comment:
   Same



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java:
##########
@@ -70,6 +69,8 @@ public class RocksDbStorageEngine implements StorageEngine {
 
     private final Map<String, RocksDbDataRegion> regions = new ConcurrentHashMap<>();
 
+    private final Map<String, SharedRocksDbInstance> sharedInstances = new ConcurrentHashMap<>();

Review Comment:
   What is the key? A name or something else? It would be more convenient to reflect this in the variable name or in the documentation.
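   Judging by the later hunk, which calls `sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), name -> {...})`, the key appears to be the data region name. One way to make that explicit, sketched with hypothetical names (`SharedInstanceRegistry`, `sharedInstanceByDataRegion`) and a generic value type in place of `SharedRocksDbInstance`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical engine-side registry that spells out the key's meaning in its name and Javadoc. */
class SharedInstanceRegistry<I> {
    /** Shared instances keyed by data region name: at most one instance per data region. */
    private final Map<String, I> sharedInstanceByDataRegion = new ConcurrentHashMap<>();

    /** Returns the instance for the given data region, creating it with {@code factory} on first access. */
    I getOrCreate(String dataRegionName, Function<String, I> factory) {
        // computeIfAbsent runs the factory at most once per key, so concurrent callers share one instance.
        return sharedInstanceByDataRegion.computeIfAbsent(dataRegionName, factory);
    }
}
```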



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java:
##########
@@ -170,14 +173,18 @@ public RocksDbTableStorage createMvTable(
 
         assert dataRegion != null : "tableId=" + tableId + ", dataRegion=" + tableDescriptor.getDataRegion();
 
-        Path tablePath = storagePath.resolve(TABLE_DIR_PREFIX + tableId);
-
-        try {
-            Files.createDirectories(tablePath);
-        } catch (IOException e) {
-            throw new StorageException("Failed to create table store directory for table: " + tableId, e);
-        }
+        SharedRocksDbInstance sharedInstance = sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), name -> {

Review Comment:
   Do I understand correctly that it will no longer be so easy to change the data region of RocksDB-based tables? If so, maybe this should be mentioned somewhere in the documentation?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+

Review Comment:
   ```suggestion
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+
+    /** Write options. */
+    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
+
+    /** RocksDB storage engine instance. */
+    public final RocksDbStorageEngine engine;
+
+    /** Path for the directory that stores the data. */
+    public final Path path;
+
+    /** RocksDB flusher instance. */
+    public final RocksDbFlusher flusher;
+
+    /** Rocks DB instance. */
+    public final RocksDB db;
+
+    /** Meta information instance that wraps {@link ColumnFamily} instance for meta column family.. */
+    public final RocksDbMetaStorage meta;
+
+    /** Column Family for partition data. */
+    public final ColumnFamily partitionCf;
+
+    /** Column Family for GC queue. */
+    public final ColumnFamily gcQueueCf;
+
+    /** Column Family for Hash Index data. */
+    public final ColumnFamily hashIndexCf;
+
+    /** Column Family instances for different types of sorted indexes, identified by the column family name. */
+    private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs;
+
+    /** Column family names mapped to sets of index IDs, that use that CF. */
+    private final ConcurrentMap<ByteArray, Set<Integer>> sortedIndexIdsByCfName = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    SharedRocksDbInstance(
+            RocksDbStorageEngine engine,
+            Path path,
+            IgniteSpinBusyLock busyLock,
+            RocksDbFlusher flusher,
+            RocksDB db,
+            RocksDbMetaStorage meta,
+            ColumnFamily partitionCf,
+            ColumnFamily gcQueueCf,
+            ColumnFamily hashIndexCf,
+            ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs
+    ) {
+        this.engine = engine;
+        this.path = path;
+        this.busyLock = busyLock;
+
+        this.flusher = flusher;
+        this.db = db;
+
+        this.meta = meta;
+        this.partitionCf = partitionCf;
+        this.gcQueueCf = gcQueueCf;
+        this.hashIndexCf = hashIndexCf;
+        this.sortedIndexCfs = sortedIndexCfs;
+    }
+
+    /**
+     * Utility method that performs range-deletion in the column family.
+     */
+    public static void deleteByPrefix(WriteBatch writeBatch, ColumnFamily columnFamily, byte[] prefix) throws RocksDBException {
+        byte[] upperBound = incrementPrefix(prefix);
+
+        writeBatch.deleteRange(columnFamily.handle(), prefix, upperBound);
+    }
+
+    /**
+     * Stops the instance, freeing all allocated resources.
+     */
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        List<AutoCloseable> resources = new ArrayList<>();
+
+        resources.add(meta.columnFamily().handle());
+        resources.add(partitionCf.handle());
+        resources.add(gcQueueCf.handle());
+        resources.add(hashIndexCf.handle());
+        resources.addAll(sortedIndexCfs.values().stream()
+                .map(ColumnFamily::handle)
+                .collect(toList())
+        );
+
+        resources.add(db);
+        resources.add(flusher::stop);
+
+        try {
+            Collections.reverse(resources);
+
+            IgniteUtils.closeAll(resources);
+        } catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB storage: " + path, e);
+        }
+    }
+
+    /**
+     * Returns Column Family instance with the desired name. Creates it if it doesn't exist.
+     * Tracks every created index by its {@code indexId}.
+     */
+    public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            ColumnFamily[] result = {null};
+
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                ColumnFamily columnFamily = getOrCreateColumnFamily(cfName, name);
+
+                result[0] = columnFamily;
+
+                if (indexIds == null) {
+                    indexIds = new HashSet<>();
+                }
+
+                indexIds.add(indexId);

Review Comment:
   Perhaps we need to create a new set to avoid data races.
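   The current lambda mutates the `HashSet` already stored in `sortedIndexIdsByCfName`. `ConcurrentHashMap.compute` serializes writers for the same key, but a thread that obtains the set through `get` (or by iterating the map) outside `compute` could read it while it is being modified. Assuming such reads exist somewhere in the class (they are not visible in this hunk), a copy-on-write variant publishes a fresh unmodifiable set on every change. `String` keys stand in for Ignite's `ByteArray`, and `IndexIdTracker` is a hypothetical name:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Copy-on-write tracking of index IDs per column family name (hypothetical sketch). */
class IndexIdTracker {
    private final ConcurrentMap<String, Set<Integer>> indexIdsByCfName = new ConcurrentHashMap<>();

    /** Registers indexId under cfName by publishing a new set instead of mutating the old one. */
    void add(String cfName, int indexId) {
        indexIdsByCfName.compute(cfName, (name, oldIds) -> {
            Set<Integer> newIds = oldIds == null ? new HashSet<>() : new HashSet<>(oldIds);
            newIds.add(indexId);
            return Collections.unmodifiableSet(newIds);
        });
    }

    /** Unregisters indexId; returning null from computeIfPresent drops the entry once it is empty. */
    void remove(String cfName, int indexId) {
        indexIdsByCfName.computeIfPresent(cfName, (name, oldIds) -> {
            Set<Integer> newIds = new HashSet<>(oldIds);
            newIds.remove(indexId);
            return newIds.isEmpty() ? null : Collections.unmodifiableSet(newIds);
        });
    }

    /** Returns an immutable snapshot that stays safe to iterate while other threads call add/remove. */
    Set<Integer> indexIds(String cfName) {
        return indexIdsByCfName.getOrDefault(cfName, Set.of());
    }
}
```

   The extra allocation per index create or drop is likely negligible here, since these are rare, schema-level operations.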



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java:
##########
@@ -38,6 +38,9 @@ public class RocksDbStorageUtils {
     /** Index ID size in bytes. */
     public static final int INDEX_ID_SIZE = Integer.SIZE;
 
+    /** Table ID size. */

Review Comment:
   ```suggestion
       /** Table ID size in bytes. */
   ```



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -127,7 +126,11 @@ interface Locker {
 
     /**
      * {@link #lastAppliedIndex()} value consistent with the data, already persisted on the storage.
+     *
+     * @deprecated No one needs it, and it slows down the storage.
      */
+    //TODO IGNITE-19750 Delete this method.

Review Comment:
   ```suggestion
       // TODO: IGNITE-19750 Delete this method.
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java:
##########
@@ -145,7 +146,6 @@ public interface TxStateStorage extends ManuallyCloseable {
      *         <li>{@link TxStateStorage#put(UUID, TxMeta)};</li>
      *         <li>{@link TxStateStorage#lastAppliedIndex()};</li>
      *         <li>{@link TxStateStorage#lastAppliedTerm()}} ()};</li>
-     *         <li>{@link TxStateStorage#persistedIndex()}};</li>

Review Comment:
   same



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -803,30 +802,25 @@ void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
     }
 
     /**
-     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read and that it's returned from
-     * {@link MvPartitionStorage#persistedIndex()} after the {@link MvPartitionStorage#flush()}.
+     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read back.
      */
     @Test
     void testAppliedIndex() {
         storage.runConsistently(locker -> {
             assertEquals(0, storage.lastAppliedIndex());
             assertEquals(0, storage.lastAppliedTerm());
-            assertEquals(0, storage.persistedIndex());
 
             storage.lastApplied(1, 1);
 
             assertEquals(1, storage.lastAppliedIndex());
             assertEquals(1, storage.lastAppliedTerm());
-            assertThat(storage.persistedIndex(), is(lessThanOrEqualTo(1L)));

Review Comment:
   Same



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2372,8 +2372,8 @@ private CompletableFuture<PartitionStorages> getOrCreatePartitionStorages(TableI
                 .thenComposeAsync(mvPartitionStorage -> {
                     TxStateStorage txStateStorage = internalTable.txStateStorage().getOrCreateTxStateStorage(partitionId);
 
-                    if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS

Review Comment:
   same



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java:
##########
@@ -673,10 +673,10 @@ private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean isTxStorage
 
         if (isTxStorageUnderRebalance) {
             // Emulate a situation when TX state storage was stopped in a middle of rebalance.
-            when(txStateStorage.persistedIndex()).thenReturn(TxStateStorage.REBALANCE_IN_PROGRESS);

Review Comment:
   same



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java:
##########
@@ -201,7 +201,7 @@ public interface TxStateStorage extends ManuallyCloseable {
      *     <li>Cancels all current operations (including cursors) with storage and waits for their completion;</li>
      *     <li>Does not allow operations to be performed (exceptions will be thrown) with the storage until the cleaning is completed;</li>
      *     <li>Clears storage;</li>
-     *     <li>Sets the {@link #lastAppliedIndex()}, {@link #lastAppliedTerm()} and {@link #persistedIndex()} to {@code 0};</li>

Review Comment:
   same



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2372,8 +2372,8 @@ private CompletableFuture<PartitionStorages> getOrCreatePartitionStorages(TableI
                 .thenComposeAsync(mvPartitionStorage -> {
                     TxStateStorage txStateStorage = internalTable.txStateStorage().getOrCreateTxStateStorage(partitionId);
 
-                    if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS
-                            || txStateStorage.persistedIndex() == TxStateStorage.REBALANCE_IN_PROGRESS) {

Review Comment:
   same



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java:
##########
@@ -67,24 +93,135 @@ static ColumnFamilyType fromCfName(String cfName) {
     }
 
     /**
-     * Creates a column family name by index ID.
-     *
-     * @param indexId Index ID.
+     * Generates a sorted index column family name by its columns descriptions.
+     * The resulting array has a {@link #SORTED_INDEX_CF_PREFIX} prefix as a UTF8 array, followed by a number of pairs
+     * {@code {type, flags}}, where type represents ordinal of the corresponding {@link NativeTypeSpec}, and
+     * flags store information about column's nullability and comparison order.
      *
-     * @see #sortedIndexId
+     * @see #comparatorFromCfName(byte[])
      */
-    static String sortedIndexCfName(int indexId) {
-        return SORTED_INDEX_CF_PREFIX + indexId;
+    static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) {

Review Comment:
   Please add unit tests for this.
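   A unit test could pin down the byte layout described in the Javadoc: the UTF-8 prefix followed by one `{type, flags}` byte pair per column, in column order. The sketch below re-implements that layout in isolation so the expected bytes are easy to see. The prefix string, the flag values (`NULLABILITY_FLAG = 1`, `ASC_ORDER_FLAG = 2`) and the `Column` class are assumptions; the diff shows neither the constants' values nor a standalone descriptor.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Standalone sketch of the sorted-index CF name layout; constant values are assumptions. */
class SortedIndexCfNameEncoder {
    /** Hypothetical prefix; the real SORTED_INDEX_CF_PREFIX value is not shown in the diff. */
    static final String PREFIX = "cf-sorted-";

    /** Assumed flag bits standing in for NULLABILITY_FLAG and ASC_ORDER_FLAG. */
    static final int NULLABILITY_FLAG = 1;
    static final int ASC_ORDER_FLAG = 2;

    /** Simplified column: typeOrdinal plays the role of NativeTypeSpec.ordinal(). */
    static final class Column {
        final int typeOrdinal;
        final boolean nullable;
        final boolean asc;

        Column(int typeOrdinal, boolean nullable, boolean asc) {
            this.typeOrdinal = typeOrdinal;
            this.nullable = nullable;
            this.asc = asc;
        }
    }

    /** Encodes the prefix followed by one {type, flags} byte pair per column. */
    static byte[] encode(List<Column> columns) {
        byte[] prefix = PREFIX.getBytes(StandardCharsets.UTF_8);
        // Sized from the UTF-8 byte length; String.length() only matches it for ASCII prefixes.
        ByteBuffer buf = ByteBuffer.allocate(prefix.length + columns.size() * 2);
        buf.put(prefix);

        for (Column column : columns) {
            int flags = 0;
            if (column.nullable) {
                flags |= NULLABILITY_FLAG;
            }
            if (column.asc) {
                flags |= ASC_ORDER_FLAG;
            }
            buf.put((byte) column.typeOrdinal);
            buf.put((byte) flags);
        }

        return buf.array();
    }
}
```

   A test would then assert, for a few column combinations, that byte `prefix.length + 2 * i` is the i-th column's type ordinal and the byte after it carries exactly the expected flag bits.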



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageSortedIndexDescriptor.java:
##########
@@ -64,6 +64,7 @@ public StorageSortedIndexColumnDescriptor(String name, NativeType type, boolean
         }
 
         @Override
+        @Deprecated

Review Comment:
   Maybe add a TODO or something similar?
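   One option is to mirror what this PR already does for `MvPartitionStorage#persistedIndex()`: a Javadoc `@deprecated` tag stating why, plus a `// TODO: IGNITE-...` comment pointing at the removal ticket. A minimal illustration follows; the class, the method, and the ticket number `IGNITE-XXXXX` are placeholders, since the diff shows neither which method is deprecated here nor a ticket for it.

```java
/** Illustration of pairing @Deprecated with a documented reason and a tracking TODO. */
class DeprecatedAccessorExample {
    private final int value;

    DeprecatedAccessorExample(int value) {
        this.value = value;
    }

    /**
     * Returns the legacy value.
     *
     * @deprecated Placeholder reason; will be removed together with its remaining callers.
     */
    // TODO: IGNITE-XXXXX Delete this method (placeholder ticket number).
    @Deprecated
    int legacyValue() {
        return value;
    }
}
```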



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java:
##########
@@ -67,24 +93,135 @@ static ColumnFamilyType fromCfName(String cfName) {
     }
 
     /**
-     * Creates a column family name by index ID.
-     *
-     * @param indexId Index ID.
+     * Generates a sorted index column family name by its columns descriptions.
+     * The resulting array has a {@link #SORTED_INDEX_CF_PREFIX} prefix as a UTF8 array, followed by a number of pairs
+     * {@code {type, flags}}, where type represents ordinal of the corresponding {@link NativeTypeSpec}, and
+     * flags store information about column's nullability and comparison order.
      *
-     * @see #sortedIndexId
+     * @see #comparatorFromCfName(byte[])
      */
-    static String sortedIndexCfName(int indexId) {
-        return SORTED_INDEX_CF_PREFIX + indexId;
+    static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) {
+        ByteBuffer buf = ByteBuffer.allocate(SORTED_INDEX_CF_PREFIX.length() + columns.size() * 2);
+
+        buf.put(SORTED_INDEX_CF_PREFIX.getBytes(UTF_8));
+
+        for (StorageSortedIndexColumnDescriptor column : columns) {
+            NativeType nativeType = column.type();
+            NativeTypeSpec nativeTypeSpec = nativeType.spec();
+
+            buf.put((byte) nativeTypeSpec.ordinal());
+
+            int flags = 0;
+
+            if (column.nullable()) {
+                flags |= NULLABILITY_FLAG;
+            }
+
+            if (column.asc()) {
+                flags |= ASC_ORDER_FLAG;
+            }
+
+            buf.put((byte) flags);
+        }
+
+        return buf.array();
     }
 
     /**
-     * Extracts a Sorted Index ID from the given Column Family name.
-     *
-     * @param cfName Column Family name.
-     *
-     * @see #sortedIndexCfName
+     * Creates an {@link org.rocksdb.AbstractComparator} instance to compare keys in column family with name {@code cfName}.
+     * Please refer to {@link #sortedIndexCfName(List)} for the details of the CF name encoding.
      */
-    static int sortedIndexId(String cfName) {
-        return Integer.parseInt(cfName.substring(SORTED_INDEX_CF_PREFIX.length()));
+    public static RocksDbBinaryTupleComparator comparatorFromCfName(byte[] cfName) {

Review Comment:
   It would be nice to do unit tests for this method.
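   Since `comparatorFromCfName` has to read back exactly the layout that `sortedIndexCfName` writes, a round-trip test is a natural fit. The decoding half could look like the sketch below; it stops at recovering the per-column `{type, flags}` pairs and does not build a RocksDB comparator. The prefix and flag values are assumptions (the diff does not show them), and `SortedIndexCfNameParser` and `ColumnSpec` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of decoding a sorted-index CF name back into per-column specs; constants are assumptions. */
class SortedIndexCfNameParser {
    /** Hypothetical prefix standing in for SORTED_INDEX_CF_PREFIX. */
    static final byte[] PREFIX = "cf-sorted-".getBytes(StandardCharsets.UTF_8);

    /** Assumed flag bits standing in for NULLABILITY_FLAG and ASC_ORDER_FLAG. */
    static final int NULLABILITY_FLAG = 1;
    static final int ASC_ORDER_FLAG = 2;

    /** Decoded description of one index column. */
    static final class ColumnSpec {
        final int typeOrdinal;
        final boolean nullable;
        final boolean asc;

        ColumnSpec(int typeOrdinal, boolean nullable, boolean asc) {
            this.typeOrdinal = typeOrdinal;
            this.nullable = nullable;
            this.asc = asc;
        }
    }

    /** Validates the prefix, then decodes the {type, flags} pairs that follow it. */
    static List<ColumnSpec> parse(byte[] cfName) {
        int p = PREFIX.length;
        // The length check runs first so the ranged Arrays.equals never reads past the array.
        if (cfName.length < p || !Arrays.equals(cfName, 0, p, PREFIX, 0, p)) {
            throw new IllegalArgumentException("Not a sorted index column family name");
        }
        if ((cfName.length - p) % 2 != 0) {
            throw new IllegalArgumentException("Truncated {type, flags} pair");
        }

        List<ColumnSpec> columns = new ArrayList<>();
        for (int i = p; i < cfName.length; i += 2) {
            int typeOrdinal = cfName[i] & 0xFF; // Read the type byte as unsigned.
            int flags = cfName[i + 1];
            columns.add(new ColumnSpec(typeOrdinal, (flags & NULLABILITY_FLAG) != 0, (flags & ASC_ORDER_FLAG) != 0));
        }
        return columns;
    }
}
```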



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java:
##########
@@ -673,10 +673,10 @@ private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean isTxStorage
 
         if (isTxStorageUnderRebalance) {
             // Emulate a situation when TX state storage was stopped in a middle of rebalance.
-            when(txStateStorage.persistedIndex()).thenReturn(TxStateStorage.REBALANCE_IN_PROGRESS);
+            when(txStateStorage.lastAppliedIndex()).thenReturn(TxStateStorage.REBALANCE_IN_PROGRESS);
         } else {
             // Emulate a situation when partition storage was stopped in a middle of rebalance.
-            when(mvPartitionStorage.persistedIndex()).thenReturn(MvPartitionStorage.REBALANCE_IN_PROGRESS);

Review Comment:
   same



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+
+    /** Write options. */
+    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
+
+    /** RocksDB storage engine instance. */
+    public final RocksDbStorageEngine engine;
+
+    /** Path for the directory that stores the data. */
+    public final Path path;
+
+    /** RocksDB flusher instance. */
+    public final RocksDbFlusher flusher;
+
+    /** Rocks DB instance. */
+    public final RocksDB db;
+
+    /** Meta information instance that wraps {@link ColumnFamily} instance for meta column family.. */

Review Comment:
   ```suggestion
       /** Meta information instance that wraps {@link ColumnFamily} instance for meta column family. */
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java:
##########
@@ -17,110 +17,57 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.ByteOrder.BIG_ENDIAN;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PARTITION_ID_SIZE;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.getRowIdUuid;
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.putIndexId;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.putRowIdUuid;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 
 import java.nio.ByteBuffer;
-import java.util.stream.Stream;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.AbstractWriteBatch;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Slice;
 
 /**
  * Wrapper around the "meta" Column Family inside a RocksDB-based storage, which stores some auxiliary information needed for internal
  * storage logic.
  */
 public class RocksDbMetaStorage {
-    /** Name of the key that corresponds to a list of existing partition IDs of a storage. */
-    private static final byte[] PARTITION_ID_PREFIX = "part".getBytes(UTF_8);
-
-    /** Index meta key prefix. */
-    private static final byte[] INDEX_META_KEY_PREFIX = "index-meta".getBytes(UTF_8);
-
-    /** Index meta key size in bytes. */
-    private static final int INDEX_META_KEY_SIZE = INDEX_META_KEY_PREFIX.length + PARTITION_ID_SIZE + ROW_ID_SIZE;
-
-    /** Name of the key that is out of range of the partition ID key prefix, used as an exclusive bound. */
-    private static final byte[] PARTITION_ID_PREFIX_END = RocksUtils.incrementPrefix(PARTITION_ID_PREFIX);
+    public static final byte[] PARTITION_META_PREFIX = {0};

Review Comment:
   Missing Javadoc for the constants.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+
+    /** Write options. */
+    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
+
+    /** RocksDB storage engine instance. */
+    public final RocksDbStorageEngine engine;
+
+    /** Path for the directory that stores the data. */
+    public final Path path;
+
+    /** RocksDB flusher instance. */
+    public final RocksDbFlusher flusher;
+
+    /** RocksDB instance. */
+    public final RocksDB db;
+
+    /** Meta information instance that wraps the {@link ColumnFamily} instance for the meta column family. */
+    public final RocksDbMetaStorage meta;
+
+    /** Column Family for partition data. */
+    public final ColumnFamily partitionCf;
+
+    /** Column Family for GC queue. */
+    public final ColumnFamily gcQueueCf;
+
+    /** Column Family for Hash Index data. */
+    public final ColumnFamily hashIndexCf;
+
+    /** Column Family instances for different types of sorted indexes, identified by the column family name. */
+    private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs;
+
+    /** Column family names mapped to sets of index IDs that use that CF. */
+    private final ConcurrentMap<ByteArray, Set<Integer>> sortedIndexIdsByCfName = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    SharedRocksDbInstance(
+            RocksDbStorageEngine engine,
+            Path path,
+            IgniteSpinBusyLock busyLock,
+            RocksDbFlusher flusher,
+            RocksDB db,
+            RocksDbMetaStorage meta,
+            ColumnFamily partitionCf,
+            ColumnFamily gcQueueCf,
+            ColumnFamily hashIndexCf,
+            ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs
+    ) {
+        this.engine = engine;
+        this.path = path;
+        this.busyLock = busyLock;
+
+        this.flusher = flusher;
+        this.db = db;
+
+        this.meta = meta;
+        this.partitionCf = partitionCf;
+        this.gcQueueCf = gcQueueCf;
+        this.hashIndexCf = hashIndexCf;
+        this.sortedIndexCfs = sortedIndexCfs;
+    }
+
+    /**
+     * Utility method that performs range-deletion in the column family.
+     */
+    public static void deleteByPrefix(WriteBatch writeBatch, ColumnFamily columnFamily, byte[] prefix) throws RocksDBException {
+        byte[] upperBound = incrementPrefix(prefix);
+
+        writeBatch.deleteRange(columnFamily.handle(), prefix, upperBound);
+    }
+
+    /**
+     * Stops the instance, freeing all allocated resources.
+     */
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        List<AutoCloseable> resources = new ArrayList<>();
+
+        resources.add(meta.columnFamily().handle());
+        resources.add(partitionCf.handle());
+        resources.add(gcQueueCf.handle());
+        resources.add(hashIndexCf.handle());
+        resources.addAll(sortedIndexCfs.values().stream()
+                .map(ColumnFamily::handle)
+                .collect(toList())
+        );
+
+        resources.add(db);
+        resources.add(flusher::stop);
+
+        try {
+            Collections.reverse(resources);
+
+            IgniteUtils.closeAll(resources);
+        } catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB storage: " + path, e);
+        }
+    }
+
+    /**
+     * Returns Column Family instance with the desired name. Creates it if it doesn't exist.
+     * Tracks every created index by its {@code indexId}.
+     */
+    public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            ColumnFamily[] result = {null};
+
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                ColumnFamily columnFamily = getOrCreateColumnFamily(cfName, name);
+
+                result[0] = columnFamily;
+
+                if (indexIds == null) {
+                    indexIds = new HashSet<>();
+                }
+
+                indexIds.add(indexId);
+
+                return indexIds;
+            });
+
+            return result[0];
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Possibly drops the column family after destroying the index.
+     */
+    public void dropCfOnIndexDestroy(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                if (indexIds == null) {
+                    return null;
+                }
+
+                indexIds.remove(indexId);
+
+                if (indexIds.isEmpty()) {

Review Comment:
   Perhaps we need to create a new set to avoid data races.
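One way to address this, sketched under assumptions: treat every set stored in `sortedIndexIdsByCfName` as immutable and have each `compute` call publish a fresh copy. If the sets are ever read outside `compute` (for example via `get`), a reader then never iterates a `HashSet` that another thread is mutating. `CopyOnWriteIndexTracker`, its method names, the `String` keys used in place of `ByteArray`, and the `dropCf` callback standing in for `destroyColumnFamily` are all hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: tracks which index IDs use each column family and replaces the
 * stored set on every update instead of mutating it in place (copy-on-write).
 */
final class CopyOnWriteIndexTracker {
    /** Column family name (String here instead of ByteArray) to an immutable set of index IDs. */
    private final ConcurrentMap<String, Set<Integer>> indexIdsByCfName = new ConcurrentHashMap<>();

    /** Registers {@code indexId} as a user of {@code cfName}. */
    void onIndexCreate(String cfName, int indexId) {
        indexIdsByCfName.compute(cfName, (name, oldIds) -> {
            // Copy the previous snapshot (if any) and publish the modified copy.
            Set<Integer> newIds = oldIds == null ? new HashSet<>() : new HashSet<>(oldIds);
            newIds.add(indexId);
            return Collections.unmodifiableSet(newIds);
        });
    }

    /**
     * Unregisters {@code indexId}; when no index uses {@code cfName} any more, calls
     * {@code dropCf} (a stand-in for destroyColumnFamily) and removes the mapping.
     */
    void onIndexDestroy(String cfName, int indexId, Consumer<String> dropCf) {
        indexIdsByCfName.compute(cfName, (name, oldIds) -> {
            if (oldIds == null) {
                return null;
            }
            Set<Integer> newIds = new HashSet<>(oldIds);
            newIds.remove(indexId);
            if (newIds.isEmpty()) {
                dropCf.accept(name);
                return null; // Returning null removes the entry from the map.
            }
            return Collections.unmodifiableSet(newIds);
        });
    }

    /** Returns an immutable snapshot that stays unchanged while other threads update the map. */
    Set<Integer> indexIds(String cfName) {
        return indexIdsByCfName.getOrDefault(cfName, Collections.emptySet());
    }
}
```

Each create or destroy now copies the set, which costs O(n) in the number of indexes sharing the CF, in exchange for readers never needing a lock or observing a half-updated set.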
   
   



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+
+    /**
+     * Possibly drops the column family after destroying the index.
+     */
+    public void dropCfOnIndexDestroy(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                if (indexIds == null) {
+                    return null;
+                }
+
+                indexIds.remove(indexId);
+
+                if (indexIds.isEmpty()) {
+                    indexIds = null;
+
+                    destroyColumnFamily(name);
+                }
+
+                return indexIds;
+            });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private ColumnFamily getOrCreateColumnFamily(byte[] cfName, ByteArray name) {
+        return sortedIndexCfs.computeIfAbsent(name, unused -> {
+            ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
+
+            ColumnFamily columnFamily;
+            try {
+                columnFamily = ColumnFamily.create(db, cfDescriptor);
+            } catch (RocksDBException e) {
+                throw new StorageException("Failed to create new RocksDB column family: " + new String(cfDescriptor.getName(), UTF_8), e);

Review Comment:
   Maybe create a common helper method to get the name from **ColumnFamilyDescriptor**?
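A minimal sketch of such a helper, assuming UTF-8 decoding as in the quoted line; the class name `ColumnFamilyNames` and the method name `toStringName` are hypothetical. It takes the raw `byte[]` (what `cfDescriptor.getName()` returns in the code above) so that the helper itself does not depend on RocksDB types.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

/** Hypothetical utility; class and method names are illustrative only. */
final class ColumnFamilyNames {
    private ColumnFamilyNames() {
    }

    /**
     * Decodes a RocksDB column family name for logs and exception messages, using UTF-8
     * as the original exception message does. Intended call site:
     * {@code toStringName(cfDescriptor.getName())}.
     */
    static String toStringName(byte[] cfName) {
        return new String(cfName, UTF_8);
    }
}
```

With it, the message above would become `"Failed to create new RocksDB column family: " + toStringName(cfDescriptor.getName())`, and other call sites that format CF names could reuse the same decoding.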



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

