sashapolo commented on code in PR #5300: URL: https://github.com/apache/ignite-3/pull/5300#discussion_r1975433506
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java: ########## @@ -28,6 +28,11 @@ public enum LocalPartitionReplicaEvent implements Event { */ BEFORE_REPLICA_STARTED, + /** + * Fired when partition replica has been just stopped and the related partition shouldn't be destroyed then e.g. on Ignite node stop. Review Comment: ```suggestion * Fired when partition replica has just been stopped and the related partition shouldn't be destroyed e.g. on Ignite node stop. ``` ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java: ########## @@ -649,6 +649,33 @@ public void testScanCloseReplicaRequest() throws Exception { assertDoesNotThrow(tx::commit); } + @Test + public void testNodeStop() throws Exception { + // Prepare a single node cluster. + startCluster(1); + Node node = getNode(0); + placementDriver.setPrimary(node.clusterService.topologyService().localMember()); + + // Prepare a zone. + String zoneName = "test_zone"; + createZone(node, zoneName, 1, 1); + + // Create a table to work with. + String tableName = "test_table"; + createTable(node, zoneName, tableName); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + InternalTable internalTable = node.tableManager.table(tableId).internalTable(); + + // Stop the node + stopNode(0); + + // Check that the storages close method was triggered + verify(internalTable.storage(), timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1)) Review Comment: Why `atLeast`? Do we close it multiple times? 
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java: ########## @@ -649,6 +649,33 @@ public void testScanCloseReplicaRequest() throws Exception { assertDoesNotThrow(tx::commit); } + @Test + public void testNodeStop() throws Exception { + // Prepare a single node cluster. + startCluster(1); + Node node = getNode(0); + placementDriver.setPrimary(node.clusterService.topologyService().localMember()); Review Comment: I think this line is no longer needed ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -273,6 +273,15 @@ public void addTableReplicaListener(TablePartitionId partitionId, Function<RaftC replicas.put(partitionId.tableId(), replicaListener.apply(raftClient)); } + /** + * Remove table partition listener by table replication identifier from the current zone replica listener. Review Comment: ```suggestion * Removes table partition listener by table replication identifier from the current zone replica listener. 
``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1316,45 +1331,51 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) { return Assignments.fromBytes(entry.value()); } - private CompletableFuture<Void> weakStopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) { - return replicaMgr.weakStopReplica( - zonePartitionId, - WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS, - () -> stopPartition(zonePartitionId).thenCompose(replicaWasStopped -> { - if (!replicaWasStopped) { - return nullCompletedFuture(); - } - - zoneResourcesManager.destroyZonePartitionResources(zonePartitionId); - - return fireEvent( - LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED, - new LocalPartitionReplicaEventParameters(zonePartitionId, revision) - ); - }) - ); - } - /** - * Stops all resources associated with a given partition, like replicas and partition trackers. + * Stops zone replica, executes a given action and fires a given local event after if are not null. + * + * @param zonePartitionId Zone's replication group identifier. + * @param afterReplicaStopAction A consumer that will be executed if it is not null after zone replica stop process will be finished and + * stopping result will be given to the consumer. + * @param afterReplicaStoppedEvent A local event type that if not null should be fired in the end of the method. + * @param afterReplicaStoppedEventRevision A revision parameter of the local event if the last is presented. Must not be null if the + * event isn't null too. * - * @param zonePartitionId Partition ID. - * @return Future that will be completed after all resources have been closed and the future's result answers was replica was stopped - * correctly or not. 
+ * @return Future that will be completed after zone replica was stopped and all given non-null actions are done, the future's result + * answers was replica was stopped correctly or not. */ - private CompletableFuture<Boolean> stopPartition(ZonePartitionId zonePartitionId) { + private CompletableFuture<Boolean> stopPartitionInternal( + ZonePartitionId zonePartitionId, + @Nullable Consumer<Boolean> afterReplicaStopAction, + @Nullable LocalPartitionReplicaEvent afterReplicaStoppedEvent, Review Comment: This parameter is never null ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1316,45 +1331,51 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) { return Assignments.fromBytes(entry.value()); } - private CompletableFuture<Void> weakStopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) { - return replicaMgr.weakStopReplica( - zonePartitionId, - WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS, - () -> stopPartition(zonePartitionId).thenCompose(replicaWasStopped -> { - if (!replicaWasStopped) { - return nullCompletedFuture(); - } - - zoneResourcesManager.destroyZonePartitionResources(zonePartitionId); - - return fireEvent( - LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED, - new LocalPartitionReplicaEventParameters(zonePartitionId, revision) - ); - }) - ); - } - /** - * Stops all resources associated with a given partition, like replicas and partition trackers. + * Stops zone replica, executes a given action and fires a given local event after if are not null. + * + * @param zonePartitionId Zone's replication group identifier. + * @param afterReplicaStopAction A consumer that will be executed if it is not null after zone replica stop process will be finished and + * stopping result will be given to the consumer. 
+ * @param afterReplicaStoppedEvent A local event type that if not null should be fired in the end of the method. + * @param afterReplicaStoppedEventRevision A revision parameter of the local event if the last is presented. Must not be null if the + * event isn't null too. * - * @param zonePartitionId Partition ID. - * @return Future that will be completed after all resources have been closed and the future's result answers was replica was stopped - * correctly or not. + * @return Future that will be completed after zone replica was stopped and all given non-null actions are done, the future's result + * answers was replica was stopped correctly or not. */ - private CompletableFuture<Boolean> stopPartition(ZonePartitionId zonePartitionId) { + private CompletableFuture<Boolean> stopPartitionInternal( + ZonePartitionId zonePartitionId, + @Nullable Consumer<Boolean> afterReplicaStopAction, Review Comment: I propose to not pass null here as well ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -717,32 +724,38 @@ private CompletableFuture<Boolean> beforeZoneReplicaStarted(LocalPartitionReplic ); } - private CompletableFuture<Boolean> onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) { + private CompletableFuture<Boolean> onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) { if (!enabledColocation()) { return falseCompletedFuture(); } - return inBusyLockAsync(busyLock, () -> { - Set<TableImpl> zoneTables = zoneTables(parameters.zonePartitionId().zoneId()); + Set<TableImpl> zoneTables = zoneTables(parameters.zonePartitionId().zoneId()); Review Comment: What about taking the busy lock? 
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1936,6 +1950,13 @@ private CompletableFuture<Void> destroyTableLocally(int tableId) { for (int partitionId = 0; partitionId < partitions; partitionId++) { var replicationGroupId = new TablePartitionId(tableId, partitionId); + if (enabledColocation()) { + partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica( + new ZonePartitionId(internalTable.zoneId(), replicationGroupId.partitionId()), Review Comment: ```suggestion new ZonePartitionId(internalTable.zoneId(), partitionId), ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -371,6 +371,15 @@ public void addTableProcessor(TablePartitionId tablePartitionId, RaftTableProces } } + /** + * Removes a given Table Partition-level Raft processor from the set of managed processor. Review Comment: ```suggestion * Removes a given Table Partition-level Raft processor from the set of managed processors. ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -670,7 +675,9 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) { attemptsObtainLock = txCfg.attemptsObtainLock().value(); - executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this::onPrimaryReplicaExpired); + if (!enabledColocation()) { Review Comment: Why is this needed? ########## modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java: ########## @@ -61,6 +61,13 @@ public interface InternalTable extends ManuallyCloseable { */ int tableId(); + /** + * Gets a zone id what the table belongs. Review Comment: ```suggestion * Returns the zone ID that the table belongs to. 
``` ########## modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java: ########## @@ -137,6 +138,7 @@ public static void throwExceptionDependingOnStorageStateOnRebalance(StorageState public static void throwExceptionDependingOnStorageState(StorageState state, String storageInfo) { switch (state) { case CLOSED: + assert !IgniteSystemProperties.enabledColocation() : createStorageClosedErrorMessage(storageInfo); Review Comment: Why is this line needed? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -371,6 +371,15 @@ public void addTableProcessor(TablePartitionId tablePartitionId, RaftTableProces } } + /** + * Removes a given Table Partition-level Raft processor from the set of managed processor. + */ + public void removeTableProcessor(TablePartitionId tablePartitionId) { Review Comment: Same question ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java: ########## @@ -283,6 +283,12 @@ public int tableId() { return tableId; } + /** {@inheritDoc} */ Review Comment: We don't use this Javadoc tag; please remove it ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1428,6 +1453,27 @@ public void loadTableListenerToZoneReplica( resources.snapshotStorageFactory().addMvPartition(tablePartitionId.tableId(), partitionMvStorageAccess); } + /** + * Load a new table partition listener to the zone replica. + * + * @param zonePartitionId Zone partition id. + * @param tablePartitionId Table partition id.
+ */ + public void unloadTableResourcesFromZoneReplica( + ZonePartitionId zonePartitionId, + TablePartitionId tablePartitionId Review Comment: This should be `int tableId` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -273,6 +273,15 @@ public void addTableReplicaListener(TablePartitionId partitionId, Function<RaftC replicas.put(partitionId.tableId(), replicaListener.apply(raftClient)); } + /** + * Remove table partition listener by table replication identifier from the current zone replica listener. + * + * @param partitionId Table partition id. + */ + public void removeTableReplicaListener(TablePartitionId partitionId) { Review Comment: Shall we pass `int tableId` here instead? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1316,45 +1331,51 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) { return Assignments.fromBytes(entry.value()); } - private CompletableFuture<Void> weakStopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) { - return replicaMgr.weakStopReplica( - zonePartitionId, - WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS, - () -> stopPartition(zonePartitionId).thenCompose(replicaWasStopped -> { - if (!replicaWasStopped) { - return nullCompletedFuture(); - } - - zoneResourcesManager.destroyZonePartitionResources(zonePartitionId); - - return fireEvent( - LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED, - new LocalPartitionReplicaEventParameters(zonePartitionId, revision) - ); - }) - ); - } - /** - * Stops all resources associated with a given partition, like replicas and partition trackers. + * Stops zone replica, executes a given action and fires a given local event after if are not null. + * + * @param zonePartitionId Zone's replication group identifier. 
+ * @param afterReplicaStopAction A consumer that will be executed if it is not null after zone replica stop process will be finished and + * stopping result will be given to the consumer. + * @param afterReplicaStoppedEvent A local event type that if not null should be fired in the end of the method. + * @param afterReplicaStoppedEventRevision A revision parameter of the local event if the last is presented. Must not be null if the + * event isn't null too. * - * @param zonePartitionId Partition ID. - * @return Future that will be completed after all resources have been closed and the future's result answers was replica was stopped - * correctly or not. + * @return Future that will be completed after zone replica was stopped and all given non-null actions are done, the future's result + * answers was replica was stopped correctly or not. */ - private CompletableFuture<Boolean> stopPartition(ZonePartitionId zonePartitionId) { + private CompletableFuture<Boolean> stopPartitionInternal( + ZonePartitionId zonePartitionId, + @Nullable Consumer<Boolean> afterReplicaStopAction, + @Nullable LocalPartitionReplicaEvent afterReplicaStoppedEvent, + @Nullable Long afterReplicaStoppedEventRevision Review Comment: This parameter is never null ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -2814,7 +2835,11 @@ private CompletableFuture<Void> stopPartition(TablePartitionId tablePartitionId, CompletableFuture<Boolean> stopReplicaFuture; try { - stopReplicaFuture = replicaMgr.stopReplica(tablePartitionId); + // In case of colocation there shouldn't be any table replica and thus it shouldn't be stopped. Moreover the excessive replica Review Comment: This is weird for two reasons: 1. Why do we ignore the `isRemovedFuture` result inside `replicaMgr.stopReplica` and always stop the raft node? 2. Why do we have a raft node at all in the colocation case? 
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -717,32 +724,38 @@ private CompletableFuture<Boolean> beforeZoneReplicaStarted(LocalPartitionReplic ); } - private CompletableFuture<Boolean> onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) { + private CompletableFuture<Boolean> onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) { if (!enabledColocation()) { return falseCompletedFuture(); } - return inBusyLockAsync(busyLock, () -> { - Set<TableImpl> zoneTables = zoneTables(parameters.zonePartitionId().zoneId()); + Set<TableImpl> zoneTables = zoneTables(parameters.zonePartitionId().zoneId()); - CompletableFuture<?>[] futures = zoneTables.stream() - .map(table -> { - closePartitionTrackers(table.internalTable(), parameters.zonePartitionId().partitionId()); + CompletableFuture<?>[] futures = zoneTables.stream() + .map(table -> supplyAsync( Review Comment: You have a bug here; it must be: ``` CompletableFuture<?>[] futures = zoneTables.stream() .map(table -> supplyAsync(() -> tableStopFuture(table), ioExecutor).thenCompose(identity())) .toArray(CompletableFuture[]::new); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org