tkalkirill commented on code in PR #6033:
URL: https://github.com/apache/ignite-3/pull/6033#discussion_r2160988177


##########
modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java:
##########
@@ -146,6 +158,48 @@ void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
         );
     }
 
+    @Test
+    @WithSystemProperty(key = THREAD_ASSERTIONS_ENABLED, value = "false")
+    public void compactionAfterAbortedRebalance() throws InterruptedException {
+        createZone(ZONE_NAME, 1, 1);
+        createTestTable(TABLE_NAME, ZONE_NAME);
+
+        IgniteImpl igniteImpl = igniteImpl(0);
+
+        TableViewInternal table = unwrapTableViewInternal(igniteImpl.tables().table(TABLE_NAME));
+
+        PersistentPageMemoryTableStorage tableStorage = (PersistentPageMemoryTableStorage) table.internalTable().storage();
+
+        table.keyValueView().put(
+                null,
+                Tuple.create().set("id", 1),
+                Tuple.create().set("val", "value1")
+        );
+
+        CompletableFuture<Void> abortRebalance = tableStorage.startRebalancePartition(0)
+                        .thenCompose(v -> tableStorage.abortRebalancePartition(0));
+
+        assertThat(abortRebalance, willCompleteSuccessfully());
+
+        assertEquals(0, tableStorage.getMvPartition(0).lastAppliedIndex());
+
+        waitForCompaction(tableStorage);
+    }
+
+    private static void waitForCompaction(PersistentPageMemoryTableStorage tableStorage) throws InterruptedException {
+        PersistentPageMemoryDataRegion dataRegion = tableStorage.dataRegion();
+        CheckpointManager checkpointManager = dataRegion.checkpointManager();
+
+        CheckpointProgress checkpointProgress = checkpointManager.forceCheckpoint("");
+        assertThat(checkpointProgress.futureFor(CheckpointState.FINISHED), willCompleteSuccessfully());
+
+        checkpointManager.triggerCompaction();

Review Comment:
   Why is this call needed? According to the algorithm, compaction is
   launched automatically once the checkpoint completes.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -626,7 +626,7 @@ public void testReCreatePartition() throws Exception {
             return null;
         });
 
-        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(2, SECONDS);

Review Comment:
   Let's change it to `assertThat(..., willCompleteSuccessfully())`?
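
   A rough sketch of the idea, using only the JDK so it runs on its own. It
   is not the Ignite matcher: `FutureAssertions`,
   `assertCompletesSuccessfully` and the 10-second default are hypothetical
   stand-ins. The point is that the timeout lives in one place instead of
   being tuned at each call site.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a matcher such as willCompleteSuccessfully().
class FutureAssertions {
    // Assumed default; a real matcher may use a different value.
    static final long DEFAULT_TIMEOUT_SECONDS = 10;

    // Waits for the future and turns every failure mode into an AssertionError.
    static void assertCompletesSuccessfully(CompletableFuture<?> future) {
        try {
            future.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete within " + DEFAULT_TIMEOUT_SECONDS + " s", e);
        } catch (ExecutionException e) {
            throw new AssertionError("Future completed exceptionally", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
    }

    public static void main(String[] args) {
        // Stands in for tableStorage.destroyPartition(PARTITION_ID).
        CompletableFuture<Void> destroy = CompletableFuture.runAsync(() -> { });

        assertCompletesSuccessfully(destroy);
    }
}
```

   With a helper like this, the test line no longer carries its own timeout,
   so a slow CI agent does not require bumping `1` to `2` by hand.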



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -353,13 +356,40 @@ CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMv
     }
 
     private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
-        dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
+        FilePageStore store = dataRegion.filePageStoreManager().getStore(groupPartitionId);
 
-        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
+        assert store != null : groupPartitionId;
 
-        return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
-                .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
-                .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
+        store.markToDestroy();
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        CheckpointManager checkpointManager = dataRegion.checkpointManager();
+
+        CheckpointListener listener = new CheckpointListener() {
+            @Override
+            public void afterCheckpointEnd(CheckpointProgress progress) {
+                checkpointManager.removeCheckpointListener(this);
+
+                try {
+                    dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
+
+                    dataRegion.partitionMetaManager().removeMeta(groupPartitionId);
+
+                    future.complete(null);
+                } catch (Exception e) {
+                    future.completeExceptionally(new StorageException("Couldn't destroy partition: " + groupPartitionId, e));
+                }
+            }
+        };
+
+        checkpointManager.addCheckpointListener(listener, dataRegion);
+
+        CheckpointProgress checkpoint = dataRegion.checkpointManager().scheduleCheckpoint(1000, "Partition destruction");

Review Comment:
   Why do you think a second should be enough?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -353,13 +356,40 @@ CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMv
     }
 
     private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
-        dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
+        FilePageStore store = dataRegion.filePageStoreManager().getStore(groupPartitionId);
 
-        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
+        assert store != null : groupPartitionId;
 
-        return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
-                .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
-                .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
+        store.markToDestroy();
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        CheckpointManager checkpointManager = dataRegion.checkpointManager();
+
+        CheckpointListener listener = new CheckpointListener() {
+            @Override
+            public void afterCheckpointEnd(CheckpointProgress progress) {
+                checkpointManager.removeCheckpointListener(this);
+
+                try {
+                    dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
+
+                    dataRegion.partitionMetaManager().removeMeta(groupPartitionId);
+
+                    future.complete(null);
+                } catch (Exception e) {
+                    future.completeExceptionally(new StorageException("Couldn't destroy partition: " + groupPartitionId, e));

Review Comment:
   The error text is misleading: at this point the partition is not being
   deleted yet.



##########
modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java:
##########
@@ -146,6 +158,48 @@ void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
         );
     }
 
+    @Test
+    @WithSystemProperty(key = THREAD_ASSERTIONS_ENABLED, value = "false")
+    public void compactionAfterAbortedRebalance() throws InterruptedException {
+        createZone(ZONE_NAME, 1, 1);
+        createTestTable(TABLE_NAME, ZONE_NAME);
+
+        IgniteImpl igniteImpl = igniteImpl(0);
+
+        TableViewInternal table = unwrapTableViewInternal(igniteImpl.tables().table(TABLE_NAME));
+
+        PersistentPageMemoryTableStorage tableStorage = (PersistentPageMemoryTableStorage) table.internalTable().storage();
+
+        table.keyValueView().put(
+                null,
+                Tuple.create().set("id", 1),
+                Tuple.create().set("val", "value1")
+        );
+
+        CompletableFuture<Void> abortRebalance = tableStorage.startRebalancePartition(0)
+                        .thenCompose(v -> tableStorage.abortRebalancePartition(0));
+
+        assertThat(abortRebalance, willCompleteSuccessfully());
+
+        assertEquals(0, tableStorage.getMvPartition(0).lastAppliedIndex());
+
+        waitForCompaction(tableStorage);
+    }
+
+    private static void waitForCompaction(PersistentPageMemoryTableStorage tableStorage) throws InterruptedException {

Review Comment:
   The name is very generic. Which compaction is meant: catalog, metastore,
   or delta files?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -626,7 +626,7 @@ public void testReCreatePartition() throws Exception {
             return null;
         });
 
-        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(2, SECONDS);

Review Comment:
   The change from 1 second to 2 worries me a little.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -353,13 +356,40 @@ CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMv
     }
 
     private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
-        dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
+        FilePageStore store = dataRegion.filePageStoreManager().getStore(groupPartitionId);
 
-        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
+        assert store != null : groupPartitionId;
 
-        return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
-                .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
-                .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
+        store.markToDestroy();
+
+        CompletableFuture<Void> future = new CompletableFuture<>();

Review Comment:
   You can use `var` here.
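
   For instance, a minimal sketch (the `VarSketch` class and
   `newDestroyFuture` method are made up so the snippet compiles on its
   own). One thing to watch: with `var`, the diamond operator would infer
   `CompletableFuture<Object>`, so the type argument has to move to the
   right-hand side.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical holder class, only here to make the snippet self-contained.
class VarSketch {
    static CompletableFuture<Void> newDestroyFuture() {
        // Before: CompletableFuture<Void> future = new CompletableFuture<>();
        // With var, the type argument is spelled out on the right-hand side.
        var future = new CompletableFuture<Void>();

        return future;
    }

    public static void main(String[] args) {
        var future = newDestroyFuture();

        // Completing the future by hand stands in for the checkpoint listener.
        future.complete(null);

        System.out.println(future.isDone());
    }
}
```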



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
