rpuch commented on code in PR #5588:
URL: https://github.com/apache/ignite-3/pull/5588#discussion_r2039407701


##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception {
         assertThat(kvView.get(null, 42), is(42));
     }
 
+    /**
+     * Tests the following scenario.
+     *
+     * <ol>
+     *     <li>Create a table and fill it with data;</li>
+     *     <li>Truncate Raft log to force snapshot installation on joining nodes;</li>
+     *     <li>Drop the table, but do not update the low watermark;</li>
+     *     <li>Add a new node to the cluster;</li>
+     *     <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value
+     *     to force deletion of table storages on the existing node;</li>
+     *     <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li>
+     * </ol>
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1);
+
+        CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the
+        // snapshot to proceed.
+        CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture
+                .thenRunAsync(() -> {
+                    node.lowWatermark.updateLowWatermark(node.hybridClock.now());
+
+                    verify(tableStorage(node, tableId), never()).destroyPartition(anyInt());

Review Comment:
   Isn't this verification unreliable? The partition is probably destroyed asynchronously, so we might not see the destruction even if it does happen.
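   If the goal is only to narrow the race, a time-bounded negative verification might help a little. This is just a sketch (the delay value is arbitrary, and it still cannot prove the destruction will never happen later):
   ```java
   // Requires a static import of org.mockito.Mockito.after.
   // after(1_000).never() waits for the given period and then checks that the interaction
   // still has not happened, instead of checking only the current moment.
   verify(tableStorage(node, tableId), after(1_000).never()).destroyPartition(anyInt());
   ```
   A stronger check would probably need an explicit synchronization point with the destruction path, if one is available.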



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -213,4 +372,50 @@ private static CompletableFuture<Void> truncateLog(Node node, ReplicationGroupId
         return node.replicaManager.replica(groupId)
                 .thenCompose(replica -> replica.createSnapshotOn(member, true));
     }
+
+    private static @Nullable MvTableStorage tableStorage(Node node, int tableId) {
+        TableImpl table = node.tableManager.startedTables().get(tableId);
+
+        return table == null ? null : table.internalTable().storage();
+    }
+
+    private static CompletableFuture<Void> startBlockOutgoingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException {
+        return startBlockingMessages(node, partitionId, InstallSnapshotRequest.class);
+    }
+
+    private static void stopBlockingMessages(Node node, ZonePartitionId partitionId) {
+        var raftNodeId = new RaftNodeId(partitionId, new Peer(node.name));
+
+        ((JraftServerImpl) node.raftManager.server()).stopBlockMessages(raftNodeId);
+    }
+
+    private static CompletableFuture<Void> startBlockIncomingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException {

Review Comment:
   The name seems weird. It's about blocking a response to an outgoing 
snapshot, not blocking an incoming snapshot (which would still be represented 
with an `InstallSnapshotRequest`, but going in the opposite direction), right?
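   For illustration, a possible rename along those lines (the name itself is just a suggestion):
   ```java
   // Blocks the InstallSnapshotResponse that the receiver sends back, i.e. the confirmation
   // of the installed snapshot, rather than the snapshot data flowing to the receiver.
   private static CompletableFuture<Void> startBlockSnapshotResponse(Node node, ZonePartitionId partitionId) throws InterruptedException {
       return startBlockingMessages(node, partitionId, InstallSnapshotResponse.class);
   }
   ```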



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception {
         assertThat(kvView.get(null, 42), is(42));
     }
 
+    /**
+     * Tests the following scenario.
+     *
+     * <ol>
+     *     <li>Create a table and fill it with data;</li>
+     *     <li>Truncate Raft log to force snapshot installation on joining nodes;</li>
+     *     <li>Drop the table, but do not update the low watermark;</li>
+     *     <li>Add a new node to the cluster;</li>
+     *     <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value
+     *     to force deletion of table storages on the existing node;</li>
+     *     <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li>
+     * </ol>
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1);
+
+        CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the
+        // snapshot to proceed.
+        CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture
+                .thenRunAsync(() -> {
+                    node.lowWatermark.updateLowWatermark(node.hybridClock.now());
+
+                    verify(tableStorage(node, tableId), never()).destroyPartition(anyInt());
+
+                    stopBlockingMessages(node, partitionId);
+                }, executorService);
+
+        assertThat(runRaceFuture, willCompleteSuccessfully());
+
+        MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0);
+
+        assertThat(mvPartition, is(notNullValue()));
+
+        assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000));
+    }
+
+    /**
+     * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot
+     * rather than outgoing.
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        // Block outgoing snapshot so that we have time to register mocks on the receiving side.
+        startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000));
+
+        MvTableStorage tableStorage = tableStorage(newNode, tableId);

Review Comment:
   ```suggestion
           MvTableStorage receiverTableStorage = tableStorage(newNode, tableId);
   ```



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception {
         assertThat(kvView.get(null, 42), is(42));
     }
 
+    /**
+     * Tests the following scenario.
+     *
+     * <ol>
+     *     <li>Create a table and fill it with data;</li>
+     *     <li>Truncate Raft log to force snapshot installation on joining nodes;</li>
+     *     <li>Drop the table, but do not update the low watermark;</li>
+     *     <li>Add a new node to the cluster;</li>
+     *     <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value
+     *     to force deletion of table storages on the existing node;</li>
+     *     <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li>
+     * </ol>
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1);
+
+        CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the
+        // snapshot to proceed.
+        CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture
+                .thenRunAsync(() -> {
+                    node.lowWatermark.updateLowWatermark(node.hybridClock.now());
+
+                    verify(tableStorage(node, tableId), never()).destroyPartition(anyInt());
+
+                    stopBlockingMessages(node, partitionId);
+                }, executorService);
+
+        assertThat(runRaceFuture, willCompleteSuccessfully());
+
+        MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0);
+
+        assertThat(mvPartition, is(notNullValue()));
+
+        assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000));
+    }
+
+    /**
+     * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot
+     * rather than outgoing.
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        // Block outgoing snapshot so that we have time to register mocks on the receiving side.
+        startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000));
+
+        MvTableStorage tableStorage = tableStorage(newNode, tableId);
+
+        var firstConditionReachedFuture = new CompletableFuture<Void>();
+        var secondConditionReachedFuture = new CompletableFuture<Void>();
+
+        doAnswer(invocationOnMock -> {
+            CompletableFuture<Void> result = secondConditionReachedFuture.thenCompose(v -> {
+                try {
+                    return (CompletableFuture<Void>) invocationOnMock.callRealMethod();
+                } catch (Throwable e) {
+                    throw new CompletionException(e);
+                }
+            });
+
+            firstConditionReachedFuture.complete(null);
+
+            return result;
+        }).when(tableStorage).startRebalancePartition(anyInt());
+
+        // Mocks have been registered, unblock snapshot on the sending side.
+        stopBlockingMessages(node, partitionId);
+
+        dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1);
+
+        // This actually blocks confirmation on of the snapshot having been installed from the receiving side. This is needed to make it
+        // easier to call verification on mocks: we check that partition was only removed after the snapshot installation has been
+        // completed.
+        startBlockIncomingSnapshot(newNode, partitionId);
+
+        CompletableFuture<Void> runRaceFuture = firstConditionReachedFuture

Review Comment:
   In the mock, second completion triggers first completion, but here first 
completion is needed to trigger second completion. This looks like a deadlock. 
How does it work?



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception {
         assertThat(kvView.get(null, 42), is(42));
     }
 
+    /**
+     * Tests the following scenario.
+     *
+     * <ol>
+     *     <li>Create a table and fill it with data;</li>
+     *     <li>Truncate Raft log to force snapshot installation on joining nodes;</li>
+     *     <li>Drop the table, but do not update the low watermark;</li>
+     *     <li>Add a new node to the cluster;</li>
+     *     <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value
+     *     to force deletion of table storages on the existing node;</li>
+     *     <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li>
+     * </ol>
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1);
+
+        CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the
+        // snapshot to proceed.
+        CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture
+                .thenRunAsync(() -> {
+                    node.lowWatermark.updateLowWatermark(node.hybridClock.now());
+
+                    verify(tableStorage(node, tableId), never()).destroyPartition(anyInt());
+
+                    stopBlockingMessages(node, partitionId);
+                }, executorService);
+
+        assertThat(runRaceFuture, willCompleteSuccessfully());
+
+        MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0);
+
+        assertThat(mvPartition, is(notNullValue()));
+
+        assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000));
+    }
+
+    /**
+     * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot
+     * rather than outgoing.
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        // Block outgoing snapshot so that we have time to register mocks on the receiving side.
+        startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000));
+
+        MvTableStorage tableStorage = tableStorage(newNode, tableId);
+
+        var firstConditionReachedFuture = new CompletableFuture<Void>();
+        var secondConditionReachedFuture = new CompletableFuture<Void>();
+
+        doAnswer(invocationOnMock -> {
+            CompletableFuture<Void> result = secondConditionReachedFuture.thenCompose(v -> {
+                try {
+                    return (CompletableFuture<Void>) invocationOnMock.callRealMethod();
+                } catch (Throwable e) {
+                    throw new CompletionException(e);
+                }
+            });
+
+            firstConditionReachedFuture.complete(null);
+
+            return result;
+        }).when(tableStorage).startRebalancePartition(anyInt());
+
+        // Mocks have been registered, unblock snapshot on the sending side.
+        stopBlockingMessages(node, partitionId);
+
+        dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1);
+
+        // This actually blocks confirmation on of the snapshot having been installed from the receiving side. This is needed to make it

Review Comment:
   Is everything well with that 'on of' fragment?



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception {
         assertThat(kvView.get(null, 42), is(42));
     }
 
+    /**
+     * Tests the following scenario.
+     *
+     * <ol>
+     *     <li>Create a table and fill it with data;</li>
+     *     <li>Truncate Raft log to force snapshot installation on joining nodes;</li>
+     *     <li>Drop the table, but do not update the low watermark;</li>
+     *     <li>Add a new node to the cluster;</li>
+     *     <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value
+     *     to force deletion of table storages on the existing node;</li>
+     *     <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li>
+     * </ol>
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1);
+
+        CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the
+        // snapshot to proceed.
+        CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture
+                .thenRunAsync(() -> {
+                    node.lowWatermark.updateLowWatermark(node.hybridClock.now());
+
+                    verify(tableStorage(node, tableId), never()).destroyPartition(anyInt());
+
+                    stopBlockingMessages(node, partitionId);
+                }, executorService);
+
+        assertThat(runRaceFuture, willCompleteSuccessfully());
+
+        MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0);
+
+        assertThat(mvPartition, is(notNullValue()));
+
+        assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000));
+    }
+
+    /**
+     * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot
+     * rather than outgoing.
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        // Block outgoing snapshot so that we have time to register mocks on the receiving side.
+        startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000));
+
+        MvTableStorage tableStorage = tableStorage(newNode, tableId);
+
+        var firstConditionReachedFuture = new CompletableFuture<Void>();
+        var secondConditionReachedFuture = new CompletableFuture<Void>();

Review Comment:
   What are first and second? Could they be given names instead of numbers? 
Also, it's a bit weird that first is triggered when second completes.
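   Purely as an illustration, names tied to the events they represent could read something like this (hypothetical names):
   ```java
   // Completed by the mock once startRebalancePartition has been intercepted on the receiving node.
   var rebalanceStartInterceptedFuture = new CompletableFuture<Void>();

   // Completed by the test once the table has been dropped and the real startRebalancePartition may proceed.
   var rebalanceMayProceedFuture = new CompletableFuture<Void>();
   ```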



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception {
         assertThat(kvView.get(null, 42), is(42));
     }
 
+    /**
+     * Tests the following scenario.
+     *
+     * <ol>
+     *     <li>Create a table and fill it with data;</li>
+     *     <li>Truncate Raft log to force snapshot installation on joining nodes;</li>
+     *     <li>Drop the table, but do not update the low watermark;</li>
+     *     <li>Add a new node to the cluster;</li>
+     *     <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value
+     *     to force deletion of table storages on the existing node;</li>
+     *     <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li>
+     * </ol>
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1);
+
+        CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId);
+
+        Node newNode = addNodeToCluster();
+
+        // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the
+        // snapshot to proceed.
+        CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture
+                .thenRunAsync(() -> {
+                    node.lowWatermark.updateLowWatermark(node.hybridClock.now());
+
+                    verify(tableStorage(node, tableId), never()).destroyPartition(anyInt());
+
+                    stopBlockingMessages(node, partitionId);
+                }, executorService);
+
+        assertThat(runRaceFuture, willCompleteSuccessfully());
+
+        MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0);

Review Comment:
   Same thing here: should natural LWM update be disabled? Otherwise it might 
happen just before our check and the storage might end up being destroyed



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -569,6 +569,13 @@ private String createPartitionInfo() {
     private void writeVersion(SnapshotContext snapshotContext, ResponseEntry entry, int entryIndex) {
         PartitionMvStorageAccess partition = snapshotContext.partitionsByTableId.get(entry.tableId());
 
+        if (partition == null) {
+            // Table might have been removed locally which is a normal situation, we log it just in case.
+            LOG.warn("No partition storage found locally for tableId={} while installing a snapshot", entry.tableId());

Review Comment:
   What if the sender keeps sending us data, can this flood our logs with such 
warnings? Would `IgniteThrottledLogger` help?
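   If `IgniteThrottledLogger` does not fit here for some reason, a minimal alternative is to warn only once per table id; a rough sketch below (the `warnedTableIds` field is hypothetical and not part of this PR):
   ```java
   // Hypothetical field on the copier: table ids we have already warned about.
   private final Set<Integer> warnedTableIds = ConcurrentHashMap.newKeySet();

   // ...inside writeVersion():
   if (partition == null) {
       // Table might have been removed locally, which is normal; warn only once per table id.
       if (warnedTableIds.add(entry.tableId())) {
           LOG.warn("No partition storage found locally for tableId={} while installing a snapshot", entry.tableId());
       }
       // Then skip the entry, as the current change presumably does.
   }
   ```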



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception {
         assertThat(kvView.get(null, 42), is(42));
     }
 
+    /**
+     * Tests the following scenario.
+     *
+     * <ol>
+     *     <li>Create a table and fill it with data;</li>
+     *     <li>Truncate Raft log to force snapshot installation on joining nodes;</li>
+     *     <li>Drop the table, but do not update the low watermark;</li>
+     *     <li>Add a new node to the cluster;</li>
+     *     <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value
+     *     to force deletion of table storages on the existing node;</li>
+     *     <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li>
+     * </ol>
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception {
+        startCluster(1);
+
+        int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1);
+
+        int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        Node node = cluster.get(0);
+
+        KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+
+        Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity()));
+
+        kvView.putAll(null, data);
+
+        var partitionId = new ZonePartitionId(zoneId, 0);
+
+        truncateLogOnEveryNode(partitionId);
+
+        dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1);

Review Comment:
   Should we disable natural low watermark updates first? Just to make sure it 
never happens 'by itself'



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception {
         assertThat(kvView.get(null, 42), is(42));
     }
 
+    /**
+     * Tests the following scenario.
+     *
+     * <ol>
+     *     <li>Create a table and fill it with data;</li>
+     *     <li>Truncate Raft log to force snapshot installation on joining nodes;</li>
+     *     <li>Drop the table, but do not update the low watermark;</li>
+     *     <li>Add a new node to the cluster;</li>
+     *     <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value
+     *     to force deletion of table storages on the existing node;</li>
+     *     <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li>
+     * </ol>
+     */
+    @Test
+    void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception {

Review Comment:
   Would sending/receiving be better than outgoing/incoming? The latter seems 
to better suit the perspective of some specific node, but the former doesn't 
require any node-centered perspective.
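   If that terminology is adopted, the renames would look roughly like this (illustration only):
   ```java
   // Hypothetical renames following the sender/receiver terminology:
   void testTableDropInTheMiddleOfRebalanceOnSendingSide(...)   // was ...OnOutgoingSide
   void testTableDropInTheMiddleOfRebalanceOnReceivingSide(...) // was ...OnIncomingSide
   ```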



##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java:
##########
@@ -94,4 +103,56 @@ void choosesMinimalIndexFromTxStorage(@Mock PartitionMvStorageAccess partitionAc
         assertEquals(3L, startupSnapshotMeta.lastIncludedIndex());
         assertEquals(2L, startupSnapshotMeta.lastIncludedTerm());
     }
+
+    @Test
+    void partitionCanBeRemovedOnlyWithoutOutgoingSnapshot(@Mock PartitionMvStorageAccess partitionAccess) throws IOException {

Review Comment:
   This test actually tests both cases: that without any ongoing operation the 
storage can be removed, and with an operation it cannot, but the name of the 
test only says about the latter scenario. I makes sense to split it to 2 tests 
explicitly.
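   A possible shape of the split; the bodies are only placeholders here, since the setup from the existing test is assumed:
   ```java
   @Test
   void partitionCanBeRemovedWhenNoSnapshotOperationIsInProgress(@Mock PartitionMvStorageAccess partitionAccess) throws IOException {
       // Set up the snapshot storage as in the current test, but do not start any snapshot operation.
       // Assert that the partition can be removed.
   }

   @Test
   void partitionCannotBeRemovedWhileOutgoingSnapshotIsInProgress(@Mock PartitionMvStorageAccess partitionAccess) throws IOException {
       // Set up the snapshot storage and start an outgoing snapshot.
       // Assert that partition removal is not allowed until the snapshot operation finishes.
   }
   ```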



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java:
##########
@@ -187,9 +211,20 @@ public FailureProcessor failureProcessor() {
      * Starts an incoming snapshot.
      */
     public SnapshotCopier startIncomingSnapshot(String uri) {
+        startSnapshotOperation();
+
         SnapshotUri snapshotUri = SnapshotUri.fromStringUri(uri);
 
-        var copier = new IncomingSnapshotCopier(this, snapshotUri, incomingSnapshotsExecutor, waitForMetadataCatchupMs);
+        var copier = new IncomingSnapshotCopier(this, snapshotUri, incomingSnapshotsExecutor, waitForMetadataCatchupMs) {
+            @Override
+            public void close() {

Review Comment:
   It seems weird to use ad-hoc subclassing in production code. Would it make 
sense to pass a callback to `IncomingSnapshotCopier`?
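   A possible shape of the callback variant; the extra constructor parameter is hypothetical, and `this::finishSnapshotOperation` stands in for whatever the overridden `close()` currently does:
   ```java
   // The callback is invoked from IncomingSnapshotCopier#close(), replacing the anonymous subclass.
   var copier = new IncomingSnapshotCopier(
           this,
           snapshotUri,
           incomingSnapshotsExecutor,
           waitForMetadataCatchupMs,
           this::finishSnapshotOperation
   );
   ```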



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -213,4 +372,50 @@ private static CompletableFuture<Void> truncateLog(Node node, ReplicationGroupId
         return node.replicaManager.replica(groupId)
                 .thenCompose(replica -> replica.createSnapshotOn(member, true));
     }
+
+    private static @Nullable MvTableStorage tableStorage(Node node, int tableId) {
+        TableImpl table = node.tableManager.startedTables().get(tableId);
+
+        return table == null ? null : table.internalTable().storage();
+    }
+
+    private static CompletableFuture<Void> startBlockOutgoingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException {
+        return startBlockingMessages(node, partitionId, InstallSnapshotRequest.class);
+    }
+
+    private static void stopBlockingMessages(Node node, ZonePartitionId partitionId) {
+        var raftNodeId = new RaftNodeId(partitionId, new Peer(node.name));
+
+        ((JraftServerImpl) node.raftManager.server()).stopBlockMessages(raftNodeId);
+    }
+
+    private static CompletableFuture<Void> startBlockIncomingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException {
+        return startBlockingMessages(node, partitionId, InstallSnapshotResponse.class);
+    }
+
+    private static CompletableFuture<Void> startBlockingMessages(
+            Node node,
+            ZonePartitionId partitionId,
+            Class<? extends Message> messageType
+    ) throws InterruptedException {
+        var raftServer = (JraftServerImpl) node.raftManager.server();
+
+        var raftNodeId = new RaftNodeId(partitionId, new Peer(node.name));
+
+        // Wait for the Raft node to start.
+        assertTrue(waitForCondition(() -> raftServer.raftGroupService(raftNodeId) != null, 10_000));
+
+        var conditionReachedFuture = new CompletableFuture<Void>();
+
+        raftServer.blockMessages(raftNodeId, (msg, peerId) -> {
+            if (messageType.isInstance(msg)) {
+                conditionReachedFuture.complete(null);
+
+                return true;
+            }
+            return false;
+        });
+
+        return conditionReachedFuture;
+    }

Review Comment:
   The first test seems to make sure that the table is destroyed before the snapshot even starts to flow. How about a test case where a snapshot is started, half of the table partition data is sent, and then the table gets destroyed? And the same for destruction on the receiver side.
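   A rough outline of such a case on the sending side; any helpers beyond the ones added in this PR are hypothetical:
   ```java
   @Test
   void testTableDropAfterPartOfSnapshotDataIsSent(@InjectExecutorService ExecutorService executorService) throws Exception {
       // 1. Create the table, fill it with data and truncate the Raft log, as in the existing tests.
       // 2. Add a node, let the snapshot transfer start, and block further snapshot data messages
       //    once part of the partition data has been delivered (hypothetical blocking helper).
       // 3. Drop the table and bump the low watermark on the sending node.
       // 4. Unblock the transfer and verify that the receiver finishes (or cleanly restarts) the rebalance
       //    and that neither side fails.
   }
   ```
   A mirrored case for the receiving side could follow the same outline, with the drop and low watermark bump happening while the receiver is applying the partially delivered data.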



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

