tkalkirill commented on code in PR #5276: URL: https://github.com/apache/ignite-3/pull/5276#discussion_r1979389112
########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java: ########## @@ -219,6 +223,14 @@ public Collection<CatalogTableDescriptor> tables() { return tablesById.values(); } + /** + * Returns all tables that belong to the specified zone. + * + * @return A collection of table descriptors. + */ Review Comment: ```suggestion /** Returns all tables that belong to the specified zone, sorted by {@link CatalogObjectDescriptor#id}. */ ``` ########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java: ########## @@ -327,4 +339,24 @@ private static Int2ObjectMap<List<CatalogIndexDescriptor>> toIndexesByTableId(Co return indexesByTableId; } + + private static Int2ObjectMap<List<CatalogTableDescriptor>> toTablesByZoneId(Collection<CatalogSchemaDescriptor> schemas) { + Int2ObjectMap<List<CatalogTableDescriptor>> tablesByZoneId = new Int2ObjectOpenHashMap<>(); Review Comment: ```suggestion var tablesByZoneId = new Int2ObjectOpenHashMap<List<CatalogTableDescriptor>>(); ``` ########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java: ########## @@ -219,6 +223,14 @@ public Collection<CatalogTableDescriptor> tables() { return tablesById.values(); } + /** + * Returns all tables that belong to the specified zone. + * + * @return A collection of table descriptors. + */ + public Collection<CatalogTableDescriptor> tables(int zoneId) { Review Comment: I didn't find a test for this method. If one already exists, we can skip this; if not, please add one and check that the returned list is sorted. 
########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -184,60 +232,104 @@ private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameter private CompletableFuture<?> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { - TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); - if (isLocalNode(clusterService, parameters.leaseholderId())) { - primaryReplicaIds.add(primaryReplicaId); + // TODO https://issues.apache.org/jira/browse/IGNITE-22522 + // Need to remove TablePartitionId check here and below. + assert parameters.groupId() instanceof ZonePartitionId || parameters.groupId() instanceof TablePartitionId : Review Comment: I think it will be enough to simply print the `parameters.groupId()` ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java: ########## @@ -169,11 +170,30 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) { * * @param causalityToken Causality token. * @param tableId Table ID. - * @return Future with multi-version table storage, completes with {@code null} if the table does not exist according to the passed - * parameters. + * @return Future with multi-version table storage, completes with an exception if the table or storage does not exist + * according to the passed parameters. */ - CompletableFuture<@Nullable MvTableStorage> getMvTableStorage(long causalityToken, int tableId) { - return tableManager.tableAsync(causalityToken, tableId).thenApply(table -> table == null ? 
null : table.internalTable().storage()); + CompletableFuture<MvTableStorage> getMvTableStorage(long causalityToken, int tableId) { + return tableManager + .tableAsync(causalityToken, tableId) + .thenApply(table -> { + if (table == null) { + throw new IgniteInternalException( Review Comment: Please check the scenario where a table is dropped together with all of its indexes; `null` used to be returned precisely to handle that case. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table.distributed.raft.handlers; + +import static java.util.Objects.requireNonNull; +import static org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING; +import static org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED; +import static org.apache.ignite.internal.util.CollectionUtils.last; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Stream; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand; +import org.apache.ignite.internal.partition.replicator.raft.handlers.AbstractCommandHandler; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowUpgrader; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.SchemaRegistry; +import org.apache.ignite.internal.storage.BinaryRowAndRowId; +import org.apache.ignite.internal.storage.MvPartitionStorage.Locker; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; +import org.apache.ignite.internal.table.distributed.index.IndexMeta; +import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; +import org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange; +import org.jetbrains.annotations.Nullable; + +/** + * Raft command handler that handles {@link BuildIndexCommand}. 
+ */ +public class BuildIndexCommandHandler extends AbstractCommandHandler<BuildIndexCommand> { + private static final IgniteLogger LOG = Loggers.forClass(BuildIndexCommandHandler.class); + + /** Data storage to which the command will be applied. */ + private final PartitionDataStorage storage; + + private final IndexMetaStorage indexMetaStorage; + + /** Handler that processes storage updates. */ + private final StorageUpdateHandler storageUpdateHandler; + + private final SchemaRegistry schemaRegistry; + + /** + * Creates a new instance of the command handler. + * + * @param storage Partition data storage. + * @param indexMetaStorage Index meta storage. + * @param storageUpdateHandler Storage update handler. + * @param schemaRegistry Schema registry. + */ + public BuildIndexCommandHandler( + PartitionDataStorage storage, + IndexMetaStorage indexMetaStorage, + StorageUpdateHandler storageUpdateHandler, + SchemaRegistry schemaRegistry + ) { + this.storage = storage; + this.indexMetaStorage = indexMetaStorage; + this.storageUpdateHandler = storageUpdateHandler; + this.schemaRegistry = schemaRegistry; + } + + @Override + protected IgniteBiTuple<Serializable, Boolean> handleInternally( + BuildIndexCommand command, + long commandIndex, + long commandTerm, + @Nullable HybridTimestamp safeTimestamp + ) throws IgniteInternalException { + // Skips the write command because the storage has already executed it. + if (commandIndex <= storage.lastAppliedIndex()) { + return new IgniteBiTuple<>(null, false); + } + + IndexMeta indexMeta = indexMetaStorage.indexMeta(command.indexId()); + + if (indexMeta == null || indexMeta.isDropped()) { + // Index has been dropped. 
+ return new IgniteBiTuple<>(null, true); + } + + BuildIndexRowVersionChooser rowVersionChooser = createBuildIndexRowVersionChooser(indexMeta); + + BinaryRowUpgrader binaryRowUpgrader = createBinaryRowUpgrader(indexMeta); + + storage.runConsistently(locker -> { + List<UUID> rowUuids = new ArrayList<>(command.rowIds()); Review Comment: ```suggestion var rowUuids = new ArrayList<UUID>(command.rowIds()); ``` ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBuildIndexTest.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE; +import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.sql.ColumnType.DOUBLE; +import static org.apache.ignite.sql.ColumnType.INT32; +import static org.apache.ignite.sql.ColumnType.INT64; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.commands.ColumnParams; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.index.message.IndexMessageGroup; +import org.apache.ignite.internal.index.message.IndexMessagesFactory; +import org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.table.Tuple; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test after the switching to zone-based replication +/** + * Tests building indices for colocation track. + */ +@Timeout(60) +public class ItBuildIndexTest extends ItAbstractColocationTest { + private static final IndexMessagesFactory FACTORY = new IndexMessagesFactory(); + + @Test + public void testBuildIndex() throws Exception { + // Prepare a single node cluster. 
+ startCluster(1); + Node node = getNode(0); + + String zoneName = "test-zone"; Review Comment: Please use constant ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -131,35 +142,73 @@ public void close() { } private void addListeners() { - catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> { - return onIndexBuilding(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_BUILDING, + (StartBuildingIndexEventParameters parameters) -> onIndexBuilding(parameters).thenApply(unused -> false)); - catalogService.listen(CatalogEvent.INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> { - return onIndexRemoved(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_REMOVED, + (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters).thenApply(unused -> false)); - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters -> { - return onPrimaryReplicaElected(parameters).thenApply(unused -> false); - }); + placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> onPrimaryReplicaElected(parameters).thenApply(unused -> false)); } private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { Catalog catalog = catalogService.catalog(parameters.catalogVersion()); - assert catalog != null : "Not found catalog for version " + parameters.catalogVersion(); + assert catalog != null : "Failed to find a catalog for the specified version [version=" + parameters.catalogVersion() Review Comment: I think it could have reduced the number of lines, but I won’t insist. 
########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -131,35 +139,75 @@ public void close() { } private void addListeners() { - catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> { - return onIndexBuilding(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_BUILDING, + (StartBuildingIndexEventParameters parameters) -> onIndexBuilding(parameters).thenApply(unused -> false)); - catalogService.listen(CatalogEvent.INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> { - return onIndexRemoved(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_REMOVED, + (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters).thenApply(unused -> false)); - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters -> { - return onPrimaryReplicaElected(parameters).thenApply(unused -> false); - }); + placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> onPrimaryReplicaElected(parameters).thenApply(unused -> false)); } private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { Catalog catalog = catalogService.catalog(parameters.catalogVersion()); - assert catalog != null : "Not found catalog for version " + parameters.catalogVersion(); + assert catalog != null : "Failed to find a catalog for the specified version [version=" + parameters.catalogVersion() + + ", earliestVersion=" + catalogService.earliestCatalogVersion() + + ", latestVersion=" + catalogService.latestCatalogVersion() + + "]."; CatalogIndexDescriptor indexDescriptor = catalog.index(parameters.indexId()); + assert indexDescriptor != null : "Failed to find an index descriptor for the specified index [indexId=" + + parameters.indexId() + "]."; + + assert catalog.table(indexDescriptor.tableId()) != null : "Failed to find a table descriptor for the 
specified index [indexId=" + + parameters.indexId() + ", tableId=" + indexDescriptor.tableId() + "]."; + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(catalog.table(indexDescriptor.tableId()).zoneId()); + + assert zoneDescriptor != null : "Failed to find a zone descriptor for the specified table [indexId=" Review Comment: Please indicate catalog version. ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -131,35 +139,75 @@ public void close() { } private void addListeners() { - catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> { - return onIndexBuilding(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_BUILDING, + (StartBuildingIndexEventParameters parameters) -> onIndexBuilding(parameters).thenApply(unused -> false)); - catalogService.listen(CatalogEvent.INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> { - return onIndexRemoved(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_REMOVED, + (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters).thenApply(unused -> false)); - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters -> { - return onPrimaryReplicaElected(parameters).thenApply(unused -> false); - }); + placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> onPrimaryReplicaElected(parameters).thenApply(unused -> false)); } private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { Catalog catalog = catalogService.catalog(parameters.catalogVersion()); - assert catalog != null : "Not found catalog for version " + parameters.catalogVersion(); + assert catalog != null : "Failed to find a catalog for the specified version [version=" + parameters.catalogVersion() + + ", earliestVersion=" + catalogService.earliestCatalogVersion() + + ", latestVersion=" + 
catalogService.latestCatalogVersion() + + "]."; CatalogIndexDescriptor indexDescriptor = catalog.index(parameters.indexId()); + assert indexDescriptor != null : "Failed to find an index descriptor for the specified index [indexId=" + + parameters.indexId() + "]."; + + assert catalog.table(indexDescriptor.tableId()) != null : "Failed to find a table descriptor for the specified index [indexId=" Review Comment: Please indicate catalog version. ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -131,35 +139,75 @@ public void close() { } private void addListeners() { - catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> { - return onIndexBuilding(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_BUILDING, Review Comment: Maybe try method somelike `org.apache.ignite.internal.event.EventListener#fromConsumer` ########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java: ########## @@ -327,4 +339,24 @@ private static Int2ObjectMap<List<CatalogIndexDescriptor>> toIndexesByTableId(Co return indexesByTableId; } + + private static Int2ObjectMap<List<CatalogTableDescriptor>> toTablesByZoneId(Collection<CatalogSchemaDescriptor> schemas) { + Int2ObjectMap<List<CatalogTableDescriptor>> tablesByZoneId = new Int2ObjectOpenHashMap<>(); + + for (CatalogSchemaDescriptor schema : schemas) { + for (CatalogTableDescriptor table : schema.tables()) { + tablesByZoneId.computeIfAbsent(table.zoneId(), tables -> new ArrayList<>()).add(table); + } + } + + for (List<CatalogTableDescriptor> tables : tablesByZoneId.values()) { + tables.sort(comparingInt(CatalogTableDescriptor::id)); + } + + for (Entry<List<CatalogTableDescriptor>> entry : tablesByZoneId.int2ObjectEntrySet()) { + entry.setValue(unmodifiableList(entry.getValue())); Review Comment: I think it would be more efficient to make a copy of the immutable list, so as not to waste 
memory on the lists. ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -131,35 +139,75 @@ public void close() { } private void addListeners() { - catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> { - return onIndexBuilding(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_BUILDING, + (StartBuildingIndexEventParameters parameters) -> onIndexBuilding(parameters).thenApply(unused -> false)); - catalogService.listen(CatalogEvent.INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> { - return onIndexRemoved(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_REMOVED, + (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters).thenApply(unused -> false)); - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters -> { - return onPrimaryReplicaElected(parameters).thenApply(unused -> false); - }); + placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> onPrimaryReplicaElected(parameters).thenApply(unused -> false)); } private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { Catalog catalog = catalogService.catalog(parameters.catalogVersion()); - assert catalog != null : "Not found catalog for version " + parameters.catalogVersion(); + assert catalog != null : "Failed to find a catalog for the specified version [version=" + parameters.catalogVersion() + + ", earliestVersion=" + catalogService.earliestCatalogVersion() + + ", latestVersion=" + catalogService.latestCatalogVersion() + + "]."; CatalogIndexDescriptor indexDescriptor = catalog.index(parameters.indexId()); + assert indexDescriptor != null : "Failed to find an index descriptor for the specified index [indexId=" Review Comment: Please indicate catalog version. 
########## modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java: ########## @@ -153,46 +158,88 @@ private void onIndexRemoved(RemoveIndexEventParameters parameters) { private void onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) { inBusyLock(busyLock, () -> { - TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); + + PartitionGroupId primaryReplicaId = (PartitionGroupId) parameters.groupId(); if (primaryReplicaId.partitionId() != 0) { // We are only interested in the 0 partition. return; } - int tableId = primaryReplicaId.tableId(); - if (isLocalNode(clusterService, parameters.leaseholderId())) { - if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) { - scheduleTasksOnPrimaryReplicaElectedBusy(tableId); - } + scheduleTasksOnPrimaryReplicaElectedBusy(primaryReplicaId); } else { - if (localNodeIsPrimaryReplicaForTableIds.remove(tableId)) { - changeIndexStatusTaskScheduler.stopTasksForTable(tableId); - } + scheduleStopTasksOnPrimaryReplicaElected(primaryReplicaId); } }); } - private void scheduleTasksOnPrimaryReplicaElectedBusy(int tableId) { + private void scheduleTasksOnPrimaryReplicaElectedBusy(PartitionGroupId partitionGroupId) { // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the metastore thread. 
Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion()); - for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(tableId)) { - switch (indexDescriptor.status()) { - case REGISTERED: - changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor); + var tableIds = new ArrayList<Integer>(); + + if (enabledColocation()) { + ZonePartitionId zonePartitionId = (ZonePartitionId) partitionGroupId; + + for (CatalogTableDescriptor table : catalog.tables(zonePartitionId.zoneId())) { + if (localNodeIsPrimaryReplicaForTableIds.add(table.id())) { + tableIds.add(table.id()); + } + } + } else { + TablePartitionId tablePartitionId = (TablePartitionId) partitionGroupId; + + if (localNodeIsPrimaryReplicaForTableIds.add(tablePartitionId.tableId())) { + tableIds.add(tablePartitionId.tableId()); + } + } - break; + for (Integer tableId : tableIds) { + for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(tableId)) { + switch (indexDescriptor.status()) { + case REGISTERED: + changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor); - case STOPPING: - changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor); + break; - break; + case STOPPING: + changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor); - default: - break; + break; + + default: + break; + } } } } + + private void scheduleStopTasksOnPrimaryReplicaElected(PartitionGroupId partitionGroupId) { + // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the metastore thread. + Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion()); + + var tableIds = new ArrayList<Integer>(); Review Comment: The same applies here: please move the code that creates and fills this collection into a separate method. 
########## modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java: ########## @@ -153,46 +158,88 @@ private void onIndexRemoved(RemoveIndexEventParameters parameters) { private void onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) { inBusyLock(busyLock, () -> { - TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); + + PartitionGroupId primaryReplicaId = (PartitionGroupId) parameters.groupId(); if (primaryReplicaId.partitionId() != 0) { // We are only interested in the 0 partition. return; } - int tableId = primaryReplicaId.tableId(); - if (isLocalNode(clusterService, parameters.leaseholderId())) { - if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) { - scheduleTasksOnPrimaryReplicaElectedBusy(tableId); - } + scheduleTasksOnPrimaryReplicaElectedBusy(primaryReplicaId); } else { - if (localNodeIsPrimaryReplicaForTableIds.remove(tableId)) { - changeIndexStatusTaskScheduler.stopTasksForTable(tableId); - } + scheduleStopTasksOnPrimaryReplicaElected(primaryReplicaId); } }); } - private void scheduleTasksOnPrimaryReplicaElectedBusy(int tableId) { + private void scheduleTasksOnPrimaryReplicaElectedBusy(PartitionGroupId partitionGroupId) { // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the metastore thread. Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion()); - for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(tableId)) { - switch (indexDescriptor.status()) { - case REGISTERED: - changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor); + var tableIds = new ArrayList<Integer>(); Review Comment: I would suggest moving this code for creating and filling this collection into a separate method. 
########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -131,35 +139,75 @@ public void close() { } private void addListeners() { - catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> { - return onIndexBuilding(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_BUILDING, + (StartBuildingIndexEventParameters parameters) -> onIndexBuilding(parameters).thenApply(unused -> false)); - catalogService.listen(CatalogEvent.INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> { - return onIndexRemoved(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_REMOVED, + (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters).thenApply(unused -> false)); - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters -> { - return onPrimaryReplicaElected(parameters).thenApply(unused -> false); - }); + placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> onPrimaryReplicaElected(parameters).thenApply(unused -> false)); } private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { Catalog catalog = catalogService.catalog(parameters.catalogVersion()); - assert catalog != null : "Not found catalog for version " + parameters.catalogVersion(); + assert catalog != null : "Failed to find a catalog for the specified version [version=" + parameters.catalogVersion() + + ", earliestVersion=" + catalogService.earliestCatalogVersion() + + ", latestVersion=" + catalogService.latestCatalogVersion() + + "]."; CatalogIndexDescriptor indexDescriptor = catalog.index(parameters.indexId()); + assert indexDescriptor != null : "Failed to find an index descriptor for the specified index [indexId=" + + parameters.indexId() + "]."; + + assert catalog.table(indexDescriptor.tableId()) != null : "Failed to find a table descriptor for the 
specified index [indexId=" + + parameters.indexId() + ", tableId=" + indexDescriptor.tableId() + "]."; + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(catalog.table(indexDescriptor.tableId()).zoneId()); + + assert zoneDescriptor != null : "Failed to find a zone descriptor for the specified table [indexId=" + + parameters.indexId() + ", tableId=" + indexDescriptor.tableId() + "]."; + var startBuildIndexFutures = new ArrayList<CompletableFuture<?>>(); - for (TablePartitionId primaryReplicaId : primaryReplicaIds) { - if (primaryReplicaId.tableId() == indexDescriptor.tableId()) { - CompletableFuture<?> startBuildIndexFuture = getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId) - .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, clockService.now()) + for (ReplicationGroupId primaryReplicationGroupId : primaryReplicaIds) { + int zoneId; + int partitionId; + boolean needToProcessPartition; + + // TODO https://issues.apache.org/jira/browse/IGNITE-22522 + // Remove TablePartitionId check. + if (primaryReplicationGroupId instanceof ZonePartitionId) { + ZonePartitionId zoneReplicaId = (ZonePartitionId) primaryReplicationGroupId; + zoneId = zoneReplicaId.zoneId(); + partitionId = zoneReplicaId.partitionId(); + + needToProcessPartition = zoneReplicaId.zoneId() == zoneDescriptor.id(); + } else { + TablePartitionId partitionReplicaId = (TablePartitionId) primaryReplicationGroupId; + needToProcessPartition = partitionReplicaId.tableId() == indexDescriptor.tableId(); + + if (needToProcessPartition) { + zoneId = catalog.table(partitionReplicaId.tableId()).zoneId(); + partitionId = partitionReplicaId.partitionId(); + } else { + continue; + } + } + + if (needToProcessPartition) { + CompletableFuture<MvTableStorage> tableStorageFuture = Review Comment: Do we really need this variable? Looks like a sufficient pipeline. 
########## modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java: ########## @@ -372,12 +356,20 @@ private int tableId(String tableName) { return getTableIdStrict(catalogManager, tableName, clock.nowLong()); } + private int zoneId() { + return getZoneIdStrict(catalogManager, TABLE_NAME, clock.nowLong()); Review Comment: I would rename the method to something like `getZoneIdByTableNameStrict` and expect the zone name to be passed in that case. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ########## @@ -484,9 +484,16 @@ public PartitionReplicaListener( clusterNodeResolver, replicationGroupId, localNode, - txRecoveryEngine - ); + txRecoveryEngine); + + buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler( + indexMetaStorage, + txRwOperationTracker, + safeTime, + raftCommandApplicator); Review Comment: ```suggestion raftCommandApplicator ); ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ########## @@ -484,9 +484,16 @@ public PartitionReplicaListener( clusterNodeResolver, replicationGroupId, localNode, - txRecoveryEngine - ); + txRecoveryEngine); Review Comment: ```suggestion txRecoveryEngine ); ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.replicator.handlers; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; +import static org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING; +import static org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand; +import org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest; +import org.apache.ignite.internal.table.distributed.index.IndexMeta; +import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; +import org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange; +import org.apache.ignite.internal.table.distributed.replicator.IndexBuilderTxRwOperationTracker; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; + +/** + * Handler for {@link BuildIndexReplicaRequest}. + */ +public class BuildIndexReplicaRequestHandler { + /** Factory to create RAFT command messages. 
*/ private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + + private final IndexMetaStorage indexMetaStorage; + + /** Read-write transaction operation tracker for building indexes. */ + private final IndexBuilderTxRwOperationTracker txRwOperationTracker; + + /** Safe time tracker. */ + private final PendingComparableValuesTracker<HybridTimestamp, Void> safeTime; Review Comment: Whose safe time tracker is this: the metastorage's, the partition's, or some other component's? Please make the comment specific. ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBuildIndexTest.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE; +import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.sql.ColumnType.DOUBLE; +import static org.apache.ignite.sql.ColumnType.INT32; +import static org.apache.ignite.sql.ColumnType.INT64; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.commands.ColumnParams; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.index.message.IndexMessageGroup; +import org.apache.ignite.internal.index.message.IndexMessagesFactory; +import org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.table.Tuple; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test after the switching to zone-based replication +/** + * Tests building indices for colocation track. + */ +@Timeout(60) +public class ItBuildIndexTest extends ItAbstractColocationTest { + private static final IndexMessagesFactory FACTORY = new IndexMessagesFactory(); + + @Test + public void testBuildIndex() throws Exception { + // Prepare a single node cluster. 
+ startCluster(1); + Node node = getNode(0); + + String zoneName = "test-zone"; + createZone(node, zoneName, 1, 1); + + String tableName = "TEST_TABLE"; Review Comment: same please use constant ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBuildIndexTest.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE; +import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.sql.ColumnType.DOUBLE; +import static org.apache.ignite.sql.ColumnType.INT32; +import static org.apache.ignite.sql.ColumnType.INT64; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.commands.ColumnParams; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.index.message.IndexMessageGroup; +import org.apache.ignite.internal.index.message.IndexMessagesFactory; +import org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.table.Tuple; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test after the switching to zone-based replication +/** + * Tests building indices for colocation track. + */ +@Timeout(60) +public class ItBuildIndexTest extends ItAbstractColocationTest { + private static final IndexMessagesFactory FACTORY = new IndexMessagesFactory(); + + @Test + public void testBuildIndex() throws Exception { + // Prepare a single node cluster. 
+ startCluster(1); + Node node = getNode(0); + + String zoneName = "test-zone"; + createZone(node, zoneName, 1, 1); + + String tableName = "TEST_TABLE"; + createCustomTable(node, zoneName, tableName); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + // Test node does not create IndexNodeFinishedRwTransactionsChecker, so the following code is needed to unblock index building. + // It's easier than creating a real service with all its dependencies. + node.clusterService.messagingService().addMessageHandler( + IndexMessageGroup.class, + (message, sender, correlationId) -> { + if (message instanceof IsNodeFinishedRwTransactionsStartedBeforeRequest) { + node.clusterService.messagingService().respond( + sender, + FACTORY.isNodeFinishedRwTransactionsStartedBeforeResponse().finished(true).build(), + correlationId + ); + } + }); + + TableViewInternal tableViewInternal = node.tableManager.table(tableId); + KeyValueView<Tuple, Tuple> tableView = tableViewInternal.keyValueView(); + + node.transactions().runInTransaction(tx -> { + Tuple key = Tuple.create().set("KEY", 1L); + Tuple value = Tuple.create().set("VAL", 1).set("DOUBLEVAL", 1.0); + tableView.putAll(tx, Map.of(key, value)); + }); + + // This async transaction is needed to update safe time, just because the idle safe time propagation is not implemented yet. + // https://issues.apache.org/jira/browse/IGNITE-22620 + CompletableFuture.runAsync(() -> { Review Comment: Why don't you do anything with the return futures? ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBuildIndexTest.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE; +import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.sql.ColumnType.DOUBLE; +import static org.apache.ignite.sql.ColumnType.INT32; +import static org.apache.ignite.sql.ColumnType.INT64; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.commands.ColumnParams; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.index.message.IndexMessageGroup; +import org.apache.ignite.internal.index.message.IndexMessagesFactory; +import org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.table.KeyValueView; +import 
org.apache.ignite.table.Tuple; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test after the switching to zone-based replication +/** + * Tests building indices for colocation track. + */ +@Timeout(60) +public class ItBuildIndexTest extends ItAbstractColocationTest { + private static final IndexMessagesFactory FACTORY = new IndexMessagesFactory(); + + @Test + public void testBuildIndex() throws Exception { + // Prepare a single node cluster. + startCluster(1); + Node node = getNode(0); + + String zoneName = "test-zone"; + createZone(node, zoneName, 1, 1); + + String tableName = "TEST_TABLE"; + createCustomTable(node, zoneName, tableName); + int tableId = TableTestUtils.getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + + // Test node does not create IndexNodeFinishedRwTransactionsChecker, so the following code is needed to unblock index building. + // It's easier than creating a real service with all its dependencies. + node.clusterService.messagingService().addMessageHandler( + IndexMessageGroup.class, + (message, sender, correlationId) -> { + if (message instanceof IsNodeFinishedRwTransactionsStartedBeforeRequest) { + node.clusterService.messagingService().respond( + sender, + FACTORY.isNodeFinishedRwTransactionsStartedBeforeResponse().finished(true).build(), + correlationId + ); + } + }); + + TableViewInternal tableViewInternal = node.tableManager.table(tableId); + KeyValueView<Tuple, Tuple> tableView = tableViewInternal.keyValueView(); + + node.transactions().runInTransaction(tx -> { + Tuple key = Tuple.create().set("KEY", 1L); + Tuple value = Tuple.create().set("VAL", 1).set("DOUBLEVAL", 1.0); + tableView.putAll(tx, Map.of(key, value)); + }); + + // This async transaction is needed to update safe time, just because the idle safe time propagation is not implemented yet. 
+ // https://issues.apache.org/jira/browse/IGNITE-22620 + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(1_000); Review Comment: Why sleep, and why exactly one second? What if that is suddenly not enough? Using sleep in tests is bad practice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org