sk0x50 commented on code in PR #5276: URL: https://github.com/apache/ignite-3/pull/5276#discussion_r1981170555
########## modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java: ########## @@ -153,46 +158,88 @@ private void onIndexRemoved(RemoveIndexEventParameters parameters) { private void onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) { inBusyLock(busyLock, () -> { - TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); + + PartitionGroupId primaryReplicaId = (PartitionGroupId) parameters.groupId(); if (primaryReplicaId.partitionId() != 0) { // We are only interested in the 0 partition. return; } - int tableId = primaryReplicaId.tableId(); - if (isLocalNode(clusterService, parameters.leaseholderId())) { - if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) { - scheduleTasksOnPrimaryReplicaElectedBusy(tableId); - } + scheduleTasksOnPrimaryReplicaElectedBusy(primaryReplicaId); } else { - if (localNodeIsPrimaryReplicaForTableIds.remove(tableId)) { - changeIndexStatusTaskScheduler.stopTasksForTable(tableId); - } + scheduleStopTasksOnPrimaryReplicaElected(primaryReplicaId); } }); } - private void scheduleTasksOnPrimaryReplicaElectedBusy(int tableId) { + private void scheduleTasksOnPrimaryReplicaElectedBusy(PartitionGroupId partitionGroupId) { // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the metastore thread. Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion()); - for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(tableId)) { - switch (indexDescriptor.status()) { - case REGISTERED: - changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor); + var tableIds = new ArrayList<Integer>(); Review Comment: Your suggestion absolutely makes sense. Introduced a new method to get a list of "affected" table ids. 
########## modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java: ########## @@ -153,46 +158,88 @@ private void onIndexRemoved(RemoveIndexEventParameters parameters) { private void onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) { inBusyLock(busyLock, () -> { - TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); + + PartitionGroupId primaryReplicaId = (PartitionGroupId) parameters.groupId(); if (primaryReplicaId.partitionId() != 0) { // We are only interested in the 0 partition. return; } - int tableId = primaryReplicaId.tableId(); - if (isLocalNode(clusterService, parameters.leaseholderId())) { - if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) { - scheduleTasksOnPrimaryReplicaElectedBusy(tableId); - } + scheduleTasksOnPrimaryReplicaElectedBusy(primaryReplicaId); } else { - if (localNodeIsPrimaryReplicaForTableIds.remove(tableId)) { - changeIndexStatusTaskScheduler.stopTasksForTable(tableId); - } + scheduleStopTasksOnPrimaryReplicaElected(primaryReplicaId); } }); } - private void scheduleTasksOnPrimaryReplicaElectedBusy(int tableId) { + private void scheduleTasksOnPrimaryReplicaElectedBusy(PartitionGroupId partitionGroupId) { // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the metastore thread. 
Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion()); - for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(tableId)) { - switch (indexDescriptor.status()) { - case REGISTERED: - changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor); + var tableIds = new ArrayList<Integer>(); + + if (enabledColocation()) { + ZonePartitionId zonePartitionId = (ZonePartitionId) partitionGroupId; + + for (CatalogTableDescriptor table : catalog.tables(zonePartitionId.zoneId())) { + if (localNodeIsPrimaryReplicaForTableIds.add(table.id())) { + tableIds.add(table.id()); + } + } + } else { + TablePartitionId tablePartitionId = (TablePartitionId) partitionGroupId; + + if (localNodeIsPrimaryReplicaForTableIds.add(tablePartitionId.tableId())) { + tableIds.add(tablePartitionId.tableId()); + } + } - break; + for (Integer tableId : tableIds) { + for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(tableId)) { + switch (indexDescriptor.status()) { + case REGISTERED: + changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor); - case STOPPING: - changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor); + break; - break; + case STOPPING: + changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor); - default: - break; + break; + + default: + break; + } } } } + + private void scheduleStopTasksOnPrimaryReplicaElected(PartitionGroupId partitionGroupId) { + // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the metastore thread. + Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion()); + + var tableIds = new ArrayList<Integer>(); Review Comment: https://github.com/apache/ignite-3/pull/5276#discussion_r1981170555 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org