rpuch commented on code in PR #5588: URL: https://github.com/apache/ignite-3/pull/5588#discussion_r2039407701
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the + // snapshot to proceed. + CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture + .thenRunAsync(() -> { + node.lowWatermark.updateLowWatermark(node.hybridClock.now()); + + verify(tableStorage(node, tableId), never()).destroyPartition(anyInt()); Review Comment: Isn't this verification unreliable? Partition is probably destroyed asynchronously, so we might not see it even if it happens ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -213,4 +372,50 @@ private static CompletableFuture<Void> truncateLog(Node node, ReplicationGroupId return node.replicaManager.replica(groupId) .thenCompose(replica -> replica.createSnapshotOn(member, true)); } + + private static @Nullable MvTableStorage tableStorage(Node node, int tableId) { + TableImpl table = node.tableManager.startedTables().get(tableId); + + return table == null ? 
null : table.internalTable().storage(); + } + + private static CompletableFuture<Void> startBlockOutgoingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException { + return startBlockingMessages(node, partitionId, InstallSnapshotRequest.class); + } + + private static void stopBlockingMessages(Node node, ZonePartitionId partitionId) { + var raftNodeId = new RaftNodeId(partitionId, new Peer(node.name)); + + ((JraftServerImpl) node.raftManager.server()).stopBlockMessages(raftNodeId); + } + + private static CompletableFuture<Void> startBlockIncomingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException { Review Comment: The name seems weird. It's about blocking a response to an outgoing snapshot, not blocking an incoming snapshot (which would still be represented with an `InstallSnapshotRequest`, but going in the opposite direction), right? ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the + // snapshot to proceed. 
+ CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture + .thenRunAsync(() -> { + node.lowWatermark.updateLowWatermark(node.hybridClock.now()); + + verify(tableStorage(node, tableId), never()).destroyPartition(anyInt()); + + stopBlockingMessages(node, partitionId); + }, executorService); + + assertThat(runRaceFuture, willCompleteSuccessfully()); + + MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0); + + assertThat(mvPartition, is(notNullValue())); + + assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000)); + } + + /** + * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot + * rather than outgoing. + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + // Block outgoing snapshot so that we have time to register mocks on the receiving side. + startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000)); + + MvTableStorage tableStorage = tableStorage(newNode, tableId); Review Comment: ```suggestion MvTableStorage receiverTableStorage = tableStorage(newNode, tableId); ``` ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. 
At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the + // snapshot to proceed. + CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture + .thenRunAsync(() -> { + node.lowWatermark.updateLowWatermark(node.hybridClock.now()); + + verify(tableStorage(node, tableId), never()).destroyPartition(anyInt()); + + stopBlockingMessages(node, partitionId); + }, executorService); + + assertThat(runRaceFuture, willCompleteSuccessfully()); + + MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0); + + assertThat(mvPartition, is(notNullValue())); + + assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000)); + } + + /** + * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot + * rather than outgoing. + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + // Block outgoing snapshot so that we have time to register mocks on the receiving side. 
+ startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000)); + + MvTableStorage tableStorage = tableStorage(newNode, tableId); + + var firstConditionReachedFuture = new CompletableFuture<Void>(); + var secondConditionReachedFuture = new CompletableFuture<Void>(); + + doAnswer(invocationOnMock -> { + CompletableFuture<Void> result = secondConditionReachedFuture.thenCompose(v -> { + try { + return (CompletableFuture<Void>) invocationOnMock.callRealMethod(); + } catch (Throwable e) { + throw new CompletionException(e); + } + }); + + firstConditionReachedFuture.complete(null); + + return result; + }).when(tableStorage).startRebalancePartition(anyInt()); + + // Mocks have been registered, unblock snapshot on the sending side. + stopBlockingMessages(node, partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + // This actually blocks confirmation on of the snapshot having been installed from the receiving side. This is needed to make it + // easier to call verification on mocks: we check that partition was only removed after the snapshot installation has been + // completed. + startBlockIncomingSnapshot(newNode, partitionId); + + CompletableFuture<Void> runRaceFuture = firstConditionReachedFuture Review Comment: In the mock, second completion triggers first completion, but here first completion is needed to trigger second completion. This looks like a deadlock. How does it work? ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the + // snapshot to proceed. 
+ CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture + .thenRunAsync(() -> { + node.lowWatermark.updateLowWatermark(node.hybridClock.now()); + + verify(tableStorage(node, tableId), never()).destroyPartition(anyInt()); + + stopBlockingMessages(node, partitionId); + }, executorService); + + assertThat(runRaceFuture, willCompleteSuccessfully()); + + MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0); + + assertThat(mvPartition, is(notNullValue())); + + assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000)); + } + + /** + * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot + * rather than outgoing. + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + // Block outgoing snapshot so that we have time to register mocks on the receiving side. + startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000)); + + MvTableStorage tableStorage = tableStorage(newNode, tableId); + + var firstConditionReachedFuture = new CompletableFuture<Void>(); + var secondConditionReachedFuture = new CompletableFuture<Void>(); + + doAnswer(invocationOnMock -> { + CompletableFuture<Void> result = secondConditionReachedFuture.thenCompose(v -> { + try { + return (CompletableFuture<Void>) invocationOnMock.callRealMethod(); + } catch (Throwable e) { + throw new CompletionException(e); + } + }); + + firstConditionReachedFuture.complete(null); + + return result; + }).when(tableStorage).startRebalancePartition(anyInt()); + + // Mocks have been registered, unblock snapshot on the sending side. + stopBlockingMessages(node, partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + // This actually blocks confirmation on of the snapshot having been installed from the receiving side. This is needed to make it Review Comment: Is everything well with that 'on of' fragment? ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. 
At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the + // snapshot to proceed. + CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture + .thenRunAsync(() -> { + node.lowWatermark.updateLowWatermark(node.hybridClock.now()); + + verify(tableStorage(node, tableId), never()).destroyPartition(anyInt()); + + stopBlockingMessages(node, partitionId); + }, executorService); + + assertThat(runRaceFuture, willCompleteSuccessfully()); + + MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0); + + assertThat(mvPartition, is(notNullValue())); + + assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000)); + } + + /** + * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot + * rather than outgoing. + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + // Block outgoing snapshot so that we have time to register mocks on the receiving side. + startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000)); + + MvTableStorage tableStorage = tableStorage(newNode, tableId); + + var firstConditionReachedFuture = new CompletableFuture<Void>(); + var secondConditionReachedFuture = new CompletableFuture<Void>(); Review Comment: What are first and second? Could they be given names instead of numbers? Also, it's a bit weird that first is triggered when second completes. 
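For example, something along these lines (just a sketch; the names are only my guess at what the two futures represent, based on how they are used in the mock):

```java
// Completed inside the mocked startRebalancePartition() to signal that the incoming snapshot
// has reached the point where it starts rebalancing the partition.
var rebalanceStartedFuture = new CompletableFuture<Void>();

// Completed by the test later on to let the real startRebalancePartition() proceed.
var resumeRebalanceFuture = new CompletableFuture<Void>();
```

With names like these the ordering also reads more naturally: the test waits until the rebalance has started, does its concurrent work, and only then allows the rebalance to resume.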
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the + // snapshot to proceed. + CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture + .thenRunAsync(() -> { + node.lowWatermark.updateLowWatermark(node.hybridClock.now()); + + verify(tableStorage(node, tableId), never()).destroyPartition(anyInt()); + + stopBlockingMessages(node, partitionId); + }, executorService); + + assertThat(runRaceFuture, willCompleteSuccessfully()); + + MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0); Review Comment: Same thing here: should natural LWM update be disabled? Otherwise it might happen just before our check and the storage might end up being destroyed ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java: ########## @@ -569,6 +569,13 @@ private String createPartitionInfo() { private void writeVersion(SnapshotContext snapshotContext, ResponseEntry entry, int entryIndex) { PartitionMvStorageAccess partition = snapshotContext.partitionsByTableId.get(entry.tableId()); + if (partition == null) { + // Table might have been removed locally which is a normal situation, we log it just in case. + LOG.warn("No partition storage found locally for tableId={} while installing a snapshot", entry.tableId()); Review Comment: What if the sender keeps sending us data, can this flood our logs with such warnings? Would `IgniteThrottledLogger` help? 
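Something along these lines, perhaps (a sketch only: I don't remember the exact factory method and signature for the throttled logger, so the `Loggers.toThrottledLogger(...)` call below is an assumption that needs to be checked against the real API):

```java
// Assumption: a throttled logger can be created from the existing LOG roughly like this;
// the factory name, its parameters and the throttling key semantics need double-checking.
// "executor" stands for whatever executor the class already has at hand.
private final IgniteThrottledLogger throttledLog = Loggers.toThrottledLogger(LOG, executor);

// ...

if (partition == null) {
    // Table might have been removed locally, which is a normal situation; we still log it,
    // but throttled, so a sender that keeps streaming data cannot flood the log.
    throttledLog.warn("No partition storage found locally for tableId={} while installing a snapshot", entry.tableId());
    // ... rest of the handling as in the PR
}
```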
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); Review Comment: Should we disable natural low watermark updates first? Just to make sure it never happens 'by itself' ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { Review Comment: Would sending/receiving be better than outgoing/incoming? The latter seems to better suit the perspective of some specific node, but the former doesn't require any node-centered perspective. 
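That is, roughly (only the names change, the test bodies stay as they are):

```java
@Test
void testTableDropInTheMiddleOfRebalanceOnSendingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
    // ... unchanged ...
}

@Test
void testTableDropInTheMiddleOfRebalanceOnReceivingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
    // ... unchanged ...
}
```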
########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java: ########## @@ -94,4 +103,56 @@ void choosesMinimalIndexFromTxStorage(@Mock PartitionMvStorageAccess partitionAc assertEquals(3L, startupSnapshotMeta.lastIncludedIndex()); assertEquals(2L, startupSnapshotMeta.lastIncludedTerm()); } + + @Test + void partitionCanBeRemovedOnlyWithoutOutgoingSnapshot(@Mock PartitionMvStorageAccess partitionAccess) throws IOException { Review Comment: This test actually tests both cases: that without any ongoing operation the storage can be removed, and that with an operation it cannot, but the name of the test only mentions the latter scenario. It makes sense to split it into 2 tests explicitly. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java: ########## @@ -187,9 +211,20 @@ public FailureProcessor failureProcessor() { * Starts an incoming snapshot. */ public SnapshotCopier startIncomingSnapshot(String uri) { + startSnapshotOperation(); + SnapshotUri snapshotUri = SnapshotUri.fromStringUri(uri); - var copier = new IncomingSnapshotCopier(this, snapshotUri, incomingSnapshotsExecutor, waitForMetadataCatchupMs); + var copier = new IncomingSnapshotCopier(this, snapshotUri, incomingSnapshotsExecutor, waitForMetadataCatchupMs) { + @Override + public void close() { Review Comment: It seems weird to use ad-hoc subclassing in production code. Would it make sense to pass a callback to `IncomingSnapshotCopier`? ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -213,4 +372,50 @@ private static CompletableFuture<Void> truncateLog(Node node, ReplicationGroupId return node.replicaManager.replica(groupId) .thenCompose(replica -> replica.createSnapshotOn(member, true)); } + + private static @Nullable MvTableStorage tableStorage(Node node, int tableId) { + TableImpl table = node.tableManager.startedTables().get(tableId); + + return table == null ? null : table.internalTable().storage(); + } + + private static CompletableFuture<Void> startBlockOutgoingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException { + return startBlockingMessages(node, partitionId, InstallSnapshotRequest.class); + } + + private static void stopBlockingMessages(Node node, ZonePartitionId partitionId) { + var raftNodeId = new RaftNodeId(partitionId, new Peer(node.name)); + + ((JraftServerImpl) node.raftManager.server()).stopBlockMessages(raftNodeId); + } + + private static CompletableFuture<Void> startBlockIncomingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException { + return startBlockingMessages(node, partitionId, InstallSnapshotResponse.class); + } + + private static CompletableFuture<Void> startBlockingMessages( + Node node, + ZonePartitionId partitionId, + Class<? extends Message> messageType + ) throws InterruptedException { + var raftServer = (JraftServerImpl) node.raftManager.server(); + + var raftNodeId = new RaftNodeId(partitionId, new Peer(node.name)); + + // Wait for the Raft node to start. 
+ assertTrue(waitForCondition(() -> raftServer.raftGroupService(raftNodeId) != null, 10_000)); + + var conditionReachedFuture = new CompletableFuture<Void>(); + + raftServer.blockMessages(raftNodeId, (msg, peerId) -> { + if (messageType.isInstance(msg)) { + conditionReachedFuture.complete(null); + + return true; + } + return false; + }); + + return conditionReachedFuture; + } Review Comment: The first test seems to make sure that the table is destroyed before the snapshot even starts to flow. How about a test case where a snapshot is started, half of the table partition data is sent, and then the table gets destroyed? And the same for destruction on the receiver side.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org