github-actions[bot] commented on code in PR #64167:
URL: https://github.com/apache/doris/pull/64167#discussion_r3371778514


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -3261,6 +3374,35 @@ private boolean createOlapTable(Database db, 
CreateTableInfo createTableInfo) th
                         ColocatePersistInfo info = 
ColocatePersistInfo.createForAddTable(groupId, tableId,
                                 backendsPerBucketSeq);
                         
Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
+                    } else {
+                        if 
(Env.getCurrentTenantLevelColocateIndex().isColocateMasterTable(tableId)) {
+                            Map<Tag, GroupV2Id> groups = 
Env.getCurrentTenantLevelColocateIndex()
+                                    .getMasterGroupByTable(tableId);
+                            Map<GroupV2Id, List<List<Long>>> map = new 
HashMap<>();
+                            for (Map.Entry<Tag, GroupV2Id> entry : 
groups.entrySet()) {
+                                GroupV2Id groupId = entry.getValue();
+                                List<List<Long>> backendsPerBucketSeq = 
Env.getCurrentTenantLevelColocateIndex()
+                                        
.getBackendsPerBucketSeqByGroup(groupId);
+                                map.put(groupId, backendsPerBucketSeq);
+                            }
+                            TenantLevelColocateTableInfo info = new 
TenantLevelColocateTableInfo(db.getId(), tableId,
+                                    map);
+                            
Env.getCurrentEnv().getEditLog().logColocateAddTableV2(info);
+                        }

Review Comment:
   Tenant-level colocate membership is journaled only after 
`db.createTableWithoutLock()` has already written `OP_CREATE_TABLE`. If the 
master fails over after the create-table edit log is durable but before this 
colocate log (and the same issue applies to the slave log below), replay only 
runs `replayCreateTableInternal()`, which registers the table/tablets and does 
not rebuild `TenantLevelColocateTableIndex`; the create properties were already 
consumed/removed, and `OlapTable` has no persisted tenant-level colocate field. 
The table then permanently comes back as a non-tenant-colocate table, so 
planning and repair ignore the requested colocation. Please make the 
tenant-level colocate metadata part of the create-table replay atomically, or 
reconstruct it during create-table replay before exposing a successful create.



##########
fe/fe-core/src/main/java/org/apache/doris/clone/TenantLevelColocateTableCheckerAndBalancer.java:
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
+import org.apache.doris.catalog.Tablet.TabletStatus;
+import org.apache.doris.catalog.TenantLevelColocateGroupSchema;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex.GroupV2Id;
+import org.apache.doris.clone.TabletChecker.CheckerCounter;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletScheduler.AddResult;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Reference;
+import org.apache.doris.persist.ModifyTenantLevelColocateMapInfo;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * TenantLevelColocateTableCheckerAndBalancer is responsible for tablets' 
repair and balance of colocated tables.
+ */
+public class TenantLevelColocateTableCheckerAndBalancer extends 
ColocateTableCheckerAndBalancer {
+    private static final Logger LOG = 
LogManager.getLogger(TenantLevelColocateTableCheckerAndBalancer.class);
+
+    private TenantLevelColocateTableCheckerAndBalancer(long intervalMs) {
+        super("colocate group clone checker v2", intervalMs);
+    }
+
+    private static volatile TenantLevelColocateTableCheckerAndBalancer 
INSTANCE = null;
+
+    public static TenantLevelColocateTableCheckerAndBalancer getInstance() {
+        if (INSTANCE == null) {
+            synchronized (TenantLevelColocateTableCheckerAndBalancer.class) {
+                if (INSTANCE == null) {
+                    INSTANCE = new 
TenantLevelColocateTableCheckerAndBalancer(Config.tablet_checker_interval_ms);
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    @Override
+    public void runAfterCatalogReady() {
+        relocateAndBalanceGroups();
+        matchGroups();
+    }
+
+    private void relocateAndBalanceGroups() {
+        Set<GroupV2Id> groupIds = 
Env.getCurrentEnv().getTenantLevelColocateTableIndex().getAllGroupIds();
+
+        // balance only inside each group, excluded balance between all groups
+        Set<GroupV2Id> changeGroups = relocateAndBalanceGroup(groupIds, false);
+
+        if (!Config.disable_colocate_balance_between_groups
+                && !changeGroups.isEmpty()) {
+            // balance both inside each group and between all groups
+            relocateAndBalanceGroup(changeGroups, true);
+        }
+    }
+
+    private Set<GroupV2Id> relocateAndBalanceGroup(Set<GroupV2Id> groupIds, 
boolean balanceBetweenGroups) {
+        Set<GroupV2Id> changeGroups = Sets.newHashSet();
+        if (Config.disable_colocate_balance) {
+            return changeGroups;
+        }
+
+        Env env = Env.getCurrentEnv();
+        TenantLevelColocateTableIndex colocateIndex = 
env.getTenantLevelColocateTableIndex();
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+
+        GlobalColocateStatistic globalColocateStatistic = 
buildGlobalColocateStatistic();
+
+        // get all groups
+        for (GroupV2Id groupId : groupIds) {
+            Map<Tag, LoadStatisticForTag> statisticMap = 
env.getTabletScheduler().getStatisticMap();
+            if (statisticMap == null) {
+                continue;
+            }
+
+            TenantLevelColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+            if (groupSchema == null) {
+                LOG.info("Not found colocate group {}, maybe delete", groupId);
+                continue;
+            }
+            ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
+            try {
+                
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
+            } catch (DdlException e) {
+                colocateIndex.setErrMsgForGroup(groupId, e.getMessage());
+                continue;
+            }
+
+            Tag tag = groupSchema.getTag();
+            LoadStatisticForTag statistic = statisticMap.get(tag);
+            if (statistic == null) {
+                continue;
+            }
+            List<List<Long>> backendsPerBucketSeq = 
colocateIndex.getBackendsPerBucketSeqByGroup(groupId);
+            if (backendsPerBucketSeq.isEmpty()) {
+                continue;
+            }
+
+            // get all unavailable backends in the backend bucket sequence of 
this group
+            Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(
+                    infoService, colocateIndex, groupId, tag);
+            // get all available backends for this group
+            List<Long> availableBeIds = getAvailableBeIds(tag, 
Collections.emptySet(),
+                    infoService);
+            // try relocate or balance this group for specified tag
+            List<List<Long>> balancedBackendsPerBucketSeq = 
Lists.newArrayList();
+            if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, 
availableBeIds, colocateIndex,
+                    infoService, statistic, globalColocateStatistic, 
balancedBackendsPerBucketSeq,
+                    balanceBetweenGroups)) {
+                if (!colocateIndex.addBackendsPerBucketSeq(groupId, 
balancedBackendsPerBucketSeq, replicaAlloc)) {
+                    LOG.warn("relocate group {} succ, but replica allocation 
has change, old replica alloc {}",
+                            groupId, replicaAlloc);
+                    continue;
+                }
+                colocateIndex.markMasterGroupUnstable(groupId, "relocated", 
true);
+                colocateIndex.markSlaveGroupUnstable(groupId, "master is 
unstable", true);
+                changeGroups.add(groupId);
+                Map<GroupV2Id, List<List<Long>>> 
balancedBackendsPerBucketSeqMap = Maps.newHashMap();
+                balancedBackendsPerBucketSeqMap.put(groupId, 
balancedBackendsPerBucketSeq);
+                ModifyTenantLevelColocateMapInfo info = new 
ModifyTenantLevelColocateMapInfo(
+                        balancedBackendsPerBucketSeqMap);
+                env.getEditLog().logColocateBackendsPerBucketSeqV2(info);
+                LOG.info("balance group {}. now backends per bucket sequence 
for tag {} is: {}",
+                        groupId, tag, balancedBackendsPerBucketSeq);
+            }
+        }
+
+        return changeGroups;
+    }
+
+    /*
+     * Check every tablet of a group, if replica's location does not match 
backends in group, relocating those
+     * replicas, and mark that group as unstable.
+     * If every replicas match the backends in group, mark that group as 
stable.
+     */
+    private void matchGroups() {
+        long start = System.currentTimeMillis();
+        CheckerCounter counter = new CheckerCounter();
+
+        Env env = Env.getCurrentEnv();
+        TenantLevelColocateTableIndex colocateIndex = 
env.getTenantLevelColocateTableIndex();
+
+        // check each group
+        Set<GroupV2Id> groupIds = colocateIndex.getAllGroupIds();
+        for (GroupV2Id groupId : groupIds) {
+            TenantLevelColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+            if (groupSchema == null) {
+                LOG.info("Not found colocate group {}, maybe delete", groupId);
+                continue;
+            }
+
+            List<Set<Long>> backendBucketsSeq = 
colocateIndex.getBackendsPerBucketSeqSet(groupId);
+            if (backendBucketsSeq.isEmpty()) {
+                continue;
+            }
+
+            String unstableReason = matchMasterGroup(env, counter, groupId, 
backendBucketsSeq);
+            // mark group as stable or unstable
+            if (Strings.isNullOrEmpty(unstableReason)) {
+                colocateIndex.markMasterGroupStable(groupId, true);
+                unstableReason = matchSlaveGroup(env, counter, groupId, 
backendBucketsSeq);
+                if (Strings.isNullOrEmpty(unstableReason)) {
+                    colocateIndex.markSlaveGroupStable(groupId, true);
+                } else {
+                    colocateIndex.markSlaveGroupUnstable(groupId, 
unstableReason, true);
+                }
+            } else {
+                colocateIndex.markMasterGroupUnstable(groupId, unstableReason, 
true);
+                colocateIndex.markSlaveGroupUnstable(groupId, "mater is 
unstable", true);
+            }
+        } // end for groups
+
+        long cost = System.currentTimeMillis() - start;
+        LOG.info("finished to check tablets. 
unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, "
+                        + "cost: {} ms",
+                counter.unhealthyTabletNum, counter.totalTabletNum, 
counter.addToSchedulerTabletNum,
+                counter.tabletInScheduler, counter.tabletNotReady, 
counter.tabletExceedLimit, cost);
+    }
+
+    private String matchMasterGroup(Env env, CheckerCounter counter, GroupV2Id 
groupId,
+            List<Set<Long>> backendBucketsSeq) {
+        TenantLevelColocateTableIndex colocateIndex = 
env.getTenantLevelColocateTableIndex();
+        List<Long> tableIds = colocateIndex.getAllMasterTableIds(groupId);
+        Set<Tag> colocateTags = Collections.singleton(groupId.getTag());
+        Reference<String> unstableReason = new Reference<>();
+        for (Long tableId : tableIds) {
+            Database db = 
env.getInternalCatalog().getDbNullableByTable(tableId);
+            if (db == null) {
+                continue;
+            }
+            counter.totalTabletNum++;
+            OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+            if (olapTable == null || 
!colocateIndex.isColocateMasterTable(olapTable.getId())) {
+                continue;
+            }
+            try {
+                if (matchTable(env, counter, db, olapTable, colocateTags, 
backendBucketsSeq, unstableReason)) {
+                    break;
+                }
+            } catch (Throwable e) {
+                LOG.warn("something wrong on colocate checker, dbName={}, 
tableName={}, errMsg={}",
+                        db.getFullName(), olapTable.getName(), e.getMessage());
+            }
+        } // end for tables
+        return unstableReason.getRef();
+    }
+
+    private String matchSlaveGroup(Env env, CheckerCounter counter, GroupV2Id 
groupId,
+            List<Set<Long>> masterBackendBucketsSeq) {
+        TenantLevelColocateTableIndex colocateIndex = 
env.getTenantLevelColocateTableIndex();
+        List<Long> tableIds = colocateIndex.getAllSlaveTableIds(groupId);
+        Set<Tag> colocateTags = Collections.singleton(groupId.getTag());
+        Reference<String> unstableReason = new Reference<>();
+        for (Long tableId : tableIds) {
+            Database db = 
env.getInternalCatalog().getDbNullableByTable(tableId);
+            if (db == null) {
+                continue;
+            }
+            counter.totalTabletNum++;
+            OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+            if (olapTable == null || 
!colocateIndex.isColocateSlaveTable(olapTable.getId())) {
+                continue;
+            }
+            List<Set<Long>> backendBucketsSeq = 
TenantLevelColocateTableIndex.getSlaveBackendsPerBucketSeqSet(
+                    masterBackendBucketsSeq, 
olapTable.getDefaultDistributionInfo().getBucketNum());

Review Comment:
   This expands the master bucket map using the table default bucket count for 
every slave partition, but `checkSlaveDistribution()` explicitly allows a slave 
partition bucket count to be any multiple of the master bucket count. A valid 
slave table can therefore have a later partition with, for example, 8 buckets 
while the default is 4; `matchPartition()` then hits the 
`backendBucketsSeq.size() == index.getTablets().size()` precondition, the catch 
only logs, `unstableReason` remains null, and the slave group can be marked 
stable without checking or repairing that partition. Expand using each 
partition/index tablet count (or disallow these per-partition bucket multiples 
consistently).



##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -2263,23 +2282,32 @@ private void computeScanRangeAssignmentByColocate(
             fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), 
new BucketSeqToScanRange());
 
             // Same as bucket shuffle.
-            int bucketNum = scanNode.getBucketNum();
-            scanNode.getFragment().setBucketNum(bucketNum);
+            PlanFragment fragment = scanNode.getFragment();
+            if (!fragment.getColocateData().isEmpty()) {
+                fragment.setBucketNum(fragment.getColocateData().size());
+            } else {

Review Comment:
   `getColocateData()` is a `Map<Tag, List<List<Long>>>`, so `size()` is the 
number of tags, not the number of colocate buckets. For the common case with 
one location tag and 32 buckets this sets `fragmentBucketNum` to 1, and the new 
`bucketSeq % fragmentBucketNum` below collapses every tablet bucket into 
colocate bucket 0 and one selected execution host. This breaks colocate 
parallelism and can overload a single BE. Use the bucket-list size, e.g. 
`fragment.getColocateData().values().iterator().next().size()` after the 
planner has normalized all retained tags to the same size.



##########
fe/fe-core/src/main/java/org/apache/doris/clone/TenantLevelColocateTableCheckerAndBalancer.java:
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
+import org.apache.doris.catalog.Tablet.TabletStatus;
+import org.apache.doris.catalog.TenantLevelColocateGroupSchema;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex.GroupV2Id;
+import org.apache.doris.clone.TabletChecker.CheckerCounter;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletScheduler.AddResult;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Reference;
+import org.apache.doris.persist.ModifyTenantLevelColocateMapInfo;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * TenantLevelColocateTableCheckerAndBalancer is responsible for tablets' 
repair and balance of colocated tables.
+ */
+public class TenantLevelColocateTableCheckerAndBalancer extends 
ColocateTableCheckerAndBalancer {
+    private static final Logger LOG = 
LogManager.getLogger(TenantLevelColocateTableCheckerAndBalancer.class);
+
+    private TenantLevelColocateTableCheckerAndBalancer(long intervalMs) {
+        super("colocate group clone checker v2", intervalMs);
+    }
+
+    private static volatile TenantLevelColocateTableCheckerAndBalancer 
INSTANCE = null;
+
+    public static TenantLevelColocateTableCheckerAndBalancer getInstance() {
+        if (INSTANCE == null) {
+            synchronized (TenantLevelColocateTableCheckerAndBalancer.class) {
+                if (INSTANCE == null) {
+                    INSTANCE = new 
TenantLevelColocateTableCheckerAndBalancer(Config.tablet_checker_interval_ms);
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    @Override
+    public void runAfterCatalogReady() {
+        relocateAndBalanceGroups();
+        matchGroups();
+    }
+
+    private void relocateAndBalanceGroups() {
+        Set<GroupV2Id> groupIds = 
Env.getCurrentEnv().getTenantLevelColocateTableIndex().getAllGroupIds();
+
+        // balance only inside each group, excluded balance between all groups
+        Set<GroupV2Id> changeGroups = relocateAndBalanceGroup(groupIds, false);
+
+        if (!Config.disable_colocate_balance_between_groups
+                && !changeGroups.isEmpty()) {
+            // balance both inside each group and between all groups
+            relocateAndBalanceGroup(changeGroups, true);
+        }
+    }
+
+    private Set<GroupV2Id> relocateAndBalanceGroup(Set<GroupV2Id> groupIds, 
boolean balanceBetweenGroups) {
+        Set<GroupV2Id> changeGroups = Sets.newHashSet();
+        if (Config.disable_colocate_balance) {
+            return changeGroups;
+        }
+
+        Env env = Env.getCurrentEnv();
+        TenantLevelColocateTableIndex colocateIndex = 
env.getTenantLevelColocateTableIndex();
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+
+        GlobalColocateStatistic globalColocateStatistic = 
buildGlobalColocateStatistic();
+
+        // get all groups
+        for (GroupV2Id groupId : groupIds) {
+            Map<Tag, LoadStatisticForTag> statisticMap = 
env.getTabletScheduler().getStatisticMap();
+            if (statisticMap == null) {
+                continue;
+            }
+
+            TenantLevelColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+            if (groupSchema == null) {
+                LOG.info("Not found colocate group {}, maybe delete", groupId);
+                continue;
+            }
+            ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
+            try {
+                
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
+            } catch (DdlException e) {
+                colocateIndex.setErrMsgForGroup(groupId, e.getMessage());
+                continue;
+            }
+
+            Tag tag = groupSchema.getTag();
+            LoadStatisticForTag statistic = statisticMap.get(tag);
+            if (statistic == null) {
+                continue;
+            }
+            List<List<Long>> backendsPerBucketSeq = 
colocateIndex.getBackendsPerBucketSeqByGroup(groupId);
+            if (backendsPerBucketSeq.isEmpty()) {
+                continue;
+            }
+
+            // get all unavailable backends in the backend bucket sequence of 
this group
+            Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(
+                    infoService, colocateIndex, groupId, tag);
+            // get all available backends for this group
+            List<Long> availableBeIds = getAvailableBeIds(tag, 
Collections.emptySet(),
+                    infoService);
+            // try relocate or balance this group for specified tag
+            List<List<Long>> balancedBackendsPerBucketSeq = 
Lists.newArrayList();
+            if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, 
availableBeIds, colocateIndex,
+                    infoService, statistic, globalColocateStatistic, 
balancedBackendsPerBucketSeq,
+                    balanceBetweenGroups)) {
+                if (!colocateIndex.addBackendsPerBucketSeq(groupId, 
balancedBackendsPerBucketSeq, replicaAlloc)) {
+                    LOG.warn("relocate group {} succ, but replica allocation 
has change, old replica alloc {}",
+                            groupId, replicaAlloc);
+                    continue;
+                }
+                colocateIndex.markMasterGroupUnstable(groupId, "relocated", 
true);
+                colocateIndex.markSlaveGroupUnstable(groupId, "master is 
unstable", true);
+                changeGroups.add(groupId);
+                Map<GroupV2Id, List<List<Long>>> 
balancedBackendsPerBucketSeqMap = Maps.newHashMap();
+                balancedBackendsPerBucketSeqMap.put(groupId, 
balancedBackendsPerBucketSeq);
+                ModifyTenantLevelColocateMapInfo info = new 
ModifyTenantLevelColocateMapInfo(
+                        balancedBackendsPerBucketSeqMap);
+                env.getEditLog().logColocateBackendsPerBucketSeqV2(info);
+                LOG.info("balance group {}. now backends per bucket sequence 
for tag {} is: {}",
+                        groupId, tag, balancedBackendsPerBucketSeq);
+            }
+        }
+
+        return changeGroups;
+    }
+
+    /*
+     * Check every tablet of a group, if replica's location does not match 
backends in group, relocating those
+     * replicas, and mark that group as unstable.
+     * If every replicas match the backends in group, mark that group as 
stable.
+     */
+    private void matchGroups() {
+        long start = System.currentTimeMillis();
+        CheckerCounter counter = new CheckerCounter();
+
+        Env env = Env.getCurrentEnv();
+        TenantLevelColocateTableIndex colocateIndex = 
env.getTenantLevelColocateTableIndex();
+
+        // check each group
+        Set<GroupV2Id> groupIds = colocateIndex.getAllGroupIds();
+        for (GroupV2Id groupId : groupIds) {
+            TenantLevelColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+            if (groupSchema == null) {
+                LOG.info("Not found colocate group {}, maybe delete", groupId);
+                continue;
+            }
+
+            List<Set<Long>> backendBucketsSeq = 
colocateIndex.getBackendsPerBucketSeqSet(groupId);
+            if (backendBucketsSeq.isEmpty()) {
+                continue;
+            }
+
+            String unstableReason = matchMasterGroup(env, counter, groupId, 
backendBucketsSeq);
+            // mark group as stable or unstable
+            if (Strings.isNullOrEmpty(unstableReason)) {
+                colocateIndex.markMasterGroupStable(groupId, true);
+                unstableReason = matchSlaveGroup(env, counter, groupId, 
backendBucketsSeq);
+                if (Strings.isNullOrEmpty(unstableReason)) {
+                    colocateIndex.markSlaveGroupStable(groupId, true);
+                } else {
+                    colocateIndex.markSlaveGroupUnstable(groupId, 
unstableReason, true);
+                }
+            } else {
+                colocateIndex.markMasterGroupUnstable(groupId, unstableReason, 
true);
+                colocateIndex.markSlaveGroupUnstable(groupId, "mater is 
unstable", true);
+            }
+        } // end for groups
+
+        long cost = System.currentTimeMillis() - start;
+        LOG.info("finished to check tablets. 
unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, "
+                        + "cost: {} ms",
+                counter.unhealthyTabletNum, counter.totalTabletNum, 
counter.addToSchedulerTabletNum,
+                counter.tabletInScheduler, counter.tabletNotReady, 
counter.tabletExceedLimit, cost);
+    }
+
+    private String matchMasterGroup(Env env, CheckerCounter counter, GroupV2Id 
groupId,
+            List<Set<Long>> backendBucketsSeq) {
+        TenantLevelColocateTableIndex colocateIndex = 
env.getTenantLevelColocateTableIndex();
+        List<Long> tableIds = colocateIndex.getAllMasterTableIds(groupId);
+        Set<Tag> colocateTags = Collections.singleton(groupId.getTag());
+        Reference<String> unstableReason = new Reference<>();
+        for (Long tableId : tableIds) {
+            Database db = 
env.getInternalCatalog().getDbNullableByTable(tableId);
+            if (db == null) {
+                continue;
+            }
+            counter.totalTabletNum++;
+            OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+            if (olapTable == null || 
!colocateIndex.isColocateMasterTable(olapTable.getId())) {
+                continue;
+            }
+            try {
+                if (matchTable(env, counter, db, olapTable, colocateTags, 
backendBucketsSeq, unstableReason)) {
+                    break;
+                }
+            } catch (Throwable e) {
+                LOG.warn("something wrong on colocate checker, dbName={}, 
tableName={}, errMsg={}",
+                        db.getFullName(), olapTable.getName(), e.getMessage());
+            }
+        } // end for tables
+        return unstableReason.getRef();
+    }
+
+    private String matchSlaveGroup(Env env, CheckerCounter counter, GroupV2Id 
groupId,
+            List<Set<Long>> masterBackendBucketsSeq) {
+        TenantLevelColocateTableIndex colocateIndex = 
env.getTenantLevelColocateTableIndex();
+        List<Long> tableIds = colocateIndex.getAllSlaveTableIds(groupId);
+        Set<Tag> colocateTags = Collections.singleton(groupId.getTag());
+        Reference<String> unstableReason = new Reference<>();
+        for (Long tableId : tableIds) {
+            Database db = 
env.getInternalCatalog().getDbNullableByTable(tableId);
+            if (db == null) {
+                continue;
+            }
+            counter.totalTabletNum++;
+            OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+            if (olapTable == null || 
!colocateIndex.isColocateSlaveTable(olapTable.getId())) {
+                continue;
+            }
+            List<Set<Long>> backendBucketsSeq = 
TenantLevelColocateTableIndex.getSlaveBackendsPerBucketSeqSet(
+                    masterBackendBucketsSeq, 
olapTable.getDefaultDistributionInfo().getBucketNum());
+            if (backendBucketsSeq.isEmpty()) {
+                continue;
+            }
+            try {
+                if (matchTable(env, counter, db, olapTable, colocateTags, 
backendBucketsSeq, unstableReason)) {
+                    break;
+                }
+            } catch (Throwable e) {
+                LOG.warn("something wrong on colocate checker, dbName={}, 
tableName={}, errMsg={}",
+                        db.getFullName(), olapTable.getName(), e.getMessage());
+            }
+        } // end for tables
+        return unstableReason.getRef();
+    }
+
+    private boolean matchTable(Env env, CheckerCounter counter, Database db,
+            OlapTable olapTable, Set<Tag> colocateTags,
+            List<Set<Long>> backendBucketsSeq, Reference<String> 
unstableReason) {
+        olapTable.readLock();
+        try {
+            for (Partition partition : olapTable.getPartitions()) {
+                if (matchPartition(env, counter, db, olapTable, partition, 
colocateTags,
+                        backendBucketsSeq, unstableReason)) {
+                    return true;
+                }
+            }
+        } finally {
+            olapTable.readUnlock();
+        }
+        return false;
+    }
+
+    private boolean matchPartition(Env env, CheckerCounter counter, Database 
db,
+            OlapTable olapTable, Partition partition, Set<Tag> colocateTags,
+            List<Set<Long>> backendBucketsSeq, Reference<String> 
unstableReason) {
+        TabletScheduler tabletScheduler = env.getTabletScheduler();
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+
+        ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo()
+                
.getReplicaAllocation(partition.getId()).getSubMap(colocateTags);
+
+        short replicationNum = replicaAlloc.getTotalReplicaNum();
+        long visibleVersion = partition.getVisibleVersion();
+        // Here we only get VISIBLE indexes. All other indexes are not 
queryable.
+        // So it does not matter if tablets of other indexes are not matched.
+        for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+            Preconditions.checkState(backendBucketsSeq.size() == 
index.getTablets().size(),
+                    backendBucketsSeq.size() + " vs. " + 
index.getTablets().size());
+            List<Long> tabletIdsInOrder = index.getTabletIdsInOrder();
+            for (int idx = 0; idx < tabletIdsInOrder.size(); idx++) {
+                Long tabletId = tabletIdsInOrder.get(idx);
+                counter.totalTabletNum++;
+                Set<Long> bucketsSeq = backendBucketsSeq.get(idx);
+                Preconditions.checkState(bucketsSeq.size() == replicationNum,
+                        bucketsSeq.size() + " vs. " + replicationNum);
+                Tablet tablet = index.getTablet(tabletId);
+                TabletHealth tabletHealth = 
tablet.getColocateHealthV2(visibleVersion,
+                        replicaAlloc, bucketsSeq, colocateTags);
+                if (tabletHealth.status != TabletStatus.HEALTHY) {
+                    counter.unhealthyTabletNum++;
+                    unstableReason.setRef(String.format("get unhealthy tablet 
%d in colocate table."
+                            + " status: %s", tablet.getId(), 
tabletHealth.status));
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(unstableReason);
+                    }
+
+                    if (tabletHealth.status == TabletStatus.UNRECOVERABLE) {
+                        continue;
+                    }
+
+                    if (!tablet.readyToBeRepaired(infoService, 
Priority.NORMAL)) {
+                        counter.tabletNotReady++;
+                        continue;
+                    }
+
+                    TabletSchedCtx tabletCtx = new TabletSchedCtx(
+                            TabletSchedCtx.Type.REPAIR,
+                            db.getId(), olapTable.getId(), partition.getId(), 
index.getId(), tablet.getId(),
+                            
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()),
+                            System.currentTimeMillis());
+                    // the tablet status will be set again when being scheduled
+                    tabletCtx.setTabletHealth(tabletHealth);
+                    tabletCtx.setTabletOrderIdx(idx);
+                    
tabletCtx.setIsUniqKeyMergeOnWrite(olapTable.isUniqKeyMergeOnWrite());
+
+                    AddResult res = tabletScheduler.addTablet(tabletCtx, false 
/* not force */);
+                    if (res == AddResult.DISABLED) {
+                        // tablet in scheduler exceed limit, or scheduler is 
disabled,
+                        // skip this group and check next one.
+                        LOG.info("tablet scheduler return: {}. stop colocate 
table check", res.name());
+                        return true;
+                    } else if (res == AddResult.ADDED) {
+                        counter.addToSchedulerTabletNum++;
+                    } else if (res == AddResult.ALREADY_IN) {
+                        counter.tabletInScheduler++;
+                    } else if (res == AddResult.REPLACE_ADDED || res == 
AddResult.LIMIT_EXCEED) {
+                        counter.tabletExceedLimit++;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    private GlobalColocateStatistic buildGlobalColocateStatistic() {
+        Env env = Env.getCurrentEnv();
+        TenantLevelColocateTableIndex colocateIndex = 
env.getTenantLevelColocateTableIndex();
+        GlobalColocateStatistic globalColocateStatistic = new 
GlobalColocateStatistic();
+
+        Set<GroupV2Id> groupIds = colocateIndex.getAllGroupIds();
+        for (GroupV2Id groupId : groupIds) {
+            TenantLevelColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+            if (groupSchema == null) {
+                LOG.info("Not found colocate group {}, maybe delete", groupId);
+                continue;
+            }
+            ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
+            List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
+            List<Set<Long>> backendBucketsSeq = 
colocateIndex.getBackendsPerBucketSeqSet(groupId);
+            if (backendBucketsSeq.isEmpty()) {
+                continue;
+            }
+
+            int totalReplicaNumPerBucket = 0;
+            ArrayList<Long> totalReplicaDataSizes = Lists.newArrayList();
+            for (int i = 0; i < backendBucketsSeq.size(); i++) {
+                totalReplicaDataSizes.add(0L);
+            }
+
+            for (Long tableId : tableIds) {
+                Database db = 
env.getInternalCatalog().getDbNullableByTable(tableId);
+                if (db == null) {
+                    continue;
+                }
+                OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+                if (olapTable == null || 
!colocateIndex.isColocateMasterTable(olapTable.getId())) {
+                    continue;

Review Comment:
   `getAllTableIds(groupId)` includes both master and slave tables, but this 
condition skips every slave table when building the balance statistics. Slave 
tables use the same bucket sequence and can be much larger than the master, so 
relocation/balance decisions are made from only master-table sizes and can move 
buckets as if the colocated slave data did not exist. Please include slave 
table tablet sizes as well, mapping their bucket index modulo the master bucket 
sequence as done for slave matching.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to