tkalkirill commented on code in PR #5276:
URL: https://github.com/apache/ignite-3/pull/5276#discussion_r1979389112


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java:
##########
@@ -219,6 +223,14 @@ public Collection<CatalogTableDescriptor> tables() {
         return tablesById.values();
     }
 
+    /**
+     * Returns all tables that belong to the specified zone.
+     *
+     * @return A collection of table descriptors.
+     */

Review Comment:
   ```suggestion
       /** Returns all tables that belong to the specified zone, sorted by {@link CatalogObjectDescriptor#id}. */
   ```



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java:
##########
@@ -327,4 +339,24 @@ private static Int2ObjectMap<List<CatalogIndexDescriptor>> 
toIndexesByTableId(Co
 
         return indexesByTableId;
     }
+
+    private static Int2ObjectMap<List<CatalogTableDescriptor>> 
toTablesByZoneId(Collection<CatalogSchemaDescriptor> schemas) {
+        Int2ObjectMap<List<CatalogTableDescriptor>> tablesByZoneId = new 
Int2ObjectOpenHashMap<>();

Review Comment:
   ```suggestion
           var tablesByZoneId = new Int2ObjectOpenHashMap<List<CatalogTableDescriptor>>();
   ```



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java:
##########
@@ -219,6 +223,14 @@ public Collection<CatalogTableDescriptor> tables() {
         return tablesById.values();
     }
 
+    /**
+     * Returns all tables that belong to the specified zone.
+     *
+     * @return A collection of table descriptors.
+     */
+    public Collection<CatalogTableDescriptor> tables(int zoneId) {

Review Comment:
   I didn't find a test for this method. If one already exists, skip this 
comment; if not, please add one and check that the returned list is sorted.
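
   A minimal sketch of the kind of check meant here, written as plain Java rather than against the real catalog classes. `Table`, `groupByZone` and `isSortedById` are hypothetical stand-ins (assumptions for illustration, not the PR's API); a real test would call `Catalog#tables(int)` and assert on `CatalogTableDescriptor#id`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TablesByZoneSortingSketch {
    /** Hypothetical stand-in for CatalogTableDescriptor: only the fields the check needs. */
    static final class Table {
        final int id;
        final int zoneId;

        Table(int id, int zoneId) {
            this.id = id;
            this.zoneId = zoneId;
        }
    }

    /** Groups tables by zone ID and sorts each group by table ID. */
    static Map<Integer, List<Table>> groupByZone(List<Table> tables) {
        Map<Integer, List<Table>> byZone = new HashMap<>();

        for (Table table : tables) {
            byZone.computeIfAbsent(table.zoneId, zoneId -> new ArrayList<>()).add(table);
        }

        for (List<Table> group : byZone.values()) {
            group.sort(Comparator.comparingInt((Table t) -> t.id));
        }

        return byZone;
    }

    /** Returns true if the table IDs are in non-decreasing order. */
    static boolean isSortedById(List<Table> tables) {
        for (int i = 1; i < tables.size(); i++) {
            if (tables.get(i - 1).id > tables.get(i).id) {
                return false;
            }
        }

        return true;
    }

    public static void main(String[] args) {
        // Tables are deliberately listed out of ID order.
        List<Table> tables = List.of(new Table(30, 1), new Table(10, 1), new Table(20, 2), new Table(5, 1));

        Map<Integer, List<Table>> byZone = groupByZone(tables);

        System.out.println("zone 1 sorted: " + isSortedById(byZone.get(1)));
        System.out.println("zone 2 sorted: " + isSortedById(byZone.get(2)));
    }
}
```

   The point of feeding the tables in a shuffled order is that the assertion would fail if the sort step were ever dropped.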



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -184,60 +232,104 @@ private CompletableFuture<?> 
onIndexRemoved(RemoveIndexEventParameters parameter
 
     private CompletableFuture<?> 
onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            TablePartitionId primaryReplicaId = (TablePartitionId) 
parameters.groupId();
-
             if (isLocalNode(clusterService, parameters.leaseholderId())) {
-                primaryReplicaIds.add(primaryReplicaId);
+                // TODO https://issues.apache.org/jira/browse/IGNITE-22522
+                // Need to remove TablePartitionId check here and below.
+                assert parameters.groupId() instanceof ZonePartitionId || 
parameters.groupId() instanceof TablePartitionId :

Review Comment:
   I think it would be enough to simply print `parameters.groupId()`.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -169,11 +170,30 @@ public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
      *
      * @param causalityToken Causality token.
      * @param tableId Table ID.
-     * @return Future with multi-version table storage, completes with {@code 
null} if the table does not exist according to the passed
-     *      parameters.
+     * @return Future with multi-version table storage, completes with an 
exception if the table or storage does not exist
+     *      according to the passed parameters.
      */
-    CompletableFuture<@Nullable MvTableStorage> getMvTableStorage(long 
causalityToken, int tableId) {
-        return tableManager.tableAsync(causalityToken, 
tableId).thenApply(table -> table == null ? null : 
table.internalTable().storage());
+    CompletableFuture<MvTableStorage> getMvTableStorage(long causalityToken, 
int tableId) {
+        return tableManager
+                .tableAsync(causalityToken, tableId)
+                .thenApply(table -> {
+                    if (table == null) {
+                        throw new IgniteInternalException(

Review Comment:
   Please check the scenario in which a table is dropped together with all its 
indexes; `null` was returned here precisely because of that case.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.handlers;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
+import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
+import static org.apache.ignite.internal.util.CollectionUtils.last;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
+import 
org.apache.ignite.internal.partition.replicator.raft.handlers.AbstractCommandHandler;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowUpgrader;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
+import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Raft command handler that handles {@link BuildIndexCommand}.
+ */
+public class BuildIndexCommandHandler extends 
AbstractCommandHandler<BuildIndexCommand> {
+    private static final IgniteLogger LOG = 
Loggers.forClass(BuildIndexCommandHandler.class);
+
+    /** Data storage to which the command will be applied. */
+    private final PartitionDataStorage storage;
+
+    private final IndexMetaStorage indexMetaStorage;
+
+    /** Handler that processes storage updates. */
+    private final StorageUpdateHandler storageUpdateHandler;
+
+    private final SchemaRegistry schemaRegistry;
+
+    /**
+     * Creates a new instance of the command handler.
+     *
+     * @param storage Partition data storage.
+     * @param indexMetaStorage Index meta storage.
+     * @param storageUpdateHandler Storage update handler.
+     * @param schemaRegistry Schema registry.
+     */
+    public BuildIndexCommandHandler(
+            PartitionDataStorage storage,
+            IndexMetaStorage indexMetaStorage,
+            StorageUpdateHandler storageUpdateHandler,
+            SchemaRegistry schemaRegistry
+    ) {
+        this.storage = storage;
+        this.indexMetaStorage = indexMetaStorage;
+        this.storageUpdateHandler = storageUpdateHandler;
+        this.schemaRegistry = schemaRegistry;
+    }
+
+    @Override
+    protected IgniteBiTuple<Serializable, Boolean> handleInternally(
+            BuildIndexCommand command,
+            long commandIndex,
+            long commandTerm,
+            @Nullable HybridTimestamp safeTimestamp
+    ) throws IgniteInternalException {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return new IgniteBiTuple<>(null, false);
+        }
+
+        IndexMeta indexMeta = indexMetaStorage.indexMeta(command.indexId());
+
+        if (indexMeta == null || indexMeta.isDropped()) {
+            // Index has been dropped.
+            return new IgniteBiTuple<>(null, true);
+        }
+
+        BuildIndexRowVersionChooser rowVersionChooser = 
createBuildIndexRowVersionChooser(indexMeta);
+
+        BinaryRowUpgrader binaryRowUpgrader = 
createBinaryRowUpgrader(indexMeta);
+
+        storage.runConsistently(locker -> {
+            List<UUID> rowUuids = new ArrayList<>(command.rowIds());

Review Comment:
   ```suggestion
               var rowUuids = new ArrayList<UUID>(command.rowIds());
   ```



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBuildIndexTest.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
+import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.index.message.IndexMessageGroup;
+import org.apache.ignite.internal.index.message.IndexMessagesFactory;
+import 
org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test 
after the switching to zone-based replication
+/**
+ * Tests building indices for colocation track.
+ */
+@Timeout(60)
+public class ItBuildIndexTest extends ItAbstractColocationTest {
+    private static final IndexMessagesFactory FACTORY = new 
IndexMessagesFactory();
+
+    @Test
+    public void testBuildIndex() throws Exception {
+        // Prepare a single node cluster.
+        startCluster(1);
+        Node node = getNode(0);
+
+        String zoneName = "test-zone";

Review Comment:
   Please use a constant.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -131,35 +142,73 @@ public void close() {
     }
 
     private void addListeners() {
-        catalogService.listen(CatalogEvent.INDEX_BUILDING, 
(StartBuildingIndexEventParameters parameters) -> {
-            return onIndexBuilding(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_BUILDING,
+                (StartBuildingIndexEventParameters parameters) -> 
onIndexBuilding(parameters).thenApply(unused -> false));
 
-        catalogService.listen(CatalogEvent.INDEX_REMOVED, 
(RemoveIndexEventParameters parameters) -> {
-            return onIndexRemoved(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_REMOVED,
+                (RemoveIndexEventParameters parameters) -> 
onIndexRemoved(parameters).thenApply(unused -> false));
 
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
parameters -> {
-            return onPrimaryReplicaElected(parameters).thenApply(unused -> 
false);
-        });
+        placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> 
onPrimaryReplicaElected(parameters).thenApply(unused -> false));
     }
 
     private CompletableFuture<?> 
onIndexBuilding(StartBuildingIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             Catalog catalog = 
catalogService.catalog(parameters.catalogVersion());
 
-            assert catalog != null : "Not found catalog for version " +  
parameters.catalogVersion();
+            assert catalog != null : "Failed to find a catalog for the 
specified version [version=" +  parameters.catalogVersion()

Review Comment:
   I think the number of lines could be reduced here, but I won't insist.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -131,35 +139,75 @@ public void close() {
     }
 
     private void addListeners() {
-        catalogService.listen(CatalogEvent.INDEX_BUILDING, 
(StartBuildingIndexEventParameters parameters) -> {
-            return onIndexBuilding(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_BUILDING,
+                (StartBuildingIndexEventParameters parameters) -> 
onIndexBuilding(parameters).thenApply(unused -> false));
 
-        catalogService.listen(CatalogEvent.INDEX_REMOVED, 
(RemoveIndexEventParameters parameters) -> {
-            return onIndexRemoved(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_REMOVED,
+                (RemoveIndexEventParameters parameters) -> 
onIndexRemoved(parameters).thenApply(unused -> false));
 
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
parameters -> {
-            return onPrimaryReplicaElected(parameters).thenApply(unused -> 
false);
-        });
+        placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> 
onPrimaryReplicaElected(parameters).thenApply(unused -> false));
     }
 
     private CompletableFuture<?> 
onIndexBuilding(StartBuildingIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             Catalog catalog = 
catalogService.catalog(parameters.catalogVersion());
 
-            assert catalog != null : "Not found catalog for version " +  
parameters.catalogVersion();
+            assert catalog != null : "Failed to find a catalog for the 
specified version [version=" +  parameters.catalogVersion()
+                    + ", earliestVersion=" + 
catalogService.earliestCatalogVersion()
+                    + ", latestVersion=" + 
catalogService.latestCatalogVersion()
+                    + "].";
 
             CatalogIndexDescriptor indexDescriptor = 
catalog.index(parameters.indexId());
 
+            assert indexDescriptor != null : "Failed to find an index 
descriptor for the specified index [indexId="
+                    + parameters.indexId() + "].";
+
+            assert catalog.table(indexDescriptor.tableId()) != null : "Failed 
to find a table descriptor for the specified index [indexId="
+                    + parameters.indexId() + ", tableId=" + 
indexDescriptor.tableId() + "].";
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalog.zone(catalog.table(indexDescriptor.tableId()).zoneId());
+
+            assert zoneDescriptor != null : "Failed to find a zone descriptor 
for the specified table [indexId="

Review Comment:
   Please include the catalog version in the message.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -131,35 +139,75 @@ public void close() {
     }
 
     private void addListeners() {
-        catalogService.listen(CatalogEvent.INDEX_BUILDING, 
(StartBuildingIndexEventParameters parameters) -> {
-            return onIndexBuilding(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_BUILDING,
+                (StartBuildingIndexEventParameters parameters) -> 
onIndexBuilding(parameters).thenApply(unused -> false));
 
-        catalogService.listen(CatalogEvent.INDEX_REMOVED, 
(RemoveIndexEventParameters parameters) -> {
-            return onIndexRemoved(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_REMOVED,
+                (RemoveIndexEventParameters parameters) -> 
onIndexRemoved(parameters).thenApply(unused -> false));
 
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
parameters -> {
-            return onPrimaryReplicaElected(parameters).thenApply(unused -> 
false);
-        });
+        placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> 
onPrimaryReplicaElected(parameters).thenApply(unused -> false));
     }
 
     private CompletableFuture<?> 
onIndexBuilding(StartBuildingIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             Catalog catalog = 
catalogService.catalog(parameters.catalogVersion());
 
-            assert catalog != null : "Not found catalog for version " +  
parameters.catalogVersion();
+            assert catalog != null : "Failed to find a catalog for the 
specified version [version=" +  parameters.catalogVersion()
+                    + ", earliestVersion=" + 
catalogService.earliestCatalogVersion()
+                    + ", latestVersion=" + 
catalogService.latestCatalogVersion()
+                    + "].";
 
             CatalogIndexDescriptor indexDescriptor = 
catalog.index(parameters.indexId());
 
+            assert indexDescriptor != null : "Failed to find an index 
descriptor for the specified index [indexId="
+                    + parameters.indexId() + "].";
+
+            assert catalog.table(indexDescriptor.tableId()) != null : "Failed 
to find a table descriptor for the specified index [indexId="

Review Comment:
   Please include the catalog version in the message.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -131,35 +139,75 @@ public void close() {
     }
 
     private void addListeners() {
-        catalogService.listen(CatalogEvent.INDEX_BUILDING, 
(StartBuildingIndexEventParameters parameters) -> {
-            return onIndexBuilding(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_BUILDING,

Review Comment:
   Maybe try something like 
`org.apache.ignite.internal.event.EventListener#fromConsumer`.



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java:
##########
@@ -327,4 +339,24 @@ private static Int2ObjectMap<List<CatalogIndexDescriptor>> 
toIndexesByTableId(Co
 
         return indexesByTableId;
     }
+
+    private static Int2ObjectMap<List<CatalogTableDescriptor>> 
toTablesByZoneId(Collection<CatalogSchemaDescriptor> schemas) {
+        Int2ObjectMap<List<CatalogTableDescriptor>> tablesByZoneId = new 
Int2ObjectOpenHashMap<>();
+
+        for (CatalogSchemaDescriptor schema : schemas) {
+            for (CatalogTableDescriptor table : schema.tables()) {
+                tablesByZoneId.computeIfAbsent(table.zoneId(), tables -> new 
ArrayList<>()).add(table);
+            }
+        }
+
+        for (List<CatalogTableDescriptor> tables : tablesByZoneId.values()) {
+            tables.sort(comparingInt(CatalogTableDescriptor::id));
+        }
+
+        for (Entry<List<CatalogTableDescriptor>> entry : 
tablesByZoneId.int2ObjectEntrySet()) {
+            entry.setValue(unmodifiableList(entry.getValue()));

Review Comment:
   I think it would be more efficient to make an immutable copy of each list, 
so as not to waste memory on the intermediate lists.
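
   A small sketch of the difference, using only JDK collections. It assumes the suggestion is to replace `Collections.unmodifiableList` with something like `List.copyOf` (Java 10+); that reading, and the class and method names below, are illustrative assumptions rather than the PR's code. Note that `List.copyOf` rejects `null` elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ImmutableCopySketch {
    /** Wraps the list in a read-only view; the source ArrayList, including any spare capacity, stays reachable. */
    static List<Integer> asView(List<Integer> source) {
        return Collections.unmodifiableList(source);
    }

    /** Copies the elements into a compact immutable list that no longer references the source. */
    static List<Integer> asCopy(List<Integer> source) {
        return List.copyOf(source);
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>(List.of(1, 2, 3));

        List<Integer> view = asView(ids);
        List<Integer> copy = asCopy(ids);

        // Mutating the source shows which result is still backed by it.
        ids.add(4);

        System.out.println(view.size()); // 4: the view reflects the source
        System.out.println(copy.size()); // 3: the copy is independent
    }
}
```

   With the copy, the temporary `ArrayList` can be garbage collected once the map is built, and there is one object per zone instead of a wrapper plus its backing list.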



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -131,35 +139,75 @@ public void close() {
     }
 
     private void addListeners() {
-        catalogService.listen(CatalogEvent.INDEX_BUILDING, 
(StartBuildingIndexEventParameters parameters) -> {
-            return onIndexBuilding(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_BUILDING,
+                (StartBuildingIndexEventParameters parameters) -> 
onIndexBuilding(parameters).thenApply(unused -> false));
 
-        catalogService.listen(CatalogEvent.INDEX_REMOVED, 
(RemoveIndexEventParameters parameters) -> {
-            return onIndexRemoved(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_REMOVED,
+                (RemoveIndexEventParameters parameters) -> 
onIndexRemoved(parameters).thenApply(unused -> false));
 
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
parameters -> {
-            return onPrimaryReplicaElected(parameters).thenApply(unused -> 
false);
-        });
+        placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> 
onPrimaryReplicaElected(parameters).thenApply(unused -> false));
     }
 
     private CompletableFuture<?> 
onIndexBuilding(StartBuildingIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             Catalog catalog = 
catalogService.catalog(parameters.catalogVersion());
 
-            assert catalog != null : "Not found catalog for version " +  
parameters.catalogVersion();
+            assert catalog != null : "Failed to find a catalog for the 
specified version [version=" +  parameters.catalogVersion()
+                    + ", earliestVersion=" + 
catalogService.earliestCatalogVersion()
+                    + ", latestVersion=" + 
catalogService.latestCatalogVersion()
+                    + "].";
 
             CatalogIndexDescriptor indexDescriptor = 
catalog.index(parameters.indexId());
 
+            assert indexDescriptor != null : "Failed to find an index 
descriptor for the specified index [indexId="

Review Comment:
   Please include the catalog version in the message.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java:
##########
@@ -153,46 +158,88 @@ private void onIndexRemoved(RemoveIndexEventParameters 
parameters) {
 
     private void onPrimaryReplicaElected(PrimaryReplicaEventParameters 
parameters) {
         inBusyLock(busyLock, () -> {
-            TablePartitionId primaryReplicaId = (TablePartitionId) 
parameters.groupId();
+
+            PartitionGroupId primaryReplicaId = (PartitionGroupId) 
parameters.groupId();
 
             if (primaryReplicaId.partitionId() != 0) {
                 // We are only interested in the 0 partition.
                 return;
             }
 
-            int tableId = primaryReplicaId.tableId();
-
             if (isLocalNode(clusterService, parameters.leaseholderId())) {
-                if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) {
-                    scheduleTasksOnPrimaryReplicaElectedBusy(tableId);
-                }
+                scheduleTasksOnPrimaryReplicaElectedBusy(primaryReplicaId);
             } else {
-                if (localNodeIsPrimaryReplicaForTableIds.remove(tableId)) {
-                    changeIndexStatusTaskScheduler.stopTasksForTable(tableId);
-                }
+                scheduleStopTasksOnPrimaryReplicaElected(primaryReplicaId);
             }
         });
     }
 
-    private void scheduleTasksOnPrimaryReplicaElectedBusy(int tableId) {
+    private void scheduleTasksOnPrimaryReplicaElectedBusy(PartitionGroupId 
partitionGroupId) {
         // It is safe to get the latest version of the catalog because the 
PRIMARY_REPLICA_ELECTED event is handled on the metastore thread.
         Catalog catalog = 
catalogService.catalog(catalogService.latestCatalogVersion());
 
-        for (CatalogIndexDescriptor indexDescriptor : 
catalog.indexes(tableId)) {
-            switch (indexDescriptor.status()) {
-                case REGISTERED:
-                    
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor);
+        var tableIds = new ArrayList<Integer>();
+
+        if (enabledColocation()) {
+            ZonePartitionId zonePartitionId = (ZonePartitionId) 
partitionGroupId;
+
+            for (CatalogTableDescriptor table : 
catalog.tables(zonePartitionId.zoneId())) {
+                if (localNodeIsPrimaryReplicaForTableIds.add(table.id())) {
+                    tableIds.add(table.id());
+                }
+            }
+        } else {
+            TablePartitionId tablePartitionId = (TablePartitionId) 
partitionGroupId;
+
+            if 
(localNodeIsPrimaryReplicaForTableIds.add(tablePartitionId.tableId())) {
+                tableIds.add(tablePartitionId.tableId());
+            }
+        }
 
-                    break;
+        for (Integer tableId : tableIds) {
+            for (CatalogIndexDescriptor indexDescriptor : 
catalog.indexes(tableId)) {
+                switch (indexDescriptor.status()) {
+                    case REGISTERED:
+                        
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor);
 
-                case STOPPING:
-                    
changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor);
+                        break;
 
-                    break;
+                    case STOPPING:
+                        
changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor);
 
-                default:
-                    break;
+                        break;
+
+                    default:
+                        break;
+                }
             }
         }
     }
+
+    private void scheduleStopTasksOnPrimaryReplicaElected(PartitionGroupId 
partitionGroupId) {
+        // It is safe to get the latest version of the catalog because the 
PRIMARY_REPLICA_ELECTED event is handled on the metastore thread.
+        Catalog catalog = 
catalogService.catalog(catalogService.latestCatalogVersion());
+
+        var tableIds = new ArrayList<Integer>();

Review Comment:
   Same here: I would suggest moving this code into a separate method.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java:
##########
@@ -153,46 +158,88 @@ private void onIndexRemoved(RemoveIndexEventParameters 
parameters) {
 
     private void onPrimaryReplicaElected(PrimaryReplicaEventParameters 
parameters) {
         inBusyLock(busyLock, () -> {
-            TablePartitionId primaryReplicaId = (TablePartitionId) 
parameters.groupId();
+
+            PartitionGroupId primaryReplicaId = (PartitionGroupId) 
parameters.groupId();
 
             if (primaryReplicaId.partitionId() != 0) {
                 // We are only interested in the 0 partition.
                 return;
             }
 
-            int tableId = primaryReplicaId.tableId();
-
             if (isLocalNode(clusterService, parameters.leaseholderId())) {
-                if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) {
-                    scheduleTasksOnPrimaryReplicaElectedBusy(tableId);
-                }
+                scheduleTasksOnPrimaryReplicaElectedBusy(primaryReplicaId);
             } else {
-                if (localNodeIsPrimaryReplicaForTableIds.remove(tableId)) {
-                    changeIndexStatusTaskScheduler.stopTasksForTable(tableId);
-                }
+                scheduleStopTasksOnPrimaryReplicaElected(primaryReplicaId);
             }
         });
     }
 
-    private void scheduleTasksOnPrimaryReplicaElectedBusy(int tableId) {
+    private void scheduleTasksOnPrimaryReplicaElectedBusy(PartitionGroupId 
partitionGroupId) {
         // It is safe to get the latest version of the catalog because the 
PRIMARY_REPLICA_ELECTED event is handled on the metastore thread.
         Catalog catalog = 
catalogService.catalog(catalogService.latestCatalogVersion());
 
-        for (CatalogIndexDescriptor indexDescriptor : 
catalog.indexes(tableId)) {
-            switch (indexDescriptor.status()) {
-                case REGISTERED:
-                    
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor);
+        var tableIds = new ArrayList<Integer>();

Review Comment:
   I would suggest moving the code that creates and fills this collection into 
a separate method.
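
   A rough sketch of what such an extraction could look like, reduced to plain JDK types. The class name and `collectNewPrimaryTableIds` are hypothetical; in the PR the candidate IDs would come from `catalog.tables(zoneId)` or from the `TablePartitionId`, depending on whether colocation is enabled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CollectTableIdsSketch {
    /** Table IDs for which the local node is already known to be the primary replica. */
    private final Set<Integer> localNodeIsPrimaryReplicaForTableIds = new HashSet<>();

    /**
     * Registers the candidate table IDs and returns only those that were not tracked before,
     * so the caller schedules tasks once per table.
     */
    List<Integer> collectNewPrimaryTableIds(List<Integer> candidateTableIds) {
        var tableIds = new ArrayList<Integer>();

        for (Integer tableId : candidateTableIds) {
            if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) {
                tableIds.add(tableId);
            }
        }

        return tableIds;
    }

    public static void main(String[] args) {
        var sketch = new CollectTableIdsSketch();

        System.out.println(sketch.collectNewPrimaryTableIds(List.of(1, 2)));
        // Table 2 is already tracked, so only table 3 is returned by the second call.
        System.out.println(sketch.collectNewPrimaryTableIds(List.of(2, 3)));
    }
}
```

   The scheduling loop then only iterates over the returned list, and the stop path can use a mirror-image helper built on `Set#remove`.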



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -131,35 +139,75 @@ public void close() {
     }
 
     private void addListeners() {
-        catalogService.listen(CatalogEvent.INDEX_BUILDING, 
(StartBuildingIndexEventParameters parameters) -> {
-            return onIndexBuilding(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_BUILDING,
+                (StartBuildingIndexEventParameters parameters) -> 
onIndexBuilding(parameters).thenApply(unused -> false));
 
-        catalogService.listen(CatalogEvent.INDEX_REMOVED, 
(RemoveIndexEventParameters parameters) -> {
-            return onIndexRemoved(parameters).thenApply(unused -> false);
-        });
+        catalogService.listen(INDEX_REMOVED,
+                (RemoveIndexEventParameters parameters) -> 
onIndexRemoved(parameters).thenApply(unused -> false));
 
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
parameters -> {
-            return onPrimaryReplicaElected(parameters).thenApply(unused -> 
false);
-        });
+        placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> 
onPrimaryReplicaElected(parameters).thenApply(unused -> false));
     }
 
     private CompletableFuture<?> 
onIndexBuilding(StartBuildingIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             Catalog catalog = 
catalogService.catalog(parameters.catalogVersion());
 
-            assert catalog != null : "Not found catalog for version " +  
parameters.catalogVersion();
+            assert catalog != null : "Failed to find a catalog for the 
specified version [version=" +  parameters.catalogVersion()
+                    + ", earliestVersion=" + 
catalogService.earliestCatalogVersion()
+                    + ", latestVersion=" + 
catalogService.latestCatalogVersion()
+                    + "].";
 
             CatalogIndexDescriptor indexDescriptor = 
catalog.index(parameters.indexId());
 
+            assert indexDescriptor != null : "Failed to find an index 
descriptor for the specified index [indexId="
+                    + parameters.indexId() + "].";
+
+            assert catalog.table(indexDescriptor.tableId()) != null : "Failed 
to find a table descriptor for the specified index [indexId="
+                    + parameters.indexId() + ", tableId=" + 
indexDescriptor.tableId() + "].";
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalog.zone(catalog.table(indexDescriptor.tableId()).zoneId());
+
+            assert zoneDescriptor != null : "Failed to find a zone descriptor 
for the specified table [indexId="
+                    + parameters.indexId() + ", tableId=" + 
indexDescriptor.tableId() + "].";
+
             var startBuildIndexFutures = new ArrayList<CompletableFuture<?>>();
 
-            for (TablePartitionId primaryReplicaId : primaryReplicaIds) {
-                if (primaryReplicaId.tableId() == indexDescriptor.tableId()) {
-                    CompletableFuture<?> startBuildIndexFuture = 
getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId)
-                            .thenCompose(mvTableStorage -> 
awaitPrimaryReplica(primaryReplicaId, clockService.now())
+            for (ReplicationGroupId primaryReplicationGroupId : 
primaryReplicaIds) {
+                int zoneId;
+                int partitionId;
+                boolean needToProcessPartition;
+
+                // TODO https://issues.apache.org/jira/browse/IGNITE-22522
+                // Remove TablePartitionId check.
+                if (primaryReplicationGroupId instanceof ZonePartitionId) {
+                    ZonePartitionId zoneReplicaId = (ZonePartitionId) 
primaryReplicationGroupId;
+                    zoneId = zoneReplicaId.zoneId();
+                    partitionId = zoneReplicaId.partitionId();
+
+                    needToProcessPartition = zoneReplicaId.zoneId() == 
zoneDescriptor.id();
+                } else {
+                    TablePartitionId partitionReplicaId = (TablePartitionId) 
primaryReplicationGroupId;
+                    needToProcessPartition = partitionReplicaId.tableId() == 
indexDescriptor.tableId();
+
+                    if (needToProcessPartition) {
+                        zoneId = 
catalog.table(partitionReplicaId.tableId()).zoneId();
+                        partitionId = partitionReplicaId.partitionId();
+                    } else {
+                        continue;
+                    }
+                }
+
+                if (needToProcessPartition) {
+                    CompletableFuture<MvTableStorage> tableStorageFuture =

Review Comment:
   Do we really need this variable? It looks like the calls could be chained into a single pipeline.
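
   For illustration, a minimal sketch of the chained form, assuming the intermediate future is used only once. `storageFuture` and `awaitPrimary` are hypothetical stand-ins, not the actual `IndexBuildController` methods:

   ```java
   import java.util.concurrent.CompletableFuture;

   final class FuturePipelineSketch {
       // Hypothetical stand-in for an asynchronous table storage lookup.
       static CompletableFuture<String> storageFuture(int tableId) {
           return CompletableFuture.completedFuture("storage-" + tableId);
       }

       // Hypothetical stand-in for awaiting the primary replica of a partition.
       static CompletableFuture<String> awaitPrimary(String storage, int partitionId) {
           return CompletableFuture.completedFuture(storage + "/part-" + partitionId);
       }

       // Chained form: the first future feeds thenCompose directly, no local variable.
       static CompletableFuture<String> startBuild(int tableId, int partitionId) {
           return storageFuture(tableId)
                   .thenCompose(storage -> awaitPrimary(storage, partitionId));
       }
   }
   ```

   If the future is needed in more than one place, keeping the variable is of course fine.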



##########
modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java:
##########
@@ -372,12 +356,20 @@ private int tableId(String tableName) {
         return getTableIdStrict(catalogManager, tableName, clock.nowLong());
     }
 
+    private int zoneId() {
+        return getZoneIdStrict(catalogManager, TABLE_NAME, clock.nowLong());

Review Comment:
   I would rename the method to something like `getZoneIdByTableNameStrict`; 
with the current name, I would expect a zone name to be passed rather than a table name.
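
   A minimal sketch of the naming idea, assuming a hypothetical in-memory mapping instead of the real catalog (the map and the ID `42` are made up for illustration):

   ```java
   import java.util.Map;
   import java.util.NoSuchElementException;

   final class ZoneIdLookupSketch {
       // Hypothetical catalog snapshot: table name -> zone ID.
       private static final Map<String, Integer> ZONE_ID_BY_TABLE_NAME = Map.of("TEST_TABLE", 42);

       /** Returns the zone ID of the given table, or throws if the table is unknown. */
       static int getZoneIdByTableNameStrict(String tableName) {
           Integer zoneId = ZONE_ID_BY_TABLE_NAME.get(tableName);

           if (zoneId == null) {
               throw new NoSuchElementException("Table not found: " + tableName);
           }

           return zoneId;
       }
   }
   ```

   The name then states which kind of name the argument is, so a call site that passes `TABLE_NAME` reads unambiguously.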



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -484,9 +484,16 @@ public PartitionReplicaListener(
                 clusterNodeResolver,
                 replicationGroupId,
                 localNode,
-                txRecoveryEngine
-        );
+                txRecoveryEngine);
+
+        buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler(
+                indexMetaStorage,
+                txRwOperationTracker,
+                safeTime,
+                raftCommandApplicator);

Review Comment:
   ```suggestion
                   raftCommandApplicator
                   );
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -484,9 +484,16 @@ public PartitionReplicaListener(
                 clusterNodeResolver,
                 replicationGroupId,
                 localNode,
-                txRecoveryEngine
-        );
+                txRecoveryEngine);

Review Comment:
   ```suggestion
                   txRecoveryEngine
                   );
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
+import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
+import 
org.apache.ignite.internal.table.distributed.replicator.IndexBuilderTxRwOperationTracker;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * Handler for {@link BuildIndexReplicaRequest}.
+ */
+public class BuildIndexReplicaRequestHandler {
+    /** Factory to create RAFT command messages. */
+    private static final PartitionReplicationMessagesFactory 
PARTITION_REPLICATION_MESSAGES_FACTORY =
+            new PartitionReplicationMessagesFactory();
+
+    private final IndexMetaStorage indexMetaStorage;
+
+    /** Read-write transaction operation tracker for building indexes. */
+    private final IndexBuilderTxRwOperationTracker txRwOperationTracker;
+
+    /** Safe time tracker. */
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime;

Review Comment:
   Whose safe time tracker is this: the metastorage's, the partition's, or some other component's?



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBuildIndexTest.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
+import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.index.message.IndexMessageGroup;
+import org.apache.ignite.internal.index.message.IndexMessagesFactory;
+import 
org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test 
after the switching to zone-based replication
+/**
+ * Tests building indices for colocation track.
+ */
+@Timeout(60)
+public class ItBuildIndexTest extends ItAbstractColocationTest {
+    private static final IndexMessagesFactory FACTORY = new 
IndexMessagesFactory();
+
+    @Test
+    public void testBuildIndex() throws Exception {
+        // Prepare a single node cluster.
+        startCluster(1);
+        Node node = getNode(0);
+
+        String zoneName = "test-zone";
+        createZone(node, zoneName, 1, 1);
+
+        String tableName = "TEST_TABLE";

Review Comment:
   Same here, please use a constant.



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBuildIndexTest.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
+import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.index.message.IndexMessageGroup;
+import org.apache.ignite.internal.index.message.IndexMessagesFactory;
+import 
org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test 
after the switching to zone-based replication
+/**
+ * Tests building indices for colocation track.
+ */
+@Timeout(60)
+public class ItBuildIndexTest extends ItAbstractColocationTest {
+    private static final IndexMessagesFactory FACTORY = new 
IndexMessagesFactory();
+
+    @Test
+    public void testBuildIndex() throws Exception {
+        // Prepare a single node cluster.
+        startCluster(1);
+        Node node = getNode(0);
+
+        String zoneName = "test-zone";
+        createZone(node, zoneName, 1, 1);
+
+        String tableName = "TEST_TABLE";
+        createCustomTable(node, zoneName, tableName);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        // Test node does not create IndexNodeFinishedRwTransactionsChecker, 
so the following code is needed to unblock index building.
+        // It's easier than creating a real service with all its dependencies.
+        node.clusterService.messagingService().addMessageHandler(
+                IndexMessageGroup.class,
+                (message, sender, correlationId) -> {
+                    if (message instanceof 
IsNodeFinishedRwTransactionsStartedBeforeRequest) {
+                        node.clusterService.messagingService().respond(
+                                sender,
+                                
FACTORY.isNodeFinishedRwTransactionsStartedBeforeResponse().finished(true).build(),
+                                correlationId
+                        );
+                    }
+                });
+
+        TableViewInternal tableViewInternal = node.tableManager.table(tableId);
+        KeyValueView<Tuple, Tuple> tableView = 
tableViewInternal.keyValueView();
+
+        node.transactions().runInTransaction(tx -> {
+            Tuple key = Tuple.create().set("KEY", 1L);
+            Tuple value = Tuple.create().set("VAL", 1).set("DOUBLEVAL", 1.0);
+            tableView.putAll(tx, Map.of(key, value));
+        });
+
+        // This async transaction is needed to update safe time, just because 
the idle safe time propagation is not implemented yet.
+        // https://issues.apache.org/jira/browse/IGNITE-22620
+        CompletableFuture.runAsync(() -> {

Review Comment:
   Why don't you do anything with the returned futures?
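
   A minimal sketch of one way to handle the returned future, assuming the test can simply wait for the async task; the counter is a hypothetical stand-in for the real work, and `orTimeout` requires Java 9+:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   final class AwaitAsyncTaskSketch {
       static int runAndAwait() {
           AtomicInteger counter = new AtomicInteger();

           // Keep the future instead of dropping it.
           CompletableFuture<Void> updateFuture = CompletableFuture.runAsync(counter::incrementAndGet);

           // join() rethrows a failure of the task (wrapped in CompletionException),
           // and orTimeout bounds the wait, so errors are not silently lost.
           updateFuture.orTimeout(10, TimeUnit.SECONDS).join();

           return counter.get();
       }
   }
   ```

   Otherwise an exception thrown inside the task never reaches the test and the test may pass or hang for the wrong reason.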



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBuildIndexTest.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
+import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.index.message.IndexMessageGroup;
+import org.apache.ignite.internal.index.message.IndexMessagesFactory;
+import 
org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test 
after the switching to zone-based replication
+/**
+ * Tests building indices for colocation track.
+ */
+@Timeout(60)
+public class ItBuildIndexTest extends ItAbstractColocationTest {
+    private static final IndexMessagesFactory FACTORY = new 
IndexMessagesFactory();
+
+    @Test
+    public void testBuildIndex() throws Exception {
+        // Prepare a single node cluster.
+        startCluster(1);
+        Node node = getNode(0);
+
+        String zoneName = "test-zone";
+        createZone(node, zoneName, 1, 1);
+
+        String tableName = "TEST_TABLE";
+        createCustomTable(node, zoneName, tableName);
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName, node.hybridClock.nowLong());
+
+        // Test node does not create IndexNodeFinishedRwTransactionsChecker, 
so the following code is needed to unblock index building.
+        // It's easier than creating a real service with all its dependencies.
+        node.clusterService.messagingService().addMessageHandler(
+                IndexMessageGroup.class,
+                (message, sender, correlationId) -> {
+                    if (message instanceof 
IsNodeFinishedRwTransactionsStartedBeforeRequest) {
+                        node.clusterService.messagingService().respond(
+                                sender,
+                                
FACTORY.isNodeFinishedRwTransactionsStartedBeforeResponse().finished(true).build(),
+                                correlationId
+                        );
+                    }
+                });
+
+        TableViewInternal tableViewInternal = node.tableManager.table(tableId);
+        KeyValueView<Tuple, Tuple> tableView = 
tableViewInternal.keyValueView();
+
+        node.transactions().runInTransaction(tx -> {
+            Tuple key = Tuple.create().set("KEY", 1L);
+            Tuple value = Tuple.create().set("VAL", 1).set("DOUBLEVAL", 1.0);
+            tableView.putAll(tx, Map.of(key, value));
+        });
+
+        // This async transaction is needed to update safe time, just because 
the idle safe time propagation is not implemented yet.
+        // https://issues.apache.org/jira/browse/IGNITE-22620
+        CompletableFuture.runAsync(() -> {
+            try {
+                Thread.sleep(1_000);

Review Comment:
   Why sleep, and why one second? What if that suddenly turns out not to be enough?
   Using sleep is a bad practice.
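
   A minimal sketch of one alternative, assuming the awaited event can signal a latch; the event itself is simulated here by a hypothetical async task:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   final class LatchInsteadOfSleepSketch {
       static boolean awaitSignal() {
           CountDownLatch ready = new CountDownLatch(1);

           // Hypothetical event source: signals as soon as the work is done.
           CompletableFuture.runAsync(ready::countDown);

           try {
               // Returns as soon as the signal arrives; the timeout is only an upper bound.
               return ready.await(10, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();

               return false;
           }
       }
   }
   ```

   Unlike a fixed sleep, this does not wait longer than necessary and fails explicitly when the timeout is exceeded.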



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
