rpuch commented on code in PR #5276:
URL: https://github.com/apache/ignite-3/pull/5276#discussion_r1967704226


##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -184,60 +224,116 @@ private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameter
 
     private CompletableFuture<?> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId();
-
             if (isLocalNode(clusterService, parameters.leaseholderId())) {
-                primaryReplicaIds.add(primaryReplicaId);
+                // This assert is added for testing purposes, at least for now.
+                assert (enabledColocation() && parameters.groupId() instanceof ZonePartitionId)
+                        || (!enabledColocation() && parameters.groupId() instanceof TablePartitionId) :
+                        "Primary replica ID must be of type ZonePartitionId if colocation is enabled, "
+                                + "otherwise it must be of type TablePartitionId";
+
+                primaryReplicaIds.add(parameters.groupId());
 
                 // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the
                 // metastore thread.
                 Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion());
 
-                // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed table by LWM
-                if (catalog == null || catalog.table(primaryReplicaId.tableId()) == null) {
-                    return nullCompletedFuture();
-                }
+                if (enabledColocation()) {
+                    ZonePartitionId primaryReplicaId = (ZonePartitionId) parameters.groupId();
+
+                    CatalogZoneDescriptor zoneDescriptor = catalog.zone(primaryReplicaId.zoneId());
+                    // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed zone by LWM
+                    if (zoneDescriptor == null) {
+                        return nullCompletedFuture();
+                    }
 
-                return getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId)
-                        .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
-                                .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica(
-                                        catalog.version(),
-                                        primaryReplicaId,
-                                        mvTableStorage,
-                                        replicaMeta
-                                ))
-                        );
+                    var indexFutures = new ArrayList<CompletableFuture<?>>();
+                    for (CatalogTableDescriptor tableDescriptor : catalog.tables()) {
+                        // 1. Perhaps, it makes sense to get primary replica future first and then get table storage future,
+                        // because, it will be the same for all tables in the zone for the given partition.
+                        // 2. Is it possible to filter out tables in efficient way
+                        // that do not have indices to avoid creating unnecessary futures?
+                        // It looks like catalog.indexes(tableDescriptor.id()).isEmpty() is good enough.
+                        // However, what about PK index?

Review Comment:
   Probably not just the PK index. Batched table+index creation (designed; I'm not sure it's implemented yet) will not need the index to be built either.
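
To make the point concrete, here is a minimal sketch of filtering by index status rather than by `isEmpty()`. It assumes that only indexes in `BUILDING` status need scheduling, and that PK indexes and indexes created in a batch with their table are already available; both are assumptions, not confirmed behavior. `IndexStatus` and `tablesNeedingBuild` are hypothetical stand-ins, not the catalog API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IndexFilterSketch {
    /** Hypothetical stand-in for the catalog's index status. */
    enum IndexStatus { REGISTERED, BUILDING, AVAILABLE, STOPPING }

    /** Returns IDs of tables that have at least one index in BUILDING status. */
    static List<Integer> tablesNeedingBuild(Map<Integer, List<IndexStatus>> indexStatusesByTableId) {
        List<Integer> result = new ArrayList<>();

        for (Map.Entry<Integer, List<IndexStatus>> entry : indexStatusesByTableId.entrySet()) {
            // A non-empty index list is not enough: the table is skipped
            // unless some index is actually waiting to be built.
            if (entry.getValue().contains(IndexStatus.BUILDING)) {
                result.add(entry.getKey());
            }
        }

        return result;
    }

    public static void main(String[] args) {
        Map<Integer, List<IndexStatus>> statuses = new LinkedHashMap<>();

        // Table 1: PK index only (assumed to be available from the start).
        statuses.put(1, Arrays.asList(IndexStatus.AVAILABLE));
        // Table 2: PK index plus a secondary index that is still being built.
        statuses.put(2, Arrays.asList(IndexStatus.AVAILABLE, IndexStatus.BUILDING));
        // Table 3: PK index plus an index created in a batch with the table (assumed available).
        statuses.put(3, Arrays.asList(IndexStatus.AVAILABLE, IndexStatus.AVAILABLE));

        System.out.println(tablesNeedingBuild(statuses));
    }
}
```

With such a filter, tables 1 and 3 would create no futures even though their index lists are not empty.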



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -272,16 +380,25 @@ private void tryScheduleBuildIndex(
      *
      * @param replicaId Replica ID.
      */
-    private void stopBuildingIndexesIfPrimaryExpired(TablePartitionId replicaId) {
+    private void stopBuildingIndexesIfPrimaryExpired(ReplicationGroupId replicaId) {

Review Comment:
   `PartitionGroupId` can be used here as well



##########
modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java:
##########
@@ -65,18 +68,25 @@
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.sql.SqlCommon;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /** For {@link IndexBuildController} testing. */
+@ExtendWith(WorkDirectoryExtension.class)
+@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true")

Review Comment:
   So it's not tested for the table case (no colocation) anymore? But this is still the production mode.
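
One way to keep both modes covered is to run the same check under each flag value. The plain-Java sketch below only illustrates that idea: the property name `sketch.colocation.enabled`, the local `enabledColocation()` and the helper `withSystemProperty` are hypothetical, and the real flag may be read or cached differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class BothModesSketch {
    /** Hypothetical property name; the real feature flag key is not assumed here. */
    static final String FLAG = "sketch.colocation.enabled";

    /** Stand-in for the feature-flag check; reads the system property on every call. */
    static boolean enabledColocation() {
        return Boolean.getBoolean(FLAG);
    }

    /** The behavior under test: which group ID type is expected in the current mode. */
    static String expectedGroupIdType() {
        return enabledColocation() ? "ZonePartitionId" : "TablePartitionId";
    }

    /** Runs the body with the property set, then restores the previous value. */
    static <T> T withSystemProperty(String key, String value, Supplier<T> body) {
        String previous = System.getProperty(key);
        System.setProperty(key, value);

        try {
            return body.get();
        } finally {
            if (previous == null) {
                System.clearProperty(key);
            } else {
                System.setProperty(key, previous);
            }
        }
    }

    /** Executes the same check once per mode, table-based first. */
    static List<String> runInBothModes() {
        List<String> results = new ArrayList<>();

        for (String value : new String[] {"false", "true"}) {
            results.add(withSystemProperty(FLAG, value, BothModesSketch::expectedGroupIdType));
        }

        return results;
    }

    public static void main(String[] args) {
        System.out.println(runInBothModes());
    }
}
```

In a JUnit-based suite the same effect could be reached with two test classes or a parameterized test; the sketch avoids any test framework so that it stays self-contained.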



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -184,60 +224,116 @@ private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameter
 
     private CompletableFuture<?> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId();
-
             if (isLocalNode(clusterService, parameters.leaseholderId())) {
-                primaryReplicaIds.add(primaryReplicaId);
+                // This assert is added for testing purposes, at least for now.
+                assert (enabledColocation() && parameters.groupId() instanceof ZonePartitionId)
+                        || (!enabledColocation() && parameters.groupId() instanceof TablePartitionId) :
+                        "Primary replica ID must be of type ZonePartitionId if colocation is enabled, "
+                                + "otherwise it must be of type TablePartitionId";
+
+                primaryReplicaIds.add(parameters.groupId());
 
                 // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the
                 // metastore thread.
                 Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion());
 
-                // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed table by LWM
-                if (catalog == null || catalog.table(primaryReplicaId.tableId()) == null) {
-                    return nullCompletedFuture();
-                }
+                if (enabledColocation()) {
+                    ZonePartitionId primaryReplicaId = (ZonePartitionId) parameters.groupId();
+
+                    CatalogZoneDescriptor zoneDescriptor = catalog.zone(primaryReplicaId.zoneId());
+                    // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed zone by LWM
+                    if (zoneDescriptor == null) {
+                        return nullCompletedFuture();
+                    }
 
-                return getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId)
-                        .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
-                                .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica(
-                                        catalog.version(),
-                                        primaryReplicaId,
-                                        mvTableStorage,
-                                        replicaMeta
-                                ))
-                        );
+                    var indexFutures = new ArrayList<CompletableFuture<?>>();
+                    for (CatalogTableDescriptor tableDescriptor : catalog.tables()) {
+                        // 1. Perhaps, it makes sense to get primary replica future first and then get table storage future,
+                        // because, it will be the same for all tables in the zone for the given partition.
+                        // 2. Is it possible to filter out tables in efficient way
+                        // that do not have indices to avoid creating unnecessary futures?
+                        // It looks like catalog.indexes(tableDescriptor.id()).isEmpty() is good enough.
+                        // However, what about PK index?
+                        CompletableFuture<?> future =
+                                getMvTableStorageFuture(parameters.causalityToken(), tableDescriptor.id())
+                                        .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
+                                                .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica(
+                                                        catalog.version(),
+                                                        tableDescriptor,
+                                                        primaryReplicaId,
+                                                        mvTableStorage,
+                                                        replicaMeta)));
+
+                        indexFutures.add(future);
+                    }
+
+                    return allOf(indexFutures.toArray(CompletableFuture[]::new));
+                } else {
+                    TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId();
+
+                    // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed table by LWM
+                    CatalogTableDescriptor tableDescriptor = catalog.table(primaryReplicaId.tableId());
+                    if (tableDescriptor == null) {
+                        return nullCompletedFuture();
+                    }
+
+                    return getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId.tableId())
+                            .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
+                                    .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica(
+                                            catalog.version(),
+                                            tableDescriptor,
+                                            primaryReplicaId,
+                                            mvTableStorage,
+                                            replicaMeta
+                                    ))
+                            );
+                }
             } else {
-                stopBuildingIndexesIfPrimaryExpired(primaryReplicaId);
+                stopBuildingIndexesIfPrimaryExpired(parameters.groupId());
 
                 return nullCompletedFuture();
             }
         });
     }
 
     private void tryScheduleBuildIndexesForNewPrimaryReplica(
+            // TODO It seems that we can pass the catalog itself instead of its version.
             int catalogVersion,
-            TablePartitionId primaryReplicaId,
+            CatalogTableDescriptor tableDescriptor,
+            ReplicationGroupId primaryReplicaId,
             MvTableStorage mvTableStorage,
             ReplicaMeta replicaMeta
     ) {
         inBusyLock(busyLock, () -> {
-            if (isLeaseExpire(replicaMeta)) {
+            if (isLeaseExpired(replicaMeta)) {
                 stopBuildingIndexesIfPrimaryExpired(primaryReplicaId);
 
                 return;
             }
 
             Catalog catalog = catalogService.catalog(catalogVersion);
 
+            // This assert does not make sense, {@link CatalogService#catalog} throws CatalogNotFoundException if the catalog is not found.
             assert catalog != null : "Not found catalog for version " + catalogVersion;

Review Comment:
   Let's remove it then
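
A small sketch of why the assert is redundant, assuming (as the code comment in the hunk states) that the lookup throws when the version is missing instead of returning `null`. `CatalogLookupSketch`, its `String` catalog values and the local exception class are hypothetical stand-ins, not the real `CatalogService`.

```java
import java.util.HashMap;
import java.util.Map;

class CatalogLookupSketch {
    /** Local stand-in for the exception named in the code comment. */
    static class CatalogNotFoundException extends RuntimeException {
        CatalogNotFoundException(int version) {
            super("Catalog version not found: " + version);
        }
    }

    private final Map<Integer, String> catalogsByVersion = new HashMap<>();

    void register(int version, String catalog) {
        catalogsByVersion.put(version, catalog);
    }

    /** Never returns null: a missing version is reported by throwing. */
    String catalog(int version) {
        String catalog = catalogsByVersion.get(version);

        if (catalog == null) {
            throw new CatalogNotFoundException(version);
        }

        return catalog;
    }

    public static void main(String[] args) {
        CatalogLookupSketch service = new CatalogLookupSketch();
        service.register(1, "catalog-v1");

        // No null check is needed here: either a value comes back or the call throws.
        System.out.println(service.catalog(1));

        try {
            service.catalog(2);
        } catch (CatalogNotFoundException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under that contract, an `assert catalog != null` after the call can never fail, so dropping it loses nothing.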



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -184,60 +224,116 @@ private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameter
 
     private CompletableFuture<?> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId();
-
             if (isLocalNode(clusterService, parameters.leaseholderId())) {
-                primaryReplicaIds.add(primaryReplicaId);
+                // This assert is added for testing purposes, at least for now.
+                assert (enabledColocation() && parameters.groupId() instanceof ZonePartitionId)
+                        || (!enabledColocation() && parameters.groupId() instanceof TablePartitionId) :
+                        "Primary replica ID must be of type ZonePartitionId if colocation is enabled, "
+                                + "otherwise it must be of type TablePartitionId";
+
+                primaryReplicaIds.add(parameters.groupId());
 
                 // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the
                 // metastore thread.
                 Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion());
 
-                // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed table by LWM
-                if (catalog == null || catalog.table(primaryReplicaId.tableId()) == null) {
-                    return nullCompletedFuture();
-                }
+                if (enabledColocation()) {
+                    ZonePartitionId primaryReplicaId = (ZonePartitionId) parameters.groupId();
+
+                    CatalogZoneDescriptor zoneDescriptor = catalog.zone(primaryReplicaId.zoneId());
+                    // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed zone by LWM
+                    if (zoneDescriptor == null) {
+                        return nullCompletedFuture();
+                    }
 
-                return getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId)
-                        .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
-                                .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica(
-                                        catalog.version(),
-                                        primaryReplicaId,
-                                        mvTableStorage,
-                                        replicaMeta
-                                ))
-                        );
+                    var indexFutures = new ArrayList<CompletableFuture<?>>();
+                    for (CatalogTableDescriptor tableDescriptor : catalog.tables()) {
+                        // 1. Perhaps, it makes sense to get primary replica future first and then get table storage future,
+                        // because, it will be the same for all tables in the zone for the given partition.
+                        // 2. Is it possible to filter out tables in efficient way
+                        // that do not have indices to avoid creating unnecessary futures?
+                        // It looks like catalog.indexes(tableDescriptor.id()).isEmpty() is good enough.
+                        // However, what about PK index?
+                        CompletableFuture<?> future =
+                                getMvTableStorageFuture(parameters.causalityToken(), tableDescriptor.id())
+                                        .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
+                                                .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica(
+                                                        catalog.version(),
+                                                        tableDescriptor,
+                                                        primaryReplicaId,
+                                                        mvTableStorage,
+                                                        replicaMeta)));
+
+                        indexFutures.add(future);
+                    }
+
+                    return allOf(indexFutures.toArray(CompletableFuture[]::new));
+                } else {
+                    TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId();
+
+                    // TODO: IGNITE-22656 It is necessary not to generate an event for a destroyed table by LWM
+                    CatalogTableDescriptor tableDescriptor = catalog.table(primaryReplicaId.tableId());
+                    if (tableDescriptor == null) {
+                        return nullCompletedFuture();
+                    }
+
+                    return getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId.tableId())
+                            .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
+                                    .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica(
+                                            catalog.version(),
+                                            tableDescriptor,
+                                            primaryReplicaId,
+                                            mvTableStorage,
+                                            replicaMeta
+                                    ))
+                            );
+                }
             } else {
-                stopBuildingIndexesIfPrimaryExpired(primaryReplicaId);
+                stopBuildingIndexesIfPrimaryExpired(parameters.groupId());
 
                 return nullCompletedFuture();
             }
         });
     }
 
     private void tryScheduleBuildIndexesForNewPrimaryReplica(
+            // TODO It seems that we can pass the catalog itself instead of its version.
             int catalogVersion,
-            TablePartitionId primaryReplicaId,
+            CatalogTableDescriptor tableDescriptor,
+            ReplicationGroupId primaryReplicaId,
             MvTableStorage mvTableStorage,
             ReplicaMeta replicaMeta
     ) {
         inBusyLock(busyLock, () -> {
-            if (isLeaseExpire(replicaMeta)) {
+            if (isLeaseExpired(replicaMeta)) {
                 stopBuildingIndexesIfPrimaryExpired(primaryReplicaId);
 
                 return;
             }
 
             Catalog catalog = catalogService.catalog(catalogVersion);
 
+            // This assert does not make sense, {@link CatalogService#catalog} throws CatalogNotFoundException if the catalog is not found.
             assert catalog != null : "Not found catalog for version " + catalogVersion;
 
-            for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(primaryReplicaId.tableId())) {
+            for (CatalogIndexDescriptor indexDescriptor : catalog.indexes(tableDescriptor.id())) {
                 if (indexDescriptor.status() == BUILDING) {
-                    scheduleBuildIndex(primaryReplicaId, indexDescriptor, mvTableStorage, enlistmentConsistencyToken(replicaMeta));
+                    scheduleBuildIndex(
+                            tableDescriptor.zoneId(),
+                            tableDescriptor.id(),
+                            enabledColocation()
+                                    ? ((ZonePartitionId) primaryReplicaId).partitionId()

Review Comment:
   I see this pattern in a few places in the code. It's easier to just cast to `PartitionGroupId` and take `partitionId()` from it.
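
The suggestion can be sketched as follows. The types below are simplified, hypothetical stand-ins that only mirror the names used in the diff; the sketch assumes both partition ID classes implement a common `PartitionGroupId` interface exposing `partitionId()`.

```java
class PartitionIdSketch {
    interface ReplicationGroupId {
    }

    /** Assumed common interface of both partition ID kinds. */
    interface PartitionGroupId extends ReplicationGroupId {
        int partitionId();
    }

    static final class TablePartitionId implements PartitionGroupId {
        private final int tableId;
        private final int partitionId;

        TablePartitionId(int tableId, int partitionId) {
            this.tableId = tableId;
            this.partitionId = partitionId;
        }

        int tableId() {
            return tableId;
        }

        @Override
        public int partitionId() {
            return partitionId;
        }
    }

    static final class ZonePartitionId implements PartitionGroupId {
        private final int zoneId;
        private final int partitionId;

        ZonePartitionId(int zoneId, int partitionId) {
            this.zoneId = zoneId;
            this.partitionId = partitionId;
        }

        int zoneId() {
            return zoneId;
        }

        @Override
        public int partitionId() {
            return partitionId;
        }
    }

    /** The pattern from the diff: branch on the flag and cast to the concrete type. */
    static int partitionIdWithBranching(ReplicationGroupId id, boolean colocationEnabled) {
        return colocationEnabled
                ? ((ZonePartitionId) id).partitionId()
                : ((TablePartitionId) id).partitionId();
    }

    /** The suggested alternative: a single cast to the common interface. */
    static int partitionIdViaCommonInterface(ReplicationGroupId id) {
        return ((PartitionGroupId) id).partitionId();
    }

    public static void main(String[] args) {
        ReplicationGroupId zoneId = new ZonePartitionId(1, 7);
        ReplicationGroupId tableId = new TablePartitionId(42, 3);

        System.out.println(partitionIdViaCommonInterface(zoneId));
        System.out.println(partitionIdViaCommonInterface(tableId));
    }
}
```

The single cast does not depend on the feature flag, so it cannot get out of sync with the actual type of the ID the way the branching version can.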



