sashapolo commented on code in PR #5300:
URL: https://github.com/apache/ignite-3/pull/5300#discussion_r1975433506


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java:
##########
@@ -28,6 +28,11 @@ public enum LocalPartitionReplicaEvent implements Event {
      */
     BEFORE_REPLICA_STARTED,
 
+    /**
+     * Fired when partition replica has been just stopped and the related 
partition shouldn't be destroyed then e.g. on Ignite node stop.

Review Comment:
   ```suggestion
        * Fired when partition replica has just been stopped and the related 
partition shouldn't be destroyed e.g. on Ignite node stop.
   ```



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java:
##########
@@ -649,6 +649,33 @@ public void testScanCloseReplicaRequest() throws Exception 
{
         assertDoesNotThrow(tx::commit);
     }
 
+    @Test
+    public void testNodeStop() throws Exception {
+        // Prepare a single node cluster.
+        startCluster(1);
+        Node node = getNode(0);
+        
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+
+        // Prepare a zone.
+        String zoneName = "test_zone";
+        createZone(node, zoneName, 1, 1);
+
+        // Create a table to work with.
+        String tableName = "test_table";
+        createTable(node, zoneName, tableName);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+        InternalTable internalTable = 
node.tableManager.table(tableId).internalTable();
+
+        // Stop the node
+        stopNode(0);
+
+        // Check that the storages close method was triggered
+        verify(internalTable.storage(), 
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))

Review Comment:
   Why `atLeast`? Do we close it multiple times?



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java:
##########
@@ -649,6 +649,33 @@ public void testScanCloseReplicaRequest() throws Exception 
{
         assertDoesNotThrow(tx::commit);
     }
 
+    @Test
+    public void testNodeStop() throws Exception {
+        // Prepare a single node cluster.
+        startCluster(1);
+        Node node = getNode(0);
+        
placementDriver.setPrimary(node.clusterService.topologyService().localMember());

Review Comment:
   I think this line is no longer needed



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -273,6 +273,15 @@ public void addTableReplicaListener(TablePartitionId 
partitionId, Function<RaftC
         replicas.put(partitionId.tableId(), replicaListener.apply(raftClient));
     }
 
+    /**
+     * Remove table partition listener by table replication identifier from 
the current zone replica listener.

Review Comment:
   ```suggestion
        * Removes table partition listener by table replication identifier from 
the current zone replica listener.
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1316,45 +1331,51 @@ public CompletableFuture<Void> 
stopAsync(ComponentContext componentContext) {
         return Assignments.fromBytes(entry.value());
     }
 
-    private CompletableFuture<Void> 
weakStopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) {
-        return replicaMgr.weakStopReplica(
-                zonePartitionId,
-                WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
-                () -> 
stopPartition(zonePartitionId).thenCompose(replicaWasStopped -> {
-                    if (!replicaWasStopped) {
-                        return nullCompletedFuture();
-                    }
-
-                    
zoneResourcesManager.destroyZonePartitionResources(zonePartitionId);
-
-                    return fireEvent(
-                            LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED,
-                            new 
LocalPartitionReplicaEventParameters(zonePartitionId, revision)
-                    );
-                })
-        );
-    }
-
     /**
-     * Stops all resources associated with a given partition, like replicas 
and partition trackers.
+     * Stops zone replica, executes a given action and fires a given local 
event after if are not null.
+     *
+     * @param zonePartitionId Zone's replication group identifier.
+     * @param afterReplicaStopAction A consumer that will be executed if it is 
not null after zone replica stop process will be finished and
+     *      stopping result will be given to the consumer.
+     * @param afterReplicaStoppedEvent A local event type that if not null 
should be fired in the end of the method.
+     * @param afterReplicaStoppedEventRevision A revision parameter of the 
local event if the last is presented. Must not be null if the
+     *      event isn't null too.
      *
-     * @param zonePartitionId Partition ID.
-     * @return Future that will be completed after all resources have been 
closed and the future's result answers was replica was stopped
-     *      correctly or not.
+     * @return Future that will be completed after zone replica was stopped 
and all given non-null actions are done, the future's result
+     *      answers was replica was stopped correctly or not.
      */
-    private CompletableFuture<Boolean> stopPartition(ZonePartitionId 
zonePartitionId) {
+    private CompletableFuture<Boolean> stopPartitionInternal(
+            ZonePartitionId zonePartitionId,
+            @Nullable Consumer<Boolean> afterReplicaStopAction,
+            @Nullable LocalPartitionReplicaEvent afterReplicaStoppedEvent,

Review Comment:
   This parameter is never null



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1316,45 +1331,51 @@ public CompletableFuture<Void> 
stopAsync(ComponentContext componentContext) {
         return Assignments.fromBytes(entry.value());
     }
 
-    private CompletableFuture<Void> 
weakStopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) {
-        return replicaMgr.weakStopReplica(
-                zonePartitionId,
-                WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
-                () -> 
stopPartition(zonePartitionId).thenCompose(replicaWasStopped -> {
-                    if (!replicaWasStopped) {
-                        return nullCompletedFuture();
-                    }
-
-                    
zoneResourcesManager.destroyZonePartitionResources(zonePartitionId);
-
-                    return fireEvent(
-                            LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED,
-                            new 
LocalPartitionReplicaEventParameters(zonePartitionId, revision)
-                    );
-                })
-        );
-    }
-
     /**
-     * Stops all resources associated with a given partition, like replicas 
and partition trackers.
+     * Stops zone replica, executes a given action and fires a given local 
event after if are not null.
+     *
+     * @param zonePartitionId Zone's replication group identifier.
+     * @param afterReplicaStopAction A consumer that will be executed if it is 
not null after zone replica stop process will be finished and
+     *      stopping result will be given to the consumer.
+     * @param afterReplicaStoppedEvent A local event type that if not null 
should be fired in the end of the method.
+     * @param afterReplicaStoppedEventRevision A revision parameter of the 
local event if the last is presented. Must not be null if the
+     *      event isn't null too.
      *
-     * @param zonePartitionId Partition ID.
-     * @return Future that will be completed after all resources have been 
closed and the future's result answers was replica was stopped
-     *      correctly or not.
+     * @return Future that will be completed after zone replica was stopped 
and all given non-null actions are done, the future's result
+     *      answers was replica was stopped correctly or not.
      */
-    private CompletableFuture<Boolean> stopPartition(ZonePartitionId 
zonePartitionId) {
+    private CompletableFuture<Boolean> stopPartitionInternal(
+            ZonePartitionId zonePartitionId,
+            @Nullable Consumer<Boolean> afterReplicaStopAction,

Review Comment:
   I propose to not pass null here as well



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -717,32 +724,38 @@ private CompletableFuture<Boolean> 
beforeZoneReplicaStarted(LocalPartitionReplic
         );
     }
 
-    private CompletableFuture<Boolean> 
onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
+    private CompletableFuture<Boolean> 
onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
         if (!enabledColocation()) {
             return falseCompletedFuture();
         }
 
-        return inBusyLockAsync(busyLock, () -> {
-            Set<TableImpl> zoneTables = 
zoneTables(parameters.zonePartitionId().zoneId());
+        Set<TableImpl> zoneTables = 
zoneTables(parameters.zonePartitionId().zoneId());

Review Comment:
   What about taking the busy lock?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1936,6 +1950,13 @@ private CompletableFuture<Void> destroyTableLocally(int 
tableId) {
         for (int partitionId = 0; partitionId < partitions; partitionId++) {
             var replicationGroupId = new TablePartitionId(tableId, 
partitionId);
 
+            if (enabledColocation()) {
+                
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
+                        new ZonePartitionId(internalTable.zoneId(), 
replicationGroupId.partitionId()),

Review Comment:
   ```suggestion
                           new ZonePartitionId(internalTable.zoneId(), 
partitionId),
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -371,6 +371,15 @@ public void addTableProcessor(TablePartitionId 
tablePartitionId, RaftTableProces
         }
     }
 
+    /**
+     * Removes a given Table Partition-level Raft processor from the set of 
managed processor.

Review Comment:
   ```suggestion
        * Removes a given Table Partition-level Raft processor from the set of 
managed processors.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -670,7 +675,9 @@ public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
 
             attemptsObtainLock = txCfg.attemptsObtainLock().value();
 
-            
executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
 this::onPrimaryReplicaExpired);
+            if (!enabledColocation()) {

Review Comment:
   Why is this needed?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java:
##########
@@ -61,6 +61,13 @@ public interface InternalTable extends ManuallyCloseable {
      */
     int tableId();
 
+    /**
+     * Gets a zone id what the table belongs.

Review Comment:
   ```suggestion
        * Returns the zone ID that the table belongs to.
   ```



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java:
##########
@@ -137,6 +138,7 @@ public static void 
throwExceptionDependingOnStorageStateOnRebalance(StorageState
     public static void throwExceptionDependingOnStorageState(StorageState 
state, String storageInfo) {
         switch (state) {
             case CLOSED:
+                assert !IgniteSystemProperties.enabledColocation() : 
createStorageClosedErrorMessage(storageInfo);

Review Comment:
   Why is this line needed?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -371,6 +371,15 @@ public void addTableProcessor(TablePartitionId 
tablePartitionId, RaftTableProces
         }
     }
 
+    /**
+     * Removes a given Table Partition-level Raft processor from the set of 
managed processor.
+     */
+    public void removeTableProcessor(TablePartitionId tablePartitionId) {

Review Comment:
   Same question



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -283,6 +283,12 @@ public int tableId() {
         return tableId;
     }
 
+    /** {@inheritDoc} */

Review Comment:
   We don't use this annotation, please remove it



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1428,6 +1453,27 @@ public void loadTableListenerToZoneReplica(
         
resources.snapshotStorageFactory().addMvPartition(tablePartitionId.tableId(), 
partitionMvStorageAccess);
     }
 
+    /**
+     * Load a new table partition listener to the zone replica.
+     *
+     * @param zonePartitionId Zone partition id.
+     * @param tablePartitionId Table partition id.
+     */
+    public void unloadTableResourcesFromZoneReplica(
+            ZonePartitionId zonePartitionId,
+            TablePartitionId tablePartitionId

Review Comment:
   This should be `int tableId`



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -273,6 +273,15 @@ public void addTableReplicaListener(TablePartitionId 
partitionId, Function<RaftC
         replicas.put(partitionId.tableId(), replicaListener.apply(raftClient));
     }
 
+    /**
+     * Remove table partition listener by table replication identifier from 
the current zone replica listener.
+     *
+     * @param partitionId Table partition id.
+     */
+    public void removeTableReplicaListener(TablePartitionId partitionId) {

Review Comment:
   Shall we pass `int tableId` here instead?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1316,45 +1331,51 @@ public CompletableFuture<Void> 
stopAsync(ComponentContext componentContext) {
         return Assignments.fromBytes(entry.value());
     }
 
-    private CompletableFuture<Void> 
weakStopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) {
-        return replicaMgr.weakStopReplica(
-                zonePartitionId,
-                WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
-                () -> 
stopPartition(zonePartitionId).thenCompose(replicaWasStopped -> {
-                    if (!replicaWasStopped) {
-                        return nullCompletedFuture();
-                    }
-
-                    
zoneResourcesManager.destroyZonePartitionResources(zonePartitionId);
-
-                    return fireEvent(
-                            LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED,
-                            new 
LocalPartitionReplicaEventParameters(zonePartitionId, revision)
-                    );
-                })
-        );
-    }
-
     /**
-     * Stops all resources associated with a given partition, like replicas 
and partition trackers.
+     * Stops zone replica, executes a given action and fires a given local 
event after if are not null.
+     *
+     * @param zonePartitionId Zone's replication group identifier.
+     * @param afterReplicaStopAction A consumer that will be executed if it is 
not null after zone replica stop process will be finished and
+     *      stopping result will be given to the consumer.
+     * @param afterReplicaStoppedEvent A local event type that if not null 
should be fired in the end of the method.
+     * @param afterReplicaStoppedEventRevision A revision parameter of the 
local event if the last is presented. Must not be null if the
+     *      event isn't null too.
      *
-     * @param zonePartitionId Partition ID.
-     * @return Future that will be completed after all resources have been 
closed and the future's result answers was replica was stopped
-     *      correctly or not.
+     * @return Future that will be completed after zone replica was stopped 
and all given non-null actions are done, the future's result
+     *      answers was replica was stopped correctly or not.
      */
-    private CompletableFuture<Boolean> stopPartition(ZonePartitionId 
zonePartitionId) {
+    private CompletableFuture<Boolean> stopPartitionInternal(
+            ZonePartitionId zonePartitionId,
+            @Nullable Consumer<Boolean> afterReplicaStopAction,
+            @Nullable LocalPartitionReplicaEvent afterReplicaStoppedEvent,
+            @Nullable Long afterReplicaStoppedEventRevision

Review Comment:
   This parameter is never null



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2814,7 +2835,11 @@ private CompletableFuture<Void> 
stopPartition(TablePartitionId tablePartitionId,
         CompletableFuture<Boolean> stopReplicaFuture;
 
         try {
-            stopReplicaFuture = replicaMgr.stopReplica(tablePartitionId);
+            // In case of colocation there shouldn't be any table replica and 
thus it shouldn't be stopped. Moreover the excessive replica

Review Comment:
   This is weird for two reasons:
   1. Why do we ignore the `isRemovedFuture` result inside 
`replicaMgr.stopReplica` and always stop the raft node?
   2. Why do we have a raft node at all in the colocation case?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -717,32 +724,38 @@ private CompletableFuture<Boolean> 
beforeZoneReplicaStarted(LocalPartitionReplic
         );
     }
 
-    private CompletableFuture<Boolean> 
onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
+    private CompletableFuture<Boolean> 
onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
         if (!enabledColocation()) {
             return falseCompletedFuture();
         }
 
-        return inBusyLockAsync(busyLock, () -> {
-            Set<TableImpl> zoneTables = 
zoneTables(parameters.zonePartitionId().zoneId());
+        Set<TableImpl> zoneTables = 
zoneTables(parameters.zonePartitionId().zoneId());
 
-            CompletableFuture<?>[] futures = zoneTables.stream()
-                    .map(table -> {
-                        closePartitionTrackers(table.internalTable(), 
parameters.zonePartitionId().partitionId());
+        CompletableFuture<?>[] futures = zoneTables.stream()
+                .map(table -> supplyAsync(

Review Comment:
   You have a bug here, must be:
   
   ```
   CompletableFuture<?>[] futures = zoneTables.stream()
           .map(table -> supplyAsync(() -> tableStopFuture(table), 
ioExecutor).thenCompose(identity()))
           .toArray(CompletableFuture[]::new);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to