rpuch commented on code in PR #5465:
URL: https://github.com/apache/ignite-3/pull/5465#discussion_r2007020780


##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java:
##########
@@ -253,19 +258,35 @@ static int createZone(Node node, String zoneName, int 
partitions, int replicas)
     }
 
     private static int createZone(Node node, String zoneName, int partitions, 
int replicas, boolean testStorageProfile) {
+        return createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                partitions,
+                replicas,
+                testStorageProfile
+                        ? DEFAULT_TEST_PROFILE_NAME
+                        : DEFAULT_STORAGE_PROFILE
+        );
+    }
+
+    static int createZoneWithStorageProfiles(Node node, String zoneName, int 
partitions, int replicas, String... profiles) {
         DistributionZonesTestUtil.createZoneWithStorageProfile(
                 node.catalogManager,
                 zoneName,
                 partitions,
                 replicas,
-                testStorageProfile ? DEFAULT_TEST_PROFILE_NAME : 
DEFAULT_STORAGE_PROFILE
+                String.join(",", profiles)
         );
 
         Integer zoneId = getZoneId(node.catalogManager, zoneName, 
node.hybridClock.nowLong());
         return requireNonNull(zoneId, "No zone found with name " + zoneName);
     }
 
     static void createTable(Node node, String zoneName, String tableName) {
+        createTable(node, zoneName, tableName, null);
+    }
+
+    static void createTable(Node node, String zoneName, String tableName, 
String storageProfile) {

Review Comment:
   ```suggestion
       static void createTable(Node node, String zoneName, String tableName, 
@Nullable String storageProfile) {
   ```



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);

Review Comment:
   Let's obtain the table using the public API. The less we use internal APIs in 
our tests, the better.



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+        Map<Long, Integer> values = Map.of(
+                0L, 0,
+                1L, 1,
+                2L, 2
+        );
+
+        node.transactions().runInTransaction(tx -> {
+            assertDoesNotThrow(() -> kvView.putAll(tx, values));
+        });
+
+        node.transactions().runInTransaction(tx -> {
+            assertEquals(values.size(), assertDoesNotThrow(() -> 
kvView.getAll(tx, Set.of(0L, 1L, 2L))).size());
+        });
+    }
+
+    private static void checkTableStorageIsVolatile(Node node, int tableId) 
throws NodeStoppingException {
+        InternalTableImpl internalTable = (InternalTableImpl) 
node.tableManager.table(tableId).internalTable();
+
+        assertInstanceOf(VolatilePageMemoryTableStorage.class, 
internalTable.storage());
+    }
+
+    private static List<CatalogStorageProfileDescriptor> 
extractZoneProfiles(Node node, int zoneId) {
+        CatalogZoneDescriptor zoneDescriptor = 
node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId);
+
+        assertNotNull(zoneDescriptor);
+
+        return zoneDescriptor.storageProfiles().profiles();
+    }
+
+    private static boolean isRaftLogStorageVolatile(Node node, int zoneId) {
+        RaftServer raftServer = node.raftManager.server();
+
+        ConcurrentMap<RaftNodeId, RaftGroupService> nodes = 
getFieldValue(raftServer, JraftServerImpl.class, "nodes");

Review Comment:
   You could just use JraftServerImpl#raftGroupService() to get 
RaftGroupService; there is no need to resort to reflection here.



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java:
##########
@@ -99,7 +100,11 @@ abstract class ItAbstractColocationTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private NodeAttributesConfiguration defaultNodeAttributesConfiguration;
 
-    @InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE + 
".engine = aipersist, test.engine=test}")
+    @InjectConfiguration("mock.profiles = {"
+            + DEFAULT_STORAGE_PROFILE + ".engine = aipersist, "
+            + DEFAULT_TEST_PROFILE_NAME + ".engine=test, "
+            + DEFAULT_AIMEM_PROFILE_NAME + ".engine=aimem"

Review Comment:
   Spaces around '=' are inconsistent: `.engine = aipersist` has them, while `.engine=test` and `.engine=aimem` do not.



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+        Map<Long, Integer> values = Map.of(
+                0L, 0,
+                1L, 1,
+                2L, 2
+        );
+
+        node.transactions().runInTransaction(tx -> {
+            assertDoesNotThrow(() -> kvView.putAll(tx, values));
+        });
+
+        node.transactions().runInTransaction(tx -> {
+            assertEquals(values.size(), assertDoesNotThrow(() -> 
kvView.getAll(tx, Set.of(0L, 1L, 2L))).size());
+        });
+    }
+
+    private static void checkTableStorageIsVolatile(Node node, int tableId) 
throws NodeStoppingException {
+        InternalTableImpl internalTable = (InternalTableImpl) 
node.tableManager.table(tableId).internalTable();
+
+        assertInstanceOf(VolatilePageMemoryTableStorage.class, 
internalTable.storage());
+    }
+
+    private static List<CatalogStorageProfileDescriptor> 
extractZoneProfiles(Node node, int zoneId) {
+        CatalogZoneDescriptor zoneDescriptor = 
node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId);
+
+        assertNotNull(zoneDescriptor);
+
+        return zoneDescriptor.storageProfiles().profiles();
+    }
+
+    private static boolean isRaftLogStorageVolatile(Node node, int zoneId) {
+        RaftServer raftServer = node.raftManager.server();
+
+        ConcurrentMap<RaftNodeId, RaftGroupService> nodes = 
getFieldValue(raftServer, JraftServerImpl.class, "nodes");
+
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        String singlePeerConsistentId = 
node.clusterService.topologyService().localMember().name();
+
+        RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new 
Peer(singlePeerConsistentId));
+
+        RaftGroupService raftService = nodes.get(raftNodeId);
+
+        NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, 
"node");
+
+        LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, 
"logStorage");
+
+        return raftLogStorage instanceof VolatileLogStorage;
+    }
+
+    private static void checkZoneConsistOfMixedStorageProfiles(Node node, int 
zoneId) {
+        List<CatalogStorageProfileDescriptor> zoneProfiles = 
extractZoneProfiles(node, zoneId);
+
+        assertEquals(2, zoneProfiles.size());
+
+        boolean volatileEngineIsPresented = false;
+        boolean persistentEngineIsPresented = false;
+
+        for (CatalogStorageProfileDescriptor profile : zoneProfiles) {
+            StorageEngine engine = 
node.dataStorageManager().engineByStorageProfile(profile.storageProfile());
+
+            assertNotNull(engine);
+
+            if (engine.isVolatile()) {
+                volatileEngineIsPresented = true;
+            } else {
+                persistentEngineIsPresented = true;
+            }
+        }
+
+        assertTrue(volatileEngineIsPresented);
+        assertTrue(persistentEngineIsPresented);
+    }
+
+    private static void checkZoneConsistOfVolatileOnlyStorageProfile(Node 
node, int zoneId) {

Review Comment:
   ```suggestion
       private static void checkZoneConsistsOfVolatileOnlyStorageProfiles(Node 
node, int zoneId) {
   ```



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+        Map<Long, Integer> values = Map.of(
+                0L, 0,
+                1L, 1,
+                2L, 2
+        );
+
+        node.transactions().runInTransaction(tx -> {
+            assertDoesNotThrow(() -> kvView.putAll(tx, values));
+        });
+
+        node.transactions().runInTransaction(tx -> {
+            assertEquals(values.size(), assertDoesNotThrow(() -> 
kvView.getAll(tx, Set.of(0L, 1L, 2L))).size());
+        });
+    }
+
+    private static void checkTableStorageIsVolatile(Node node, int tableId) 
throws NodeStoppingException {
+        InternalTableImpl internalTable = (InternalTableImpl) 
node.tableManager.table(tableId).internalTable();
+
+        assertInstanceOf(VolatilePageMemoryTableStorage.class, 
internalTable.storage());
+    }
+
+    private static List<CatalogStorageProfileDescriptor> 
extractZoneProfiles(Node node, int zoneId) {
+        CatalogZoneDescriptor zoneDescriptor = 
node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId);
+
+        assertNotNull(zoneDescriptor);
+
+        return zoneDescriptor.storageProfiles().profiles();
+    }
+
+    private static boolean isRaftLogStorageVolatile(Node node, int zoneId) {
+        RaftServer raftServer = node.raftManager.server();
+
+        ConcurrentMap<RaftNodeId, RaftGroupService> nodes = 
getFieldValue(raftServer, JraftServerImpl.class, "nodes");
+
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        String singlePeerConsistentId = 
node.clusterService.topologyService().localMember().name();
+
+        RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new 
Peer(singlePeerConsistentId));
+
+        RaftGroupService raftService = nodes.get(raftNodeId);
+
+        NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, 
"node");
+
+        LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, 
"logStorage");
+
+        return raftLogStorage instanceof VolatileLogStorage;
+    }
+
+    private static void checkZoneConsistOfMixedStorageProfiles(Node node, int 
zoneId) {
+        List<CatalogStorageProfileDescriptor> zoneProfiles = 
extractZoneProfiles(node, zoneId);
+
+        assertEquals(2, zoneProfiles.size());
+
+        boolean volatileEngineIsPresented = false;

Review Comment:
   ```suggestion
           boolean volatileEngineIsPresent = false;
   ```



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+        Map<Long, Integer> values = Map.of(
+                0L, 0,
+                1L, 1,
+                2L, 2
+        );
+
+        node.transactions().runInTransaction(tx -> {
+            assertDoesNotThrow(() -> kvView.putAll(tx, values));
+        });
+
+        node.transactions().runInTransaction(tx -> {
+            assertEquals(values.size(), assertDoesNotThrow(() -> 
kvView.getAll(tx, Set.of(0L, 1L, 2L))).size());
+        });
+    }
+
+    private static void checkTableStorageIsVolatile(Node node, int tableId) 
throws NodeStoppingException {
+        InternalTableImpl internalTable = (InternalTableImpl) 
node.tableManager.table(tableId).internalTable();
+
+        assertInstanceOf(VolatilePageMemoryTableStorage.class, 
internalTable.storage());
+    }
+
+    private static List<CatalogStorageProfileDescriptor> 
extractZoneProfiles(Node node, int zoneId) {
+        CatalogZoneDescriptor zoneDescriptor = 
node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId);
+
+        assertNotNull(zoneDescriptor);
+
+        return zoneDescriptor.storageProfiles().profiles();
+    }
+
+    private static boolean isRaftLogStorageVolatile(Node node, int zoneId) {
+        RaftServer raftServer = node.raftManager.server();
+
+        ConcurrentMap<RaftNodeId, RaftGroupService> nodes = 
getFieldValue(raftServer, JraftServerImpl.class, "nodes");
+
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        String singlePeerConsistentId = 
node.clusterService.topologyService().localMember().name();
+
+        RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new 
Peer(singlePeerConsistentId));
+
+        RaftGroupService raftService = nodes.get(raftNodeId);
+
+        NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, 
"node");
+
+        LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, 
"logStorage");
+
+        return raftLogStorage instanceof VolatileLogStorage;
+    }
+
+    private static void checkZoneConsistOfMixedStorageProfiles(Node node, int 
zoneId) {
+        List<CatalogStorageProfileDescriptor> zoneProfiles = 
extractZoneProfiles(node, zoneId);
+
+        assertEquals(2, zoneProfiles.size());
+
+        boolean volatileEngineIsPresented = false;
+        boolean persistentEngineIsPresented = false;
+
+        for (CatalogStorageProfileDescriptor profile : zoneProfiles) {
+            StorageEngine engine = 
node.dataStorageManager().engineByStorageProfile(profile.storageProfile());
+
+            assertNotNull(engine);
+
+            if (engine.isVolatile()) {
+                volatileEngineIsPresented = true;
+            } else {
+                persistentEngineIsPresented = true;
+            }
+        }
+
+        assertTrue(volatileEngineIsPresented);
+        assertTrue(persistentEngineIsPresented);
+    }
+
+    private static void checkZoneConsistOfVolatileOnlyStorageProfile(Node 
node, int zoneId) {
+        List<CatalogStorageProfileDescriptor> zoneProfiles = 
extractZoneProfiles(node, zoneId);
+
+        assertEquals(1, zoneProfiles.size());
+
+        StorageEngine volatileEngine = 
node.dataStorageManager().engineByStorageProfile(zoneProfiles.get(0).storageProfile());
+
+        assertNotNull(volatileEngine);
+
+        assertTrue(volatileEngine.isVolatile());
+    }

Review Comment:
   Let's add tests to make sure that, when we encounter an unknown storage 
profile or engine, we still start a persistent Raft, even though all the 
known profiles are volatile.



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+        Map<Long, Integer> values = Map.of(
+                0L, 0,
+                1L, 1,
+                2L, 2
+        );
+
+        node.transactions().runInTransaction(tx -> {
+            assertDoesNotThrow(() -> kvView.putAll(tx, values));
+        });
+
+        node.transactions().runInTransaction(tx -> {
+            assertEquals(values.size(), assertDoesNotThrow(() -> 
kvView.getAll(tx, Set.of(0L, 1L, 2L))).size());
+        });
+    }
+
+    private static void checkTableStorageIsVolatile(Node node, int tableId) 
throws NodeStoppingException {
+        InternalTableImpl internalTable = (InternalTableImpl) 
node.tableManager.table(tableId).internalTable();
+
+        assertInstanceOf(VolatilePageMemoryTableStorage.class, 
internalTable.storage());
+    }
+
+    private static List<CatalogStorageProfileDescriptor> 
extractZoneProfiles(Node node, int zoneId) {
+        CatalogZoneDescriptor zoneDescriptor = 
node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId);
+
+        assertNotNull(zoneDescriptor);
+
+        return zoneDescriptor.storageProfiles().profiles();
+    }
+
+    private static boolean isRaftLogStorageVolatile(Node node, int zoneId) {
+        RaftServer raftServer = node.raftManager.server();
+
+        ConcurrentMap<RaftNodeId, RaftGroupService> nodes = 
getFieldValue(raftServer, JraftServerImpl.class, "nodes");
+
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        String singlePeerConsistentId = 
node.clusterService.topologyService().localMember().name();
+
+        RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new 
Peer(singlePeerConsistentId));
+
+        RaftGroupService raftService = nodes.get(raftNodeId);
+
+        NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, 
"node");
+
+        LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, 
"logStorage");
+
+        return raftLogStorage instanceof VolatileLogStorage;
+    }
+
+    private static void checkZoneConsistOfMixedStorageProfiles(Node node, int 
zoneId) {

Review Comment:
   ```suggestion
       private static void checkZoneConsistsOfMixedStorageProfiles(Node node, 
int zoneId) {
   ```



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME

Review Comment:
   How about passing more than 1 volatile profile?



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";

Review Comment:
   This zone is not actually mixed: it is created with only a volatile 
profile, so the name `mixed_zone` is misleading here.



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+        Map<Long, Integer> values = Map.of(
+                0L, 0,
+                1L, 1,
+                2L, 2
+        );
+
+        node.transactions().runInTransaction(tx -> {
+            assertDoesNotThrow(() -> kvView.putAll(tx, values));
+        });
+
+        node.transactions().runInTransaction(tx -> {
+            assertEquals(values.size(), assertDoesNotThrow(() -> 
kvView.getAll(tx, Set.of(0L, 1L, 2L))).size());
+        });
+    }
+
+    private static void checkTableStorageIsVolatile(Node node, int tableId) 
throws NodeStoppingException {
+        InternalTableImpl internalTable = (InternalTableImpl) 
node.tableManager.table(tableId).internalTable();
+
+        assertInstanceOf(VolatilePageMemoryTableStorage.class, 
internalTable.storage());
+    }
+
+    private static List<CatalogStorageProfileDescriptor> 
extractZoneProfiles(Node node, int zoneId) {
+        CatalogZoneDescriptor zoneDescriptor = 
node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId);
+
+        assertNotNull(zoneDescriptor);
+
+        return zoneDescriptor.storageProfiles().profiles();
+    }
+
+    private static boolean isRaftLogStorageVolatile(Node node, int zoneId) {
+        RaftServer raftServer = node.raftManager.server();
+
+        ConcurrentMap<RaftNodeId, RaftGroupService> nodes = 
getFieldValue(raftServer, JraftServerImpl.class, "nodes");
+
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        String singlePeerConsistentId = 
node.clusterService.topologyService().localMember().name();
+
+        RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new 
Peer(singlePeerConsistentId));
+
+        RaftGroupService raftService = nodes.get(raftNodeId);
+
+        NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, 
"node");

Review Comment:
   `RaftGroupService#getRaftNode()` should be used here instead of reading the 
`node` field via reflection.



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+        Map<Long, Integer> values = Map.of(
+                0L, 0,
+                1L, 1,
+                2L, 2
+        );
+
+        node.transactions().runInTransaction(tx -> {
+            assertDoesNotThrow(() -> kvView.putAll(tx, values));
+        });
+
+        node.transactions().runInTransaction(tx -> {
+            assertEquals(values.size(), assertDoesNotThrow(() -> 
kvView.getAll(tx, Set.of(0L, 1L, 2L))).size());
+        });
+    }
+
+    private static void checkTableStorageIsVolatile(Node node, int tableId) 
throws NodeStoppingException {
+        InternalTableImpl internalTable = (InternalTableImpl) 
node.tableManager.table(tableId).internalTable();
+
+        assertInstanceOf(VolatilePageMemoryTableStorage.class, 
internalTable.storage());
+    }
+
+    private static List<CatalogStorageProfileDescriptor> 
extractZoneProfiles(Node node, int zoneId) {
+        CatalogZoneDescriptor zoneDescriptor = 
node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId);
+
+        assertNotNull(zoneDescriptor);
+
+        return zoneDescriptor.storageProfiles().profiles();
+    }
+
+    private static boolean isRaftLogStorageVolatile(Node node, int zoneId) {
+        RaftServer raftServer = node.raftManager.server();
+
+        ConcurrentMap<RaftNodeId, RaftGroupService> nodes = 
getFieldValue(raftServer, JraftServerImpl.class, "nodes");
+
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        String singlePeerConsistentId = 
node.clusterService.topologyService().localMember().name();
+
+        RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new 
Peer(singlePeerConsistentId));
+
+        RaftGroupService raftService = nodes.get(raftNodeId);
+
+        NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, 
"node");
+
+        LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, 
"logStorage");
+
+        return raftLogStorage instanceof VolatileLogStorage;
+    }
+
+    private static void checkZoneConsistOfMixedStorageProfiles(Node node, int 
zoneId) {
+        List<CatalogStorageProfileDescriptor> zoneProfiles = 
extractZoneProfiles(node, zoneId);
+
+        assertEquals(2, zoneProfiles.size());
+
+        boolean volatileEngineIsPresented = false;
+        boolean persistentEngineIsPresented = false;

Review Comment:
   ```suggestion
           boolean persistentEngineIsPresent = false;
   ```



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+        Map<Long, Integer> values = Map.of(
+                0L, 0,
+                1L, 1,
+                2L, 2
+        );
+
+        node.transactions().runInTransaction(tx -> {
+            assertDoesNotThrow(() -> kvView.putAll(tx, values));
+        });
+
+        node.transactions().runInTransaction(tx -> {
+            assertEquals(values.size(), assertDoesNotThrow(() -> 
kvView.getAll(tx, Set.of(0L, 1L, 2L))).size());
+        });
+    }
+
+    private static void checkTableStorageIsVolatile(Node node, int tableId) 
throws NodeStoppingException {
+        InternalTableImpl internalTable = (InternalTableImpl) 
node.tableManager.table(tableId).internalTable();
+
+        assertInstanceOf(VolatilePageMemoryTableStorage.class, 
internalTable.storage());
+    }
+
+    private static List<CatalogStorageProfileDescriptor> 
extractZoneProfiles(Node node, int zoneId) {
+        CatalogZoneDescriptor zoneDescriptor = 
node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId);
+
+        assertNotNull(zoneDescriptor);
+
+        return zoneDescriptor.storageProfiles().profiles();
+    }
+
+    private static boolean isRaftLogStorageVolatile(Node node, int zoneId) {
+        RaftServer raftServer = node.raftManager.server();
+
+        ConcurrentMap<RaftNodeId, RaftGroupService> nodes = 
getFieldValue(raftServer, JraftServerImpl.class, "nodes");
+
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        String singlePeerConsistentId = 
node.clusterService.topologyService().localMember().name();
+
+        RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new 
Peer(singlePeerConsistentId));
+
+        RaftGroupService raftService = nodes.get(raftNodeId);
+
+        NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, 
"node");
+
+        LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, 
"logStorage");

Review Comment:
   Please add a `@TestOnly` getter for `logStorage` to `NodeImpl` instead. 
Reflection makes our tests too fragile.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -251,6 +256,8 @@ public class PartitionReplicaLifecycleManager extends
      * @param sharedTxStateStorage Shared tx state storage.
      * @param txManager Transaction manager.
      * @param schemaManager Schema manager.
+     * @param dataStorageManager Data storage manager for checking zones' 
storage profiles for volatile flag.

Review Comment:
   Let's not mention in the javadoc why we need it. The reason might easily 
change without the javadoc being updated, and then the javadoc will become stale.



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.TestDefaultProfilesNames;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Set of tests that are checking how in-memory scenarios are working under 
zone colocation.
+ */
+public class ItZoneInMemoryTest extends ItAbstractColocationTest {
+    @Test
+    void testMixedStorageProfilesZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfMixedStorageProfiles(node, zoneId);
+
+        assertFalse(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    @Test
+    void testVolatileStorageProfileZone() throws Exception {
+        startCluster(1);
+
+        Node node = getNode(0);
+
+        String zoneName = "mixed_zone";
+        int zoneId = createZoneWithStorageProfiles(
+                node,
+                zoneName,
+                1,
+                1,
+                TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME
+        );
+
+        checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId);
+
+        assertTrue(isRaftLogStorageVolatile(node, zoneId));
+
+        checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName);
+    }
+
+    private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node 
node, String zoneName) throws NodeStoppingException {
+        String tableName = "volatile_table";
+
+        createTable(node, zoneName, tableName, 
TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        checkTableStorageIsVolatile(node, tableId);
+
+        KeyValueView<Long, Integer> kvView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+        Map<Long, Integer> values = Map.of(
+                0L, 0,
+                1L, 1,
+                2L, 2
+        );
+
+        node.transactions().runInTransaction(tx -> {
+            assertDoesNotThrow(() -> kvView.putAll(tx, values));
+        });
+
+        node.transactions().runInTransaction(tx -> {
+            assertEquals(values.size(), assertDoesNotThrow(() -> 
kvView.getAll(tx, Set.of(0L, 1L, 2L))).size());
+        });
+    }
+
+    private static void checkTableStorageIsVolatile(Node node, int tableId) 
throws NodeStoppingException {
+        InternalTableImpl internalTable = (InternalTableImpl) 
node.tableManager.table(tableId).internalTable();
+
+        assertInstanceOf(VolatilePageMemoryTableStorage.class, 
internalTable.storage());
+    }
+
+    private static List<CatalogStorageProfileDescriptor> 
extractZoneProfiles(Node node, int zoneId) {
+        CatalogZoneDescriptor zoneDescriptor = 
node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId);
+
+        assertNotNull(zoneDescriptor);
+
+        return zoneDescriptor.storageProfiles().profiles();
+    }
+
+    private static boolean isRaftLogStorageVolatile(Node node, int zoneId) {
+        RaftServer raftServer = node.raftManager.server();
+
+        ConcurrentMap<RaftNodeId, RaftGroupService> nodes = 
getFieldValue(raftServer, JraftServerImpl.class, "nodes");
+
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+        String singlePeerConsistentId = 
node.clusterService.topologyService().localMember().name();
+
+        RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new 
Peer(singlePeerConsistentId));
+
+        RaftGroupService raftService = nodes.get(raftNodeId);
+
+        NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, 
"node");
+
+        LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, 
"logStorage");
+
+        return raftLogStorage instanceof VolatileLogStorage;
+    }
+
+    private static void checkZoneConsistOfMixedStorageProfiles(Node node, int 
zoneId) {
+        List<CatalogStorageProfileDescriptor> zoneProfiles = 
extractZoneProfiles(node, zoneId);
+
+        assertEquals(2, zoneProfiles.size());
+
+        boolean volatileEngineIsPresented = false;
+        boolean persistentEngineIsPresented = false;
+
+        for (CatalogStorageProfileDescriptor profile : zoneProfiles) {
+            StorageEngine engine = 
node.dataStorageManager().engineByStorageProfile(profile.storageProfile());
+
+            assertNotNull(engine);
+
+            if (engine.isVolatile()) {
+                volatileEngineIsPresented = true;
+            } else {
+                persistentEngineIsPresented = true;
+            }
+        }
+
+        assertTrue(volatileEngineIsPresented);
+        assertTrue(persistentEngineIsPresented);

Review Comment:
   Please add a message that includes the list of profiles to these assertions 
to make a failed assertion easier to debug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to