tkalkirill commented on code in PR #6033: URL: https://github.com/apache/ignite-3/pull/6033#discussion_r2160988177
########## modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java: ########## @@ -146,6 +158,48 @@ void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception { ); } + @Test + @WithSystemProperty(key = THREAD_ASSERTIONS_ENABLED, value = "false") + public void compactionAfterAbortedRebalance() throws InterruptedException { + createZone(ZONE_NAME, 1, 1); + createTestTable(TABLE_NAME, ZONE_NAME); + + IgniteImpl igniteImpl = igniteImpl(0); + + TableViewInternal table = unwrapTableViewInternal(igniteImpl.tables().table(TABLE_NAME)); + + PersistentPageMemoryTableStorage tableStorage = (PersistentPageMemoryTableStorage) table.internalTable().storage(); + + table.keyValueView().put( + null, + Tuple.create().set("id", 1), + Tuple.create().set("val", "value1") + ); + + CompletableFuture<Void> abortRebalance = tableStorage.startRebalancePartition(0) + .thenCompose(v -> tableStorage.abortRebalancePartition(0)); + + assertThat(abortRebalance, willCompleteSuccessfully()); + + assertEquals(0, tableStorage.getMvPartition(0).lastAppliedIndex()); + + waitForCompaction(tableStorage); + } + + private static void waitForCompaction(PersistentPageMemoryTableStorage tableStorage) throws InterruptedException { + PersistentPageMemoryDataRegion dataRegion = tableStorage.dataRegion(); + CheckpointManager checkpointManager = dataRegion.checkpointManager(); + + CheckpointProgress checkpointProgress = checkpointManager.forceCheckpoint(""); + assertThat(checkpointProgress.futureFor(CheckpointState.FINISHED), willCompleteSuccessfully()); + + checkpointManager.triggerCompaction(); Review Comment: Why is this explicit call needed if, according to the algorithm, compaction is launched automatically once the checkpoint completes? 
########## modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java: ########## @@ -626,7 +626,7 @@ public void testReCreatePartition() throws Exception { return null; }); - tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS); + tableStorage.destroyPartition(PARTITION_ID).get(2, SECONDS); Review Comment: Let's change it to `assertThat(..., willCompleteSuccessfully())`? ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java: ########## @@ -353,13 +356,40 @@ CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMv } private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) { - dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy(); + FilePageStore store = dataRegion.filePageStoreManager().getStore(groupPartitionId); - dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId()); + assert store != null : groupPartitionId; - return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId) - .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId)) - .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId)); + store.markToDestroy(); + + CompletableFuture<Void> future = new CompletableFuture<>(); + + CheckpointManager checkpointManager = dataRegion.checkpointManager(); + + CheckpointListener listener = new CheckpointListener() { + @Override + public void afterCheckpointEnd(CheckpointProgress progress) { + checkpointManager.removeCheckpointListener(this); + + try { + dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId()); + + dataRegion.partitionMetaManager().removeMeta(groupPartitionId); + + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(new 
StorageException("Couldn't destroy partition: " + groupPartitionId, e)); + } + } + }; + + checkpointManager.addCheckpointListener(listener, dataRegion); + + CheckpointProgress checkpoint = dataRegion.checkpointManager().scheduleCheckpoint(1000, "Partition destruction"); Review Comment: Why do you think a second should be enough? ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java: ########## @@ -353,13 +356,40 @@ CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMv } private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) { - dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy(); + FilePageStore store = dataRegion.filePageStoreManager().getStore(groupPartitionId); - dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId()); + assert store != null : groupPartitionId; - return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId) - .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId)) - .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId)); + store.markToDestroy(); + + CompletableFuture<Void> future = new CompletableFuture<>(); + + CheckpointManager checkpointManager = dataRegion.checkpointManager(); + + CheckpointListener listener = new CheckpointListener() { + @Override + public void afterCheckpointEnd(CheckpointProgress progress) { + checkpointManager.removeCheckpointListener(this); + + try { + dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId()); + + dataRegion.partitionMetaManager().removeMeta(groupPartitionId); + + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(new StorageException("Couldn't destroy partition: " + groupPartitionId, e)); Review Comment: The error text is not clear, 
this is not deleting the partition yet. ########## modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java: ########## @@ -146,6 +158,48 @@ void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception { ); } + @Test + @WithSystemProperty(key = THREAD_ASSERTIONS_ENABLED, value = "false") + public void compactionAfterAbortedRebalance() throws InterruptedException { + createZone(ZONE_NAME, 1, 1); + createTestTable(TABLE_NAME, ZONE_NAME); + + IgniteImpl igniteImpl = igniteImpl(0); + + TableViewInternal table = unwrapTableViewInternal(igniteImpl.tables().table(TABLE_NAME)); + + PersistentPageMemoryTableStorage tableStorage = (PersistentPageMemoryTableStorage) table.internalTable().storage(); + + table.keyValueView().put( + null, + Tuple.create().set("id", 1), + Tuple.create().set("val", "value1") + ); + + CompletableFuture<Void> abortRebalance = tableStorage.startRebalancePartition(0) + .thenCompose(v -> tableStorage.abortRebalancePartition(0)); + + assertThat(abortRebalance, willCompleteSuccessfully()); + + assertEquals(0, tableStorage.getMvPartition(0).lastAppliedIndex()); + + waitForCompaction(tableStorage); + } + + private static void waitForCompaction(PersistentPageMemoryTableStorage tableStorage) throws InterruptedException { Review Comment: The name is very generic: is this a catalog compaction, a metastore compaction, or a delta file compaction? ########## modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java: ########## @@ -626,7 +626,7 @@ public void testReCreatePartition() throws Exception { return null; }); - tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS); + tableStorage.destroyPartition(PARTITION_ID).get(2, SECONDS); Review Comment: The change from 1 second to 2 worries me a little. 
########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java: ########## @@ -353,13 +356,40 @@ CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMv } private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) { - dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy(); + FilePageStore store = dataRegion.filePageStoreManager().getStore(groupPartitionId); - dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId()); + assert store != null : groupPartitionId; - return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId) - .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId)) - .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId)); + store.markToDestroy(); + + CompletableFuture<Void> future = new CompletableFuture<>(); Review Comment: You can use `var` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org