sashapolo commented on code in PR #5588: URL: https://github.com/apache/ignite-3/pull/5588#discussion_r2041632750
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the + // snapshot to proceed. + CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture + .thenRunAsync(() -> { + node.lowWatermark.updateLowWatermark(node.hybridClock.now()); + + verify(tableStorage(node, tableId), never()).destroyPartition(anyInt()); + + stopBlockingMessages(node, partitionId); + }, executorService); + + assertThat(runRaceFuture, willCompleteSuccessfully()); + + MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0); + + assertThat(mvPartition, is(notNullValue())); + + assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000)); + } + + /** + * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot + * rather than outgoing. + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + // Block outgoing snapshot so that we have time to register mocks on the receiving side. + startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000)); + + MvTableStorage tableStorage = tableStorage(newNode, tableId); + + var firstConditionReachedFuture = new CompletableFuture<Void>(); + var secondConditionReachedFuture = new CompletableFuture<Void>(); Review Comment: Tried to make it more clear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org