sk0x50 commented on code in PR #5276: URL: https://github.com/apache/ignite-3/pull/5276#discussion_r1981385676
########## modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java: ########## @@ -1190,4 +1233,18 @@ private TestColumnTypeParams(ColumnType type, @Nullable Integer precision, @Null private @Nullable CatalogTableDescriptor table(int catalogVersion, String tableName) { return manager.catalog(catalogVersion).table(SCHEMA_NAME, tableName); } + + private static Matcher<CatalogTableDescriptor> descriptorWithName(String name) { Review Comment: Agree. Done. ########## modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java: ########## @@ -153,46 +160,84 @@ private void onIndexRemoved(RemoveIndexEventParameters parameters) { private void onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) { inBusyLock(busyLock, () -> { - TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); + + PartitionGroupId primaryReplicaId = (PartitionGroupId) parameters.groupId(); if (primaryReplicaId.partitionId() != 0) { // We are only interested in the 0 partition. return; } - int tableId = primaryReplicaId.tableId(); - if (isLocalNode(clusterService, parameters.leaseholderId())) { - if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) { - scheduleTasksOnPrimaryReplicaElectedBusy(tableId); - } + scheduleTasksOnPrimaryReplicaElectedBusy(primaryReplicaId); } else { - if (localNodeIsPrimaryReplicaForTableIds.remove(tableId)) { - changeIndexStatusTaskScheduler.stopTasksForTable(tableId); - } + scheduleStopTasksOnPrimaryReplicaElected(primaryReplicaId); } }); } - private void scheduleTasksOnPrimaryReplicaElectedBusy(int tableId) { + private void scheduleTasksOnPrimaryReplicaElectedBusy(PartitionGroupId partitionGroupId) { + // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the metastore thread. + Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion()); + + IntListIterator tableIds = + getTableIdsForPrimaryReplicaElected(catalog, partitionGroupId, localNodeIsPrimaryReplicaForTableIds::add); + + while (tableIds.hasNext()) { + for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(tableIds.nextInt())) { + switch (indexDescriptor.status()) { + case REGISTERED: + changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor); + + break; + + case STOPPING: + changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor); + + break; + + default: + break; + } + } + } + } + + private void scheduleStopTasksOnPrimaryReplicaElected(PartitionGroupId partitionGroupId) { // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the metastore thread. Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion()); - for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(tableId)) { - switch (indexDescriptor.status()) { - case REGISTERED: - changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor); + IntListIterator tableIds = + getTableIdsForPrimaryReplicaElected(catalog, partitionGroupId, localNodeIsPrimaryReplicaForTableIds::remove); + + while (tableIds.hasNext()) { + changeIndexStatusTaskScheduler.stopTasksForTable(tableIds.nextInt()); + } + } - break; + private IntListIterator getTableIdsForPrimaryReplicaElected( + Catalog catalog, + PartitionGroupId partitionGroupId, + Int2BooleanFunction filter Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org