rpuch commented on code in PR #5465: URL: https://github.com/apache/ignite-3/pull/5465#discussion_r2007020780
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java: ########## @@ -253,19 +258,35 @@ static int createZone(Node node, String zoneName, int partitions, int replicas) } private static int createZone(Node node, String zoneName, int partitions, int replicas, boolean testStorageProfile) { + return createZoneWithStorageProfiles( + node, + zoneName, + partitions, + replicas, + testStorageProfile + ? DEFAULT_TEST_PROFILE_NAME + : DEFAULT_STORAGE_PROFILE + ); + } + + static int createZoneWithStorageProfiles(Node node, String zoneName, int partitions, int replicas, String... profiles) { DistributionZonesTestUtil.createZoneWithStorageProfile( node.catalogManager, zoneName, partitions, replicas, - testStorageProfile ? DEFAULT_TEST_PROFILE_NAME : DEFAULT_STORAGE_PROFILE + String.join(",", profiles) ); Integer zoneId = getZoneId(node.catalogManager, zoneName, node.hybridClock.nowLong()); return requireNonNull(zoneId, "No zone found with name " + zoneName); } static void createTable(Node node, String zoneName, String tableName) { + createTable(node, zoneName, tableName, null); + } + + static void createTable(Node node, String zoneName, String tableName, String storageProfile) { Review Comment: ```suggestion static void createTable(Node node, String zoneName, String tableName, @Nullable String storageProfile) { ``` ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import 
org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = "volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, 
Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); Review Comment: Let's obtain the table using public API. The less we use internal APIs in our tests, the better ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone 
colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = "volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); + + Map<Long, Integer> values = Map.of( + 0L, 0, + 1L, 1, + 2L, 2 + ); + + node.transactions().runInTransaction(tx -> { + assertDoesNotThrow(() -> kvView.putAll(tx, values)); + }); + + node.transactions().runInTransaction(tx -> { + assertEquals(values.size(), assertDoesNotThrow(() -> kvView.getAll(tx, Set.of(0L, 1L, 2L))).size()); + }); + } + + private static void 
checkTableStorageIsVolatile(Node node, int tableId) throws NodeStoppingException { + InternalTableImpl internalTable = (InternalTableImpl) node.tableManager.table(tableId).internalTable(); + + assertInstanceOf(VolatilePageMemoryTableStorage.class, internalTable.storage()); + } + + private static List<CatalogStorageProfileDescriptor> extractZoneProfiles(Node node, int zoneId) { + CatalogZoneDescriptor zoneDescriptor = node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId); + + assertNotNull(zoneDescriptor); + + return zoneDescriptor.storageProfiles().profiles(); + } + + private static boolean isRaftLogStorageVolatile(Node node, int zoneId) { + RaftServer raftServer = node.raftManager.server(); + + ConcurrentMap<RaftNodeId, RaftGroupService> nodes = getFieldValue(raftServer, JraftServerImpl.class, "nodes"); Review Comment: You could just use JraftServerImpl#raftGroupService() to get RaftGroupService, there is no need to resort to reflection here ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java: ########## @@ -99,7 +100,11 @@ abstract class ItAbstractColocationTest extends IgniteAbstractTest { @InjectConfiguration private NodeAttributesConfiguration defaultNodeAttributesConfiguration; - @InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE + ".engine = aipersist, test.engine=test}") + @InjectConfiguration("mock.profiles = {" + + DEFAULT_STORAGE_PROFILE + ".engine = aipersist, " + + DEFAULT_TEST_PROFILE_NAME + ".engine=test, " + + DEFAULT_AIMEM_PROFILE_NAME + ".engine=aimem" Review Comment: Spaces around '=' do not look consistent ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = 
"volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); + + Map<Long, Integer> values = Map.of( + 0L, 0, + 1L, 1, + 2L, 2 + ); + + node.transactions().runInTransaction(tx -> { + assertDoesNotThrow(() -> kvView.putAll(tx, values)); + }); + + node.transactions().runInTransaction(tx -> { + assertEquals(values.size(), assertDoesNotThrow(() -> kvView.getAll(tx, Set.of(0L, 1L, 2L))).size()); + }); + } + + private static void checkTableStorageIsVolatile(Node node, int tableId) throws NodeStoppingException { + InternalTableImpl internalTable = (InternalTableImpl) node.tableManager.table(tableId).internalTable(); + + assertInstanceOf(VolatilePageMemoryTableStorage.class, internalTable.storage()); + } + + private static List<CatalogStorageProfileDescriptor> extractZoneProfiles(Node node, int zoneId) { + CatalogZoneDescriptor zoneDescriptor = node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId); + + assertNotNull(zoneDescriptor); + + return zoneDescriptor.storageProfiles().profiles(); + } + + private static boolean isRaftLogStorageVolatile(Node node, int zoneId) { + RaftServer raftServer = node.raftManager.server(); + + ConcurrentMap<RaftNodeId, RaftGroupService> nodes = getFieldValue(raftServer, JraftServerImpl.class, "nodes"); + + ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0); + + String singlePeerConsistentId = node.clusterService.topologyService().localMember().name(); + + RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new Peer(singlePeerConsistentId)); + + RaftGroupService raftService = nodes.get(raftNodeId); + + NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, "node"); + 
+ LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, "logStorage"); + + return raftLogStorage instanceof VolatileLogStorage; + } + + private static void checkZoneConsistOfMixedStorageProfiles(Node node, int zoneId) { + List<CatalogStorageProfileDescriptor> zoneProfiles = extractZoneProfiles(node, zoneId); + + assertEquals(2, zoneProfiles.size()); + + boolean volatileEngineIsPresented = false; + boolean persistentEngineIsPresented = false; + + for (CatalogStorageProfileDescriptor profile : zoneProfiles) { + StorageEngine engine = node.dataStorageManager().engineByStorageProfile(profile.storageProfile()); + + assertNotNull(engine); + + if (engine.isVolatile()) { + volatileEngineIsPresented = true; + } else { + persistentEngineIsPresented = true; + } + } + + assertTrue(volatileEngineIsPresented); + assertTrue(persistentEngineIsPresented); + } + + private static void checkZoneConsistOfVolatileOnlyStorageProfile(Node node, int zoneId) { Review Comment: ```suggestion private static void checkZoneConsistsOfVolatileOnlyStorageProfiles(Node node, int zoneId) { ``` ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import 
org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = "volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); + + Map<Long, Integer> values = Map.of( + 0L, 0, + 1L, 1, + 2L, 2 + ); + + node.transactions().runInTransaction(tx -> { + assertDoesNotThrow(() -> kvView.putAll(tx, values)); + }); + + node.transactions().runInTransaction(tx -> { + 
assertEquals(values.size(), assertDoesNotThrow(() -> kvView.getAll(tx, Set.of(0L, 1L, 2L))).size()); + }); + } + + private static void checkTableStorageIsVolatile(Node node, int tableId) throws NodeStoppingException { + InternalTableImpl internalTable = (InternalTableImpl) node.tableManager.table(tableId).internalTable(); + + assertInstanceOf(VolatilePageMemoryTableStorage.class, internalTable.storage()); + } + + private static List<CatalogStorageProfileDescriptor> extractZoneProfiles(Node node, int zoneId) { + CatalogZoneDescriptor zoneDescriptor = node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId); + + assertNotNull(zoneDescriptor); + + return zoneDescriptor.storageProfiles().profiles(); + } + + private static boolean isRaftLogStorageVolatile(Node node, int zoneId) { + RaftServer raftServer = node.raftManager.server(); + + ConcurrentMap<RaftNodeId, RaftGroupService> nodes = getFieldValue(raftServer, JraftServerImpl.class, "nodes"); + + ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0); + + String singlePeerConsistentId = node.clusterService.topologyService().localMember().name(); + + RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new Peer(singlePeerConsistentId)); + + RaftGroupService raftService = nodes.get(raftNodeId); + + NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, "node"); + + LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, "logStorage"); + + return raftLogStorage instanceof VolatileLogStorage; + } + + private static void checkZoneConsistOfMixedStorageProfiles(Node node, int zoneId) { + List<CatalogStorageProfileDescriptor> zoneProfiles = extractZoneProfiles(node, zoneId); + + assertEquals(2, zoneProfiles.size()); + + boolean volatileEngineIsPresented = false; Review Comment: ```suggestion boolean volatileEngineIsPresent = false; ``` ########## 
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import 
org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + 
+ checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = "volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); + + Map<Long, Integer> values = Map.of( + 0L, 0, + 1L, 1, + 2L, 2 + ); + + node.transactions().runInTransaction(tx -> { + assertDoesNotThrow(() -> kvView.putAll(tx, values)); + }); + + node.transactions().runInTransaction(tx -> { + assertEquals(values.size(), assertDoesNotThrow(() -> kvView.getAll(tx, Set.of(0L, 1L, 2L))).size()); + }); + } + + private static void checkTableStorageIsVolatile(Node node, int tableId) throws NodeStoppingException { + InternalTableImpl internalTable = (InternalTableImpl) node.tableManager.table(tableId).internalTable(); + + assertInstanceOf(VolatilePageMemoryTableStorage.class, internalTable.storage()); + } + + private static List<CatalogStorageProfileDescriptor> extractZoneProfiles(Node node, int zoneId) { + CatalogZoneDescriptor zoneDescriptor = node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId); + + assertNotNull(zoneDescriptor); + + return zoneDescriptor.storageProfiles().profiles(); + } + + private static boolean isRaftLogStorageVolatile(Node node, int zoneId) { + RaftServer raftServer = node.raftManager.server(); + + ConcurrentMap<RaftNodeId, RaftGroupService> nodes = getFieldValue(raftServer, JraftServerImpl.class, "nodes"); + + ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0); + + String singlePeerConsistentId = node.clusterService.topologyService().localMember().name(); + + 
RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new Peer(singlePeerConsistentId)); + + RaftGroupService raftService = nodes.get(raftNodeId); + + NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, "node"); + + LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, "logStorage"); + + return raftLogStorage instanceof VolatileLogStorage; + } + + private static void checkZoneConsistOfMixedStorageProfiles(Node node, int zoneId) { + List<CatalogStorageProfileDescriptor> zoneProfiles = extractZoneProfiles(node, zoneId); + + assertEquals(2, zoneProfiles.size()); + + boolean volatileEngineIsPresented = false; + boolean persistentEngineIsPresented = false; + + for (CatalogStorageProfileDescriptor profile : zoneProfiles) { + StorageEngine engine = node.dataStorageManager().engineByStorageProfile(profile.storageProfile()); + + assertNotNull(engine); + + if (engine.isVolatile()) { + volatileEngineIsPresented = true; + } else { + persistentEngineIsPresented = true; + } + } + + assertTrue(volatileEngineIsPresented); + assertTrue(persistentEngineIsPresented); + } + + private static void checkZoneConsistOfVolatileOnlyStorageProfile(Node node, int zoneId) { + List<CatalogStorageProfileDescriptor> zoneProfiles = extractZoneProfiles(node, zoneId); + + assertEquals(1, zoneProfiles.size()); + + StorageEngine volatileEngine = node.dataStorageManager().engineByStorageProfile(zoneProfiles.get(0).storageProfile()); + + assertNotNull(volatileEngine); + + assertTrue(volatileEngine.isVolatile()); + } Review Comment: Let's add tests making sure that in a situation when we get an unknown storage profile or engine, we still start a persistent raft, even though all the known profiles are volatile ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = 
"volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); + + Map<Long, Integer> values = Map.of( + 0L, 0, + 1L, 1, + 2L, 2 + ); + + node.transactions().runInTransaction(tx -> { + assertDoesNotThrow(() -> kvView.putAll(tx, values)); + }); + + node.transactions().runInTransaction(tx -> { + assertEquals(values.size(), assertDoesNotThrow(() -> kvView.getAll(tx, Set.of(0L, 1L, 2L))).size()); + }); + } + + private static void checkTableStorageIsVolatile(Node node, int tableId) throws NodeStoppingException { + InternalTableImpl internalTable = (InternalTableImpl) node.tableManager.table(tableId).internalTable(); + + assertInstanceOf(VolatilePageMemoryTableStorage.class, internalTable.storage()); + } + + private static List<CatalogStorageProfileDescriptor> extractZoneProfiles(Node node, int zoneId) { + CatalogZoneDescriptor zoneDescriptor = node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId); + + assertNotNull(zoneDescriptor); + + return zoneDescriptor.storageProfiles().profiles(); + } + + private static boolean isRaftLogStorageVolatile(Node node, int zoneId) { + RaftServer raftServer = node.raftManager.server(); + + ConcurrentMap<RaftNodeId, RaftGroupService> nodes = getFieldValue(raftServer, JraftServerImpl.class, "nodes"); + + ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0); + + String singlePeerConsistentId = node.clusterService.topologyService().localMember().name(); + + RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new Peer(singlePeerConsistentId)); + + RaftGroupService raftService = nodes.get(raftNodeId); + + NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, "node"); + 
+ LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, "logStorage"); + + return raftLogStorage instanceof VolatileLogStorage; + } + + private static void checkZoneConsistOfMixedStorageProfiles(Node node, int zoneId) { Review Comment: ```suggestion private static void checkZoneConsistsOfMixedStorageProfiles(Node node, int zoneId) { ``` ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone 
colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME Review Comment: How about passing more than 1 volatile profile? ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone 
colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; Review Comment: It's not mixed actually ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone 
colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = "volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); + + Map<Long, Integer> values = Map.of( + 0L, 0, + 1L, 1, + 2L, 2 + ); + + node.transactions().runInTransaction(tx -> { + assertDoesNotThrow(() -> kvView.putAll(tx, values)); + }); + + node.transactions().runInTransaction(tx -> { + assertEquals(values.size(), assertDoesNotThrow(() -> kvView.getAll(tx, Set.of(0L, 1L, 2L))).size()); + }); + } + + private static void 
checkTableStorageIsVolatile(Node node, int tableId) throws NodeStoppingException { + InternalTableImpl internalTable = (InternalTableImpl) node.tableManager.table(tableId).internalTable(); + + assertInstanceOf(VolatilePageMemoryTableStorage.class, internalTable.storage()); + } + + private static List<CatalogStorageProfileDescriptor> extractZoneProfiles(Node node, int zoneId) { + CatalogZoneDescriptor zoneDescriptor = node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId); + + assertNotNull(zoneDescriptor); + + return zoneDescriptor.storageProfiles().profiles(); + } + + private static boolean isRaftLogStorageVolatile(Node node, int zoneId) { + RaftServer raftServer = node.raftManager.server(); + + ConcurrentMap<RaftNodeId, RaftGroupService> nodes = getFieldValue(raftServer, JraftServerImpl.class, "nodes"); + + ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0); + + String singlePeerConsistentId = node.clusterService.topologyService().localMember().name(); + + RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new Peer(singlePeerConsistentId)); + + RaftGroupService raftService = nodes.get(raftNodeId); + + NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, "node"); Review Comment: RaftGroupService#getRaftNode() should be used instead ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import 
org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = "volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, 
Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); + + Map<Long, Integer> values = Map.of( + 0L, 0, + 1L, 1, + 2L, 2 + ); + + node.transactions().runInTransaction(tx -> { + assertDoesNotThrow(() -> kvView.putAll(tx, values)); + }); + + node.transactions().runInTransaction(tx -> { + assertEquals(values.size(), assertDoesNotThrow(() -> kvView.getAll(tx, Set.of(0L, 1L, 2L))).size()); + }); + } + + private static void checkTableStorageIsVolatile(Node node, int tableId) throws NodeStoppingException { + InternalTableImpl internalTable = (InternalTableImpl) node.tableManager.table(tableId).internalTable(); + + assertInstanceOf(VolatilePageMemoryTableStorage.class, internalTable.storage()); + } + + private static List<CatalogStorageProfileDescriptor> extractZoneProfiles(Node node, int zoneId) { + CatalogZoneDescriptor zoneDescriptor = node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId); + + assertNotNull(zoneDescriptor); + + return zoneDescriptor.storageProfiles().profiles(); + } + + private static boolean isRaftLogStorageVolatile(Node node, int zoneId) { + RaftServer raftServer = node.raftManager.server(); + + ConcurrentMap<RaftNodeId, RaftGroupService> nodes = getFieldValue(raftServer, JraftServerImpl.class, "nodes"); + + ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0); + + String singlePeerConsistentId = node.clusterService.topologyService().localMember().name(); + + RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new Peer(singlePeerConsistentId)); + + RaftGroupService raftService = nodes.get(raftNodeId); + + NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, "node"); + + LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, "logStorage"); + + return raftLogStorage instanceof VolatileLogStorage; + } + + private static void checkZoneConsistOfMixedStorageProfiles(Node node, int zoneId) { + List<CatalogStorageProfileDescriptor> zoneProfiles = 
extractZoneProfiles(node, zoneId); + + assertEquals(2, zoneProfiles.size()); + + boolean volatileEngineIsPresented = false; + boolean persistentEngineIsPresented = false; Review Comment: ```suggestion boolean persistentEngineIsPresent = false; ``` ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone 
colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = "volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); + + Map<Long, Integer> values = Map.of( + 0L, 0, + 1L, 1, + 2L, 2 + ); + + node.transactions().runInTransaction(tx -> { + assertDoesNotThrow(() -> kvView.putAll(tx, values)); + }); + + node.transactions().runInTransaction(tx -> { + assertEquals(values.size(), assertDoesNotThrow(() -> kvView.getAll(tx, Set.of(0L, 1L, 2L))).size()); + }); + } + + private static void 
checkTableStorageIsVolatile(Node node, int tableId) throws NodeStoppingException { + InternalTableImpl internalTable = (InternalTableImpl) node.tableManager.table(tableId).internalTable(); + + assertInstanceOf(VolatilePageMemoryTableStorage.class, internalTable.storage()); + } + + private static List<CatalogStorageProfileDescriptor> extractZoneProfiles(Node node, int zoneId) { + CatalogZoneDescriptor zoneDescriptor = node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId); + + assertNotNull(zoneDescriptor); + + return zoneDescriptor.storageProfiles().profiles(); + } + + private static boolean isRaftLogStorageVolatile(Node node, int zoneId) { + RaftServer raftServer = node.raftManager.server(); + + ConcurrentMap<RaftNodeId, RaftGroupService> nodes = getFieldValue(raftServer, JraftServerImpl.class, "nodes"); + + ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0); + + String singlePeerConsistentId = node.clusterService.topologyService().localMember().name(); + + RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new Peer(singlePeerConsistentId)); + + RaftGroupService raftService = nodes.get(raftNodeId); + + NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, "node"); + + LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, "logStorage"); Review Comment: Please add a `@TestOnly` getter instead. Reflection makes our tests too fragile ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -251,6 +256,8 @@ public class PartitionReplicaLifecycleManager extends * @param sharedTxStateStorage Shared tx state storage. * @param txManager Transaction manager. * @param schemaManager Schema manager. + * @param dataStorageManager Data storage manager for checking zones' storage profiles for volatile flag. Review Comment: Let's not mention why we need it. 
This might easily be changed, and the javadoc will not be updated, so it will become stale ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneInMemoryTest.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.TestDefaultProfilesNames; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorage; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.Test; + +/** + * Set of tests that are checking how in-memory scenarios are working under zone 
colocation. + */ +public class ItZoneInMemoryTest extends ItAbstractColocationTest { + @Test + void testMixedStorageProfilesZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfMixedStorageProfiles(node, zoneId); + + assertFalse(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + @Test + void testVolatileStorageProfileZone() throws Exception { + startCluster(1); + + Node node = getNode(0); + + String zoneName = "mixed_zone"; + int zoneId = createZoneWithStorageProfiles( + node, + zoneName, + 1, + 1, + TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME + ); + + checkZoneConsistOfVolatileOnlyStorageProfile(node, zoneId); + + assertTrue(isRaftLogStorageVolatile(node, zoneId)); + + checkWriteReadInVolatileTableStorageIsSuccessful(node, zoneName); + } + + private static void checkWriteReadInVolatileTableStorageIsSuccessful(Node node, String zoneName) throws NodeStoppingException { + String tableName = "volatile_table"; + + createTable(node, zoneName, tableName, TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + checkTableStorageIsVolatile(node, tableId); + + KeyValueView<Long, Integer> kvView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); + + Map<Long, Integer> values = Map.of( + 0L, 0, + 1L, 1, + 2L, 2 + ); + + node.transactions().runInTransaction(tx -> { + assertDoesNotThrow(() -> kvView.putAll(tx, values)); + }); + + node.transactions().runInTransaction(tx -> { + assertEquals(values.size(), assertDoesNotThrow(() -> kvView.getAll(tx, Set.of(0L, 1L, 2L))).size()); + }); + } + + private static void 
checkTableStorageIsVolatile(Node node, int tableId) throws NodeStoppingException { + InternalTableImpl internalTable = (InternalTableImpl) node.tableManager.table(tableId).internalTable(); + + assertInstanceOf(VolatilePageMemoryTableStorage.class, internalTable.storage()); + } + + private static List<CatalogStorageProfileDescriptor> extractZoneProfiles(Node node, int zoneId) { + CatalogZoneDescriptor zoneDescriptor = node.catalogManager.catalog(node.catalogManager.latestCatalogVersion()).zone(zoneId); + + assertNotNull(zoneDescriptor); + + return zoneDescriptor.storageProfiles().profiles(); + } + + private static boolean isRaftLogStorageVolatile(Node node, int zoneId) { + RaftServer raftServer = node.raftManager.server(); + + ConcurrentMap<RaftNodeId, RaftGroupService> nodes = getFieldValue(raftServer, JraftServerImpl.class, "nodes"); + + ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0); + + String singlePeerConsistentId = node.clusterService.topologyService().localMember().name(); + + RaftNodeId raftNodeId = new RaftNodeId(zonePartitionId, new Peer(singlePeerConsistentId)); + + RaftGroupService raftService = nodes.get(raftNodeId); + + NodeImpl raftNode = getFieldValue(raftService, RaftGroupService.class, "node"); + + LogStorage raftLogStorage = getFieldValue(raftNode, NodeImpl.class, "logStorage"); + + return raftLogStorage instanceof VolatileLogStorage; + } + + private static void checkZoneConsistOfMixedStorageProfiles(Node node, int zoneId) { + List<CatalogStorageProfileDescriptor> zoneProfiles = extractZoneProfiles(node, zoneId); + + assertEquals(2, zoneProfiles.size()); + + boolean volatileEngineIsPresented = false; + boolean persistentEngineIsPresented = false; + + for (CatalogStorageProfileDescriptor profile : zoneProfiles) { + StorageEngine engine = node.dataStorageManager().engineByStorageProfile(profile.storageProfile()); + + assertNotNull(engine); + + if (engine.isVolatile()) { + volatileEngineIsPresented = true; + } else { + 
persistentEngineIsPresented = true; + } + } + + assertTrue(volatileEngineIsPresented); + assertTrue(persistentEngineIsPresented); Review Comment: Please add a message that includes the list of profiles to each of these assertions, so that a failed assertion is easier to debug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org