tkalkirill commented on code in PR #5276: URL: https://github.com/apache/ignite-3/pull/5276#discussion_r1971445766
########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -131,35 +142,73 @@ public void close() { } private void addListeners() { - catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> { - return onIndexBuilding(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_BUILDING, + (StartBuildingIndexEventParameters parameters) -> onIndexBuilding(parameters).thenApply(unused -> false)); - catalogService.listen(CatalogEvent.INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> { - return onIndexRemoved(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_REMOVED, + (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters).thenApply(unused -> false)); - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters -> { - return onPrimaryReplicaElected(parameters).thenApply(unused -> false); - }); + placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> onPrimaryReplicaElected(parameters).thenApply(unused -> false)); } private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { Catalog catalog = catalogService.catalog(parameters.catalogVersion()); - assert catalog != null : "Not found catalog for version " + parameters.catalogVersion(); + assert catalog != null : "Failed to find a catalog for the specified version [version=" + parameters.catalogVersion() Review Comment: It would be great to bring this into the method, but that's a pipe dream. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/package-info.java: ########## @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains replica request handlers that is used by + * {@link org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener}. + */ +package org.apache.ignite.internal.table.distributed.replicator.handlers; Review Comment: Wow, how unusual. ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskId.java: ########## @@ -23,18 +23,33 @@ * {@link IndexBuildTask} ID. */ class IndexBuildTaskId { + private final int zoneId; + private final int tableId; private final int partitionId; private final int indexId; - IndexBuildTaskId(int tableId, int partitionId, int indexId) { + /** + * Creates a new index building task. + * + * @param zoneId Distribution zone identifier. + * @param tableId Table identifier. Review Comment: ```suggestion * @param tableId Table ID. ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.replicator.handlers; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; +import static org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING; +import static org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand; +import org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest; +import org.apache.ignite.internal.table.distributed.index.IndexMeta; +import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; +import org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange; +import org.apache.ignite.internal.table.distributed.replicator.IndexBuilderTxRwOperationTracker; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; + +/** + * Handler for {@link BuildIndexReplicaRequest}. + */ +public class BuildIndexReplicaRequestHandler { + /** Factory to create RAFT command messages. 
*/ + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + + private final IndexMetaStorage indexMetaStorage; + + /** Read-write transaction operation tracker for building indexes. */ + private final IndexBuilderTxRwOperationTracker txRwOperationTracker; + + /** Safe time. */ Review Comment: Safe time of/for what? ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskId.java: ########## @@ -23,18 +23,33 @@ * {@link IndexBuildTask} ID. */ class IndexBuildTaskId { + private final int zoneId; + private final int tableId; private final int partitionId; private final int indexId; - IndexBuildTaskId(int tableId, int partitionId, int indexId) { + /** + * Creates a new index building task. + * + * @param zoneId Distribution zone identifier. + * @param tableId Table identifier. + * @param partitionId Partition identifier. Review Comment: ```suggestion * @param partitionId Partition ID. ``` ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -184,60 +233,113 @@ private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameter private CompletableFuture<?> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { - TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); - if (isLocalNode(clusterService, parameters.leaseholderId())) { - primaryReplicaIds.add(primaryReplicaId); + // TODO https://issues.apache.org/jira/browse/IGNITE-24375 + // Need to remove TablePartitionId check here and below. 
+ assert parameters.groupId() instanceof ZonePartitionId || parameters.groupId() instanceof TablePartitionId : + "Primary replica ID must be of type ZonePartitionId or TablePartitionId"; + + primaryReplicaIds.add(parameters.groupId()); // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the // metastore thread. Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion()); - // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed table by LWM - if (catalog == null || catalog.table(primaryReplicaId.tableId()) == null) { - return nullCompletedFuture(); - } + if (parameters.groupId() instanceof ZonePartitionId) { + assert enabledColocation() : "Primary replica ID must be of type ZonePartitionId"; - return getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId) - .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime()) - .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica( - catalog.version(), - primaryReplicaId, - mvTableStorage, - replicaMeta - )) - ); + ZonePartitionId primaryReplicaId = (ZonePartitionId) parameters.groupId(); + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(primaryReplicaId.zoneId()); + // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed zone by LWM + if (zoneDescriptor == null) { + return nullCompletedFuture(); + } + + var indexFutures = new ArrayList<CompletableFuture<?>>(); + for (CatalogTableDescriptor tableDescriptor : catalog.tables()) { + // 1. Perhaps, it makes sense to get primary replica future first and then get table storage future, + // because, it will be the same for all tables in the zone for the given partition. + // 2. Is it possible to filter out tables in efficient way + // that do not have indices to avoid creating unnecessary futures? + // It looks like catalog.indexes(tableDescriptor.id()).isEmpty() is good enough. 
+ // However, what about PK index? + CompletableFuture<?> future = + getMvTableStorageFuture(parameters.causalityToken(), tableDescriptor.id()) + .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime()) + .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica( + catalog.version(), + tableDescriptor, + primaryReplicaId, + mvTableStorage, + replicaMeta))); + + indexFutures.add(future); + } + + return allOf(indexFutures.toArray(CompletableFuture[]::new)); Review Comment: You can use `CompletableFutures#allOf` ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java: ########## @@ -219,10 +223,13 @@ private List<RowId> createBatchRowIds() { private BuildIndexReplicaRequest createBuildIndexReplicaRequest(List<RowId> rowIds) { boolean finish = rowIds.size() < batchSize; - TablePartitionId tablePartitionId = new TablePartitionId(taskId.getTableId(), taskId.getPartitionId()); + ReplicationGroupIdMessage groupIdMessage = enabledColocation() Review Comment: Maybe a separate method that creates `ReplicationGroupIdMessage`? 
########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -131,35 +142,73 @@ public void close() { } private void addListeners() { - catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> { - return onIndexBuilding(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_BUILDING, + (StartBuildingIndexEventParameters parameters) -> onIndexBuilding(parameters).thenApply(unused -> false)); - catalogService.listen(CatalogEvent.INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> { - return onIndexRemoved(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_REMOVED, + (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters).thenApply(unused -> false)); - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters -> { - return onPrimaryReplicaElected(parameters).thenApply(unused -> false); - }); + placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> onPrimaryReplicaElected(parameters).thenApply(unused -> false)); } private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { Catalog catalog = catalogService.catalog(parameters.catalogVersion()); - assert catalog != null : "Not found catalog for version " + parameters.catalogVersion(); + assert catalog != null : "Failed to find a catalog for the specified version [version=" + parameters.catalogVersion() + + ", earliestVersion=" + catalogService.earliestCatalogVersion() + + ", latestVersion=" + catalogService.latestCatalogVersion() + + "]."; CatalogIndexDescriptor indexDescriptor = catalog.index(parameters.indexId()); + assert indexDescriptor != null : "Failed to find an index descriptor for the specified index [indexId=" Review Comment: Same about method. 
########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java: ########## @@ -100,6 +101,7 @@ class IndexBuilder implements ManuallyCloseable { * to the replica. */ public void scheduleBuildIndex( + int zoneId, Review Comment: Probably we should introduce a new class for these parameters, but it is not necessary to do this now. ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskId.java: ########## @@ -23,18 +23,33 @@ * {@link IndexBuildTask} ID. */ class IndexBuildTaskId { + private final int zoneId; + private final int tableId; private final int partitionId; private final int indexId; - IndexBuildTaskId(int tableId, int partitionId, int indexId) { + /** + * Creates a new index building task. + * + * @param zoneId Distribution zone identifier. + * @param tableId Table identifier. + * @param partitionId Partition identifier. + * @param indexId Index identifier. Review Comment: ```suggestion * @param indexId Index ID. ``` ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -131,35 +142,73 @@ public void close() { } private void addListeners() { - catalogService.listen(CatalogEvent.INDEX_BUILDING, (StartBuildingIndexEventParameters parameters) -> { - return onIndexBuilding(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_BUILDING, + (StartBuildingIndexEventParameters parameters) -> onIndexBuilding(parameters).thenApply(unused -> false)); - catalogService.listen(CatalogEvent.INDEX_REMOVED, (RemoveIndexEventParameters parameters) -> { - return onIndexRemoved(parameters).thenApply(unused -> false); - }); + catalogService.listen(INDEX_REMOVED, + (RemoveIndexEventParameters parameters) -> onIndexRemoved(parameters).thenApply(unused -> false)); - placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters -> { - return onPrimaryReplicaElected(parameters).thenApply(unused -> false); - }); + 
placementDriver.listen(PRIMARY_REPLICA_ELECTED, parameters -> onPrimaryReplicaElected(parameters).thenApply(unused -> false)); } private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { Catalog catalog = catalogService.catalog(parameters.catalogVersion()); - assert catalog != null : "Not found catalog for version " + parameters.catalogVersion(); + assert catalog != null : "Failed to find a catalog for the specified version [version=" + parameters.catalogVersion() + + ", earliestVersion=" + catalogService.earliestCatalogVersion() + + ", latestVersion=" + catalogService.latestCatalogVersion() + + "]."; CatalogIndexDescriptor indexDescriptor = catalog.index(parameters.indexId()); + assert indexDescriptor != null : "Failed to find an index descriptor for the specified index [indexId=" + + parameters.indexId() + "]."; + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(catalog.table(indexDescriptor.tableId()).zoneId()); + + assert zoneDescriptor != null : "Failed to find a zone descriptor for the specified table [indexId=" Review Comment: Same about method. ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java: ########## @@ -184,60 +233,113 @@ private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameter private CompletableFuture<?> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { - TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); - if (isLocalNode(clusterService, parameters.leaseholderId())) { - primaryReplicaIds.add(primaryReplicaId); + // TODO https://issues.apache.org/jira/browse/IGNITE-24375 + // Need to remove TablePartitionId check here and below. 
+ assert parameters.groupId() instanceof ZonePartitionId || parameters.groupId() instanceof TablePartitionId : + "Primary replica ID must be of type ZonePartitionId or TablePartitionId"; Review Comment: Please add `parameters.groupId()` to the error message. ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskId.java: ########## @@ -23,18 +23,33 @@ * {@link IndexBuildTask} ID. */ class IndexBuildTaskId { + private final int zoneId; + private final int tableId; private final int partitionId; private final int indexId; - IndexBuildTaskId(int tableId, int partitionId, int indexId) { + /** + * Creates a new index building task. + * + * @param zoneId Distribution zone identifier. Review Comment: ```suggestion * @param zoneId Distribution zone ID. ``` ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java: ########## @@ -197,13 +201,15 @@ public void scheduleBuildIndexAfterDisasterRecovery( /** * Stops index building if it is in progress. * + * @param zoneId Distribution zone ID. * @param tableId Table ID. * @param partitionId Partition ID. * @param indexId Index ID. */ - public void stopBuildIndex(int tableId, int partitionId, int indexId) { + // TODO remove unused method Review Comment: Why? If it is in this ticket, then it should be indicated. ########## modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java: ########## @@ -146,6 +156,7 @@ void testStartBuildIndexesOnIndexCreate() { createIndex(INDEX_NAME); verify(indexBuilder, never()).scheduleBuildIndex( + eq(zoneId(TABLE_NAME)), Review Comment: Will several zones be checked here? If not, then I suggest making the `zoneId()` method similar to `tableId()`. 
########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java: ########## @@ -117,7 +119,7 @@ public void scheduleBuildIndex( return; } - IndexBuildTaskId taskId = new IndexBuildTaskId(tableId, partitionId, indexId); + IndexBuildTaskId taskId = new IndexBuildTaskId(zoneId, tableId, partitionId, indexId); Review Comment: ```suggestion var taskId = new IndexBuildTaskId(zoneId, tableId, partitionId, indexId); ``` ########## modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java: ########## @@ -174,7 +178,7 @@ public void scheduleBuildIndexAfterDisasterRecovery( return; } - IndexBuildTaskId taskId = new IndexBuildTaskId(tableId, partitionId, indexId); + IndexBuildTaskId taskId = new IndexBuildTaskId(zoneId, tableId, partitionId, indexId); Review Comment: ```suggestion var taskId = new IndexBuildTaskId(zoneId, tableId, partitionId, indexId); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org