This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 54a8c83f6a187d2c2392594fbf62cad292c48966 Author: zhengyu <freeman.zhang1...@gmail.com> AuthorDate: Wed Mar 6 21:57:51 2024 +0800 [enhancement](cloud) add CloudTabletStatMgr to capture stats in cloud mode (#31818) Signed-off-by: freemandealer <freeman.zhang1...@gmail.com> --- .../main/java/org/apache/doris/common/Config.java | 3 + .../apache/doris/catalog/CloudTabletStatMgr.java | 273 +++++++++++++++++++++ .../java/org/apache/doris/catalog/Database.java | 5 + .../main/java/org/apache/doris/catalog/Env.java | 9 +- .../java/org/apache/doris/catalog/Replica.java | 17 ++ 5 files changed, 306 insertions(+), 1 deletion(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index cbbf9368720..418122745d4 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2583,6 +2583,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static int cloud_cold_read_percent = 10; // 10% + @ConfField(mutable = true) + public static int get_tablet_stat_batch_size = 1000; + // The original meta read lock is not enough to keep a snapshot of partition versions, // so the execution of `createScanRangeLocations` are delayed to `Coordinator::exec`, // to help to acquire a snapshot of partition versions. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java new file mode 100644 index 00000000000..136c9264a4e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java @@ -0,0 +1,273 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.catalog.MaterializedIndex.IndexExtState; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.cloud.proto.Cloud.GetTabletStatsRequest; +import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse; +import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; +import org.apache.doris.cloud.proto.Cloud.TabletIndexPB; +import org.apache.doris.cloud.proto.Cloud.TabletStatsPB; +import org.apache.doris.cloud.rpc.MetaServiceProxy; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.rpc.RpcException; + +import lombok.Getter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; + +/* + * CloudTabletStatMgr is for collecting tablet(replica) statistics from backends. + * Each FE will collect by itself. + */ +public class CloudTabletStatMgr extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(CloudTabletStatMgr.class); + + private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); + + // <(dbId, tableId) -> CloudTableStats> + private ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> cloudTableStatsMap = new ConcurrentHashMap<>(); + + public CloudTabletStatMgr() { + super("cloud tablet stat mgr", Config.tablet_stat_update_interval_second * 1000); + } + + @Override + protected void runAfterCatalogReady() { + LOG.info("cloud tablet stat begin"); + long start = System.currentTimeMillis(); + + List<GetTabletStatsRequest> reqList = new ArrayList<GetTabletStatsRequest>(); + GetTabletStatsRequest.Builder builder = GetTabletStatsRequest.newBuilder(); + List<Long> dbIds = Env.getCurrentInternalCatalog().getDbIds(); + for (Long dbId : dbIds) { + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } + + List<Table> tableList = db.getTables(); + for (Table table : tableList) { + if (table.getType() != TableType.OLAP) { + continue; + } + + table.readLock(); + try { + OlapTable tbl = (OlapTable) table; + for (Partition partition : tbl.getAllPartitions()) { + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { + for (Long tabletId : index.getTabletIdsInOrder()) { + Tablet tablet = index.getTablet(tabletId); + TabletIndexPB.Builder tabletBuilder = TabletIndexPB.newBuilder(); + tabletBuilder.setDbId(dbId); + tabletBuilder.setTableId(table.getId()); + tabletBuilder.setIndexId(index.getId()); + tabletBuilder.setPartitionId(partition.getId()); + tabletBuilder.setTabletId(tablet.getId()); + builder.addTabletIdx(tabletBuilder); + + if (builder.getTabletIdxCount() >= Config.get_tablet_stat_batch_size) { + reqList.add(builder.build()); + builder = GetTabletStatsRequest.newBuilder(); + } + } + } + } // partitions + } finally { + table.readUnlock(); + } + } // tables + } // end for dbs + + if (builder.getTabletIdxCount() > 0) { + reqList.add(builder.build()); + } + + for (GetTabletStatsRequest req : reqList) { + GetTabletStatsResponse resp; + try { + resp = getTabletStats(req); + } catch (RpcException e) { + LOG.info("get tablet stats exception:", e); + continue; + } + + if (resp.getStatus().getCode() != MetaServiceCode.OK) { + continue; + } + + if (LOG.isDebugEnabled()) { + int i = 0; + for (TabletIndexPB idx : req.getTabletIdxList()) { + LOG.debug("db_id: {} table_id: {} index_id: {} tablet_id: {} size: {}", + idx.getDbId(), idx.getTableId(), idx.getIndexId(), idx.getTabletId(), + resp.getTabletStats(i++).getDataSize()); + } + } + updateTabletStat(resp); + } + + LOG.info("finished to get tablet stat of all backends. cost: {} ms", + (System.currentTimeMillis() - start)); + + // after update replica in all backends, update index row num + start = System.currentTimeMillis(); + ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> newCloudTableStatsMap = new ConcurrentHashMap<>(); + for (Long dbId : dbIds) { + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } + + List<Table> tableList = db.getTables(); + for (Table table : tableList) { + if (table.getType() != TableType.OLAP) { + continue; + } + OlapTable olapTable = (OlapTable) table; + + String dbName = db.getName(); + Long tableId = table.getId(); + String tableName = table.getName(); + + Long tableDataSize = 0L; + Long tableRowsetCount = 0L; + Long tableSegmentCount = 0L; + Long tableRowCount = 0L; + + if (!table.writeLockIfExist()) { + continue; + } + + try { + for (Partition partition : olapTable.getAllPartitions()) { + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { + long indexRowCount = 0L; + for (Tablet tablet : index.getTablets()) { + long tabletDataSize = 0L; + long tabletRowsetCount = 0L; + long tabletSegmentCount = 0L; + long tabletRowCount = 0L; + + for (Replica replica : tablet.getReplicas()) { + if (replica.getDataSize() > tabletDataSize) { + tabletDataSize = replica.getDataSize(); + } + + if (replica.getRowsetCount() > tabletRowsetCount) { + tabletRowsetCount = replica.getRowsetCount(); + } + + if (replica.getSegmentCount() > tabletSegmentCount) { + tabletSegmentCount = replica.getSegmentCount(); + } + + if (replica.getRowCount() > tabletRowCount) { + tabletRowCount = replica.getRowCount(); + } + } + + tableDataSize += tabletDataSize; + tableRowsetCount += tabletRowsetCount; + tableSegmentCount += tabletSegmentCount; + tableRowCount += tabletRowCount; + + indexRowCount += tabletRowCount; + } // end for tablets + index.setRowCount(indexRowCount); + } // end for indices + } // end for partitions + LOG.debug("finished to set row num for table: {} in database: {}", + table.getName(), db.getFullName()); + } finally { + table.writeUnlock(); + } + + newCloudTableStatsMap.put(Pair.of(dbId, tableId), new CloudTableStats(dbName, tableName, + tableDataSize, tableRowsetCount, tableSegmentCount, tableRowCount)); + } + } + this.cloudTableStatsMap = newCloudTableStatsMap; + LOG.info("finished to update index row num of all databases. cost: {} ms", + (System.currentTimeMillis() - start)); + } + + private void updateTabletStat(GetTabletStatsResponse response) { + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + for (TabletStatsPB stat : response.getTabletStatsList()) { + if (invertedIndex.getTabletMeta(stat.getIdx().getTabletId()) != null) { + List<Replica> replicas = invertedIndex.getReplicasByTabletId(stat.getIdx().getTabletId()); + if (replicas != null && !replicas.isEmpty() && replicas.get(0) != null) { + replicas.get(0).updateCloudStat(stat.getDataSize(), stat.getNumRowsets(), + stat.getNumSegments(), stat.getNumRows()); + } + } + } + } + + private GetTabletStatsResponse getTabletStats(GetTabletStatsRequest request) + throws RpcException { + GetTabletStatsResponse response; + try { + response = MetaServiceProxy.getInstance().getTabletStats(request); + } catch (RpcException e) { + LOG.info("get tablet stat get exception:", e); + throw e; + } + return response; + } + + public ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> getCloudTableStatsMap() { + return this.cloudTableStatsMap; + } + + public static class CloudTableStats { + @Getter + private String dbName; + @Getter + private String tableName; + + @Getter + private Long tableDataSize; + @Getter + private Long tableRowsetCount; + @Getter + private Long tableSegmentCount; + @Getter + private Long tableRowCount; + + public CloudTableStats(String dbName, String tableName, Long tableDataSize, Long tableRowsetCount, + Long tableSegmentCount, Long tableRowCount) { + this.dbName = dbName; + this.tableName = tableName; + this.tableDataSize = tableDataSize; + this.tableRowsetCount = tableRowsetCount; + this.tableSegmentCount = tableSegmentCount; + this.tableRowCount = tableRowCount; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index fd709963a7b..d1f33b2b10c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -213,6 +213,11 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table> return fullQualifiedName; } + public String getName() { + String[] strs = fullQualifiedName.split(":"); + return strs.length == 2 ? strs[1] : strs[0]; + } + public void setNameWithLock(String newName) { writeLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index effd7b6ca64..38fe843d5e7 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -448,6 +448,8 @@ public class Env { private TabletStatMgr tabletStatMgr; + private CloudTabletStatMgr cloudTabletStatMgr; + private Auth auth; private AccessControllerManager accessManager; @@ -692,6 +694,7 @@ public class Env { this.globalTransactionMgr = new GlobalTransactionMgr(this); this.tabletStatMgr = new TabletStatMgr(); + this.cloudTabletStatMgr = new CloudTabletStatMgr(); this.auth = new Auth(); this.accessManager = new AccessControllerManager(auth); @@ -1670,7 +1673,11 @@ public class Env { private void startNonMasterDaemonThreads() { // start load manager thread loadManager.start(); - tabletStatMgr.start(); + if (Config.isNotCloudMode()) { + tabletStatMgr.start(); + } else { + cloudTabletStatMgr.start(); + } // load and export job label cleaner thread labelCleaner.start(); // es repository diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 0dad3612023..213fc94ec38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -159,6 +159,8 @@ public class Replica implements Writable { */ private long preWatermarkTxnId = -1; private long postWatermarkTxnId = -1; + private long segmentCount = 0L; + private long rowsetCount = 0L; private long userDropTime = -1; @@ -241,6 +243,14 @@ public class Replica implements Writable { return rowCount; } + public long getSegmentCount() { + return segmentCount; + } + + public long getRowsetCount() { + return rowsetCount; + } + public long getLastFailedVersion() { return lastFailedVersion; } @@ -334,6 +344,13 @@ public class Replica implements Writable { this.versionCount = versionCount; } + public synchronized void updateCloudStat(long dataSize, long rowsetNum, long segmentNum, long rowNum) { + this.dataSize = dataSize; + this.rowsetCount = rowsetNum; + this.segmentCount = segmentNum; + this.rowCount = rowNum; + } + public synchronized void updateVersionInfo(long newVersion, long newDataSize, long newRemoteDataSize, long newRowCount) { updateReplicaInfo(newVersion, this.lastFailedVersion, this.lastSuccessVersion, newDataSize, newRemoteDataSize, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org