sashapolo commented on code in PR #5588: URL: https://github.com/apache/ignite-3/pull/5588#discussion_r2041633170
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -199,6 +223,141 @@ void testLocalRaftLogReapplication() throws Exception { assertThat(kvView.get(null, 42), is(42)); } + /** + * Tests the following scenario. + * + * <ol> + * <li>Create a table and fill it with data;</li> + * <li>Truncate Raft log to force snapshot installation on joining nodes;</li> + * <li>Drop the table, but do not update the low watermark;</li> + * <li>Add a new node to the cluster;</li> + * <li>Existing node will start streaming the snapshot to the joining node. At the same time, update the Low Watermark value + * to force deletion of table storages on the existing node;</li> + * <li>Validate that concurrent snapshot streaming and storages removals do not interfere with each other.</li> + * </ol> + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnOutgoingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + CompletableFuture<Void> outgoingSnapshotInstallationStartedFuture = startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + // After we detected that snapshot is going to be installed, update the low watermark to start table removal and allow the + // snapshot to proceed. 
+ CompletableFuture<Void> runRaceFuture = outgoingSnapshotInstallationStartedFuture + .thenRunAsync(() -> { + node.lowWatermark.updateLowWatermark(node.hybridClock.now()); + + verify(tableStorage(node, tableId), never()).destroyPartition(anyInt()); + + stopBlockingMessages(node, partitionId); + }, executorService); + + assertThat(runRaceFuture, willCompleteSuccessfully()); + + MvPartitionStorage mvPartition = tableStorage(newNode, tableId).getMvPartition(0); + + assertThat(mvPartition, is(notNullValue())); + + assertTrue(waitForCondition(() -> mvPartition.estimatedSize() == data.size(), 10_000)); + } + + /** + * Similar to {@link #testTableDropInTheMiddleOfRebalanceOnOutgoingSide} but tests table storage removal during an incoming snapshot + * rather than outgoing. + */ + @Test + void testTableDropInTheMiddleOfRebalanceOnIncomingSide(@InjectExecutorService ExecutorService executorService) throws Exception { + startCluster(1); + + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + + Node node = cluster.get(0); + + KeyValueView<Integer, Integer> kvView = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + + Map<Integer, Integer> data = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView.putAll(null, data); + + var partitionId = new ZonePartitionId(zoneId, 0); + + truncateLogOnEveryNode(partitionId); + + // Block outgoing snapshot so that we have time to register mocks on the receiving side. 
+ startBlockOutgoingSnapshot(node, partitionId); + + Node newNode = addNodeToCluster(); + + assertTrue(waitForCondition(() -> tableStorage(newNode, tableId) != null, 10_000)); + + MvTableStorage tableStorage = tableStorage(newNode, tableId); + + var firstConditionReachedFuture = new CompletableFuture<Void>(); + var secondConditionReachedFuture = new CompletableFuture<Void>(); + + doAnswer(invocationOnMock -> { + CompletableFuture<Void> result = secondConditionReachedFuture.thenCompose(v -> { + try { + return (CompletableFuture<Void>) invocationOnMock.callRealMethod(); + } catch (Throwable e) { + throw new CompletionException(e); + } + }); + + firstConditionReachedFuture.complete(null); + + return result; + }).when(tableStorage).startRebalancePartition(anyInt()); + + // Mocks have been registered, unblock snapshot on the sending side. + stopBlockingMessages(node, partitionId); + + dropTable(node.catalogManager, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME1); + + // This actually blocks confirmation of the snapshot having been installed from the receiving side. This is needed to make it + // easier to call verification on mocks: we check that partition was only removed after the snapshot installation has been + // completed. + startBlockIncomingSnapshot(newNode, partitionId); + + CompletableFuture<Void> runRaceFuture = firstConditionReachedFuture Review Comment: I've added some comments and renamed the variables. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org