This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 207aba4d0b4 Refactor active queries (#31742) 207aba4d0b4 is described below commit 207aba4d0b434cf7394d825ba7e1fbc96242723d Author: wangbo <wan...@apache.org> AuthorDate: Tue Mar 5 13:51:36 2024 +0800 Refactor active queries (#31742) --- .../table-functions/active_queries.md | 45 ++++++++---------- .../table-functions/active_queries.md | 45 ++++++++---------- .../commands/insert/AbstractInsertExecutor.java | 4 +- .../main/java/org/apache/doris/qe/Coordinator.java | 4 ++ .../java/org/apache/doris/qe/StmtExecutor.java | 5 +- .../ActiveQueriesTableValuedFunction.java | 9 ---- .../doris/tablefunction/MetadataGenerator.java | 54 +++++++++++----------- 7 files changed, 74 insertions(+), 92 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md index 35a71b5eb60..cbc0e20845d 100644 --- a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md @@ -45,37 +45,30 @@ This function is used in FROM clauses. active_queries() table schema: ``` -mysql [(none)]> desc function active_queries(); -+------------------------+--------+------+-------+---------+-------+ -| Field | Type | Null | Key | Default | Extra | -+------------------------+--------+------+-------+---------+-------+ -| BeHost | TEXT | No | false | NULL | NONE | -| BePort | BIGINT | No | false | NULL | NONE | -| QueryId | TEXT | No | false | NULL | NONE | -| StartTime | TEXT | No | false | NULL | NONE | -| QueryTimeMs | BIGINT | No | false | NULL | NONE | -| WorkloadGroupId | BIGINT | No | false | NULL | NONE | -| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE | -| ScanRows | BIGINT | No | false | NULL | NONE | -| ScanBytes | BIGINT | No | false | NULL | NONE | -| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE | -| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE | -| Database | TEXT | No | false | NULL | NONE | -| FrontendInstance | TEXT | No | false | NULL | NONE | -| Sql | TEXT | No | false | NULL | NONE | -+------------------------+--------+------+-------+---------+-------+ -14 rows in set (0.00 sec) +mysql [(none)]>desc function active_queries(); ++------------------+--------+------+-------+---------+-------+ +| Field | Type | Null | Key | Default | Extra | ++------------------+--------+------+-------+---------+-------+ +| QueryId | TEXT | No | false | NULL | NONE | +| StartTime | TEXT | No | false | NULL | NONE | +| QueryTimeMs | BIGINT | No | false | NULL | NONE | +| WorkloadGroupId | BIGINT | No | false | NULL | NONE | +| Database | TEXT | No | false | NULL | NONE | +| FrontendInstance | TEXT | No | false | NULL | NONE | +| Sql | TEXT | No | false | NULL | NONE | ++------------------+--------+------+-------+---------+-------+ +7 rows in set (0.00 sec) ``` ### example ``` mysql [(none)]>select * from active_queries(); -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql | -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx | -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -1 row in set (0.01 sec) ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql | ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() | ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +1 row in set (0.03 sec) ``` ### keywords diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md index bdae08285f2..feda3c128ca 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md @@ -45,37 +45,30 @@ active_queries active_queries()表结构: ``` -mysql [(none)]> desc function active_queries(); -+------------------------+--------+------+-------+---------+-------+ -| Field | Type | Null | Key | Default | Extra | -+------------------------+--------+------+-------+---------+-------+ -| BeHost | TEXT | No | false | NULL | NONE | -| BePort | BIGINT | No | false | NULL | NONE | -| QueryId | TEXT | No | false | NULL | NONE | -| StartTime | TEXT | No | false | NULL | NONE | -| QueryTimeMs | BIGINT | No | false | NULL | NONE | -| WorkloadGroupId | BIGINT | No | false | NULL | NONE | -| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE | -| ScanRows | BIGINT | No | false | NULL | NONE | -| ScanBytes | BIGINT | No | false | NULL | NONE | -| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE | -| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE | -| Database | TEXT | No | false | NULL | NONE | -| FrontendInstance | TEXT | No | false | NULL | NONE | -| Sql | TEXT | No | false | NULL | NONE | -+------------------------+--------+------+-------+---------+-------+ -14 rows in set (0.00 sec) +mysql [(none)]>desc function active_queries(); ++------------------+--------+------+-------+---------+-------+ +| Field | Type | Null | Key | Default | Extra | ++------------------+--------+------+-------+---------+-------+ +| QueryId | TEXT | No | false | NULL | NONE | +| StartTime | TEXT | No | false | NULL | NONE | +| QueryTimeMs | BIGINT | No | false | NULL | NONE | +| WorkloadGroupId | BIGINT | No | false | NULL | NONE | +| Database | TEXT | No | false | NULL | NONE | +| FrontendInstance | TEXT | No | false | NULL | NONE | +| Sql | TEXT | No | false | NULL | NONE | ++------------------+--------+------+-------+---------+-------+ +7 rows in set (0.00 sec) ``` ### example ``` mysql [(none)]>select * from active_queries(); -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql | -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx | -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -1 row in set (0.01 sec) ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql | ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() | ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +1 row in set (0.03 sec) ``` ### keywords diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index 34c7ad5f596..2af6212808d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -31,6 +31,7 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TQueryType; @@ -110,7 +111,8 @@ public abstract class AbstractInsertExecutor { coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); coordinator.setQueryType(TQueryType.LOAD); executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile()); - QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator); + QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator); + QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo); coordinator.exec(); int execTimeout = ctx.getExecTimeout(); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8c764767c9c..307025f7c3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -270,6 +270,10 @@ public class Coordinator implements CoordInterface { this.tWorkloadGroups = tWorkloadGroups; } + public List<TPipelineWorkloadGroup> gettWorkloadGroups() { + return tWorkloadGroups; + } + private List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList(); private final ExecutionProfile executionProfile; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index dbd4357906f..725113c08f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -150,6 +150,7 @@ import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; import org.apache.doris.proto.Types; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; import org.apache.doris.qe.ConnectContext.ConnectType; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; @@ -2060,8 +2061,8 @@ public class StmtExecutor { coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict()); coord.setQueryType(TQueryType.LOAD); profile.setExecutionProfile(coord.getExecutionProfile()); - - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); + QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), this.getOriginStmtInString(), coord); + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), queryInfo); Table table = insertStmt.getTargetTable(); if (table instanceof OlapTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java index 27f65ed7680..c4bdaaed659 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java @@ -35,19 +35,10 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio public static final String NAME = "active_queries"; private static final ImmutableList<Column> SCHEMA = ImmutableList.of( - new Column("BeHost", ScalarType.createStringType()), - new Column("BePort", PrimitiveType.BIGINT), new Column("QueryId", ScalarType.createStringType()), new Column("StartTime", ScalarType.createStringType()), new Column("QueryTimeMs", PrimitiveType.BIGINT), new Column("WorkloadGroupId", PrimitiveType.BIGINT), - new Column("QueryCpuTimeMs", PrimitiveType.BIGINT), - new Column("ScanRows", PrimitiveType.BIGINT), - new Column("ScanBytes", PrimitiveType.BIGINT), - new Column("BePeakMemoryBytes", PrimitiveType.BIGINT), - new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT), - new Column("ShuffleSendBytes", PrimitiveType.BIGINT), - new Column("ShuffleSendRows", PrimitiveType.BIGINT), new Column("Database", ScalarType.createStringType()), new Column("FrontendInstance", ScalarType.createStringType()), new Column("Sql", ScalarType.createStringType())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 8a3df743e24..47de3be4b66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -53,6 +53,7 @@ import org.apache.doris.thrift.TMaterializedViewsMetadataParams; import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TQueriesMetadataParams; import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TRow; @@ -78,7 +79,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; public class MetadataGenerator { @@ -473,7 +473,7 @@ public class MetadataGenerator { } private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params, - TFetchSchemaTableDataRequest parentRequest) { + TFetchSchemaTableDataRequest parentRequest) { if (!params.isSetQueriesMetadataParams()) { return errorResult("queries metadata param is not set."); } @@ -487,37 +487,35 @@ public class MetadataGenerator { } selfNode = NetUtils.getHostnameByIp(selfNode); - // get query - Map<Long, Map<String, TQueryStatistics>> beQsMap = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr() - .getBeQueryStatsMap(); - Set<Long> beIdSet = beQsMap.keySet(); - List<TRow> dataBatch = Lists.newArrayList(); Map<String, QueryInfo> queryInfoMap = QeProcessorImpl.INSTANCE.getQueryInfoMap(); - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - for (Long beId : beIdSet) { - Map<String, TQueryStatistics> qsMap = beQsMap.get(beId); - if (qsMap == null) { - continue; + for (Map.Entry<String, QueryInfo> entry : queryInfoMap.entrySet()) { + String queryId = entry.getKey(); + QueryInfo queryInfo = entry.getValue(); + + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(queryId)); + + String strDate = sdf.format(new Date(queryInfo.getStartExecTime())); + trow.addToColumnValue(new TCell().setStringVal(strDate)); + trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime())); + + List<TPipelineWorkloadGroup> tgroupList = queryInfo.getCoord().gettWorkloadGroups(); + if (tgroupList != null && tgroupList.size() == 1) { + trow.addToColumnValue(new TCell().setLongVal(tgroupList.get(0).id)); + } else { + trow.addToColumnValue(new TCell().setLongVal(-1)); } - Set<String> queryIdSet = qsMap.keySet(); - for (String queryId : queryIdSet) { - QueryInfo queryInfo = queryInfoMap.get(queryId); - if (queryInfo == null) { - continue; - } - //todo(wb) add connect context for insert select - if (queryInfo.getConnectContext() != null && !Env.getCurrentEnv().getAccessManager() - .checkDbPriv(queryInfo.getConnectContext(), queryInfo.getConnectContext().getDatabase(), - PrivPredicate.SELECT)) { - continue; - } - TQueryStatistics qs = qsMap.get(queryId); - Backend be = Env.getCurrentEnv().getClusterInfo().getBackend(beId); - TRow tRow = makeQueryStatisticsTRow(sdf, queryId, be, selfNode, queryInfo, qs); - dataBatch.add(tRow); + + if (queryInfo.getConnectContext() != null) { + trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase())); + } else { + trow.addToColumnValue(new TCell().setStringVal("")); } + trow.addToColumnValue(new TCell().setStringVal(selfNode)); + trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql())); + dataBatch.add(trow); } /* Get the query results from other FE also */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org