This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 28f0b7eb321ec6839808d252bf0d04775df18e1a Author: wangbo <wan...@apache.org> AuthorDate: Wed Mar 6 13:53:57 2024 +0800 [Improvement](profile)Add tvf active_be_tasks() #31815 --- be/src/runtime/runtime_query_statistics_mgr.cpp | 50 ++++++++++++++++++++++ be/src/runtime/runtime_query_statistics_mgr.h | 5 +++ be/src/vec/exec/scan/vmeta_scanner.cpp | 22 +++++++++- be/src/vec/exec/scan/vmeta_scanner.h | 2 + .../doris/catalog/BuiltinTableValuedFunctions.java | 4 +- .../{ActiveQueries.java => ActiveBeTasks.java} | 17 ++++---- .../expressions/functions/table/ActiveQueries.java | 2 +- .../visitor/TableValuedFunctionVisitor.java | 5 +++ ....java => ActiveBeTasksTableValuedFunction.java} | 50 ++++++---------------- .../ActiveQueriesTableValuedFunction.java | 4 +- .../doris/tablefunction/TableValuedFunctionIf.java | 2 + gensrc/thrift/Types.thrift | 3 +- 12 files changed, 114 insertions(+), 52 deletions(-) diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index cb137fbe82a..ee09b0c30dc 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -199,4 +199,54 @@ void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64 } } +std::vector<TRow> RuntimeQueryStatiticsMgr::get_active_be_tasks_statistics( + std::vector<std::string> filter_columns) { + std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock); + std::vector<TRow> table_rows; + int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; + + for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { + TRow trow; + + TQueryStatistics tqs; + qs_ctx_ptr->collect_query_statistics(&tqs); + + for (auto iter = filter_columns.begin(); iter != filter_columns.end(); iter++) { + std::string col_name = *iter; + + TCell tcell; + if (col_name == "beid") { + tcell.longVal = be_id; + } else if (col_name == "fehost") { + tcell.stringVal = qs_ctx_ptr->_fe_addr.hostname; + } else if (col_name == "queryid") { + tcell.stringVal = query_id; + } else if (col_name == "tasktimems") { + if (qs_ctx_ptr->_is_query_finished) { + tcell.longVal = qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time; + } else { + tcell.longVal = MonotonicMillis() - qs_ctx_ptr->_query_start_time; + } + } else if (col_name == "taskcputimems") { + tcell.longVal = tqs.cpu_ms; + } else if (col_name == "scanrows") { + tcell.longVal = tqs.scan_rows; + } else if (col_name == "scanbytes") { + tcell.longVal = tqs.scan_bytes; + } else if (col_name == "bepeakmemorybytes") { + tcell.longVal = tqs.max_peak_memory_bytes; + } else if (col_name == "currentusedmemorybytes") { + tcell.longVal = tqs.current_used_memory_bytes; + } else if (col_name == "shufflesendbytes") { + tcell.longVal = tqs.shuffle_send_bytes; + } else if (col_name == "shufflesendRows") { + tcell.longVal = tqs.shuffle_send_rows; + } + trow.column_value.push_back(tcell); + } + table_rows.push_back(trow); + } + return table_rows; +} + } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 69b283b6d14..44badd196a3 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -17,6 +17,8 @@ #pragma once +#include <gen_cpp/Data_types.h> + #include <shared_mutex> #include <string> @@ -66,6 +68,9 @@ public: void get_metric_map(std::string query_id, std::map<WorkloadMetricType, std::string>& metric_map); + // used for tvf active_queries + std::vector<TRow> get_active_be_tasks_statistics(std::vector<std::string> filter_columns); + private: std::shared_mutex _qs_ctx_map_lock; std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> _query_statistics_ctx_map; diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 22545fa4dce..7d438366cf2 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -34,6 +34,7 @@ #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" +#include "runtime/runtime_query_statistics_mgr.h" #include "runtime/runtime_state.h" #include "runtime/types.h" #include "util/thrift_rpc_helper.h" @@ -95,7 +96,12 @@ Status VMetaScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju return Status::InternalError("Logical error, VMetaScanner do not allow ColumnNullable"); } - RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); + if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ACTIVE_BE_TASKS) { + // tvf active_be_tasks fetch data in be directly, it does not need to request FE for data + RETURN_IF_ERROR(_build_active_be_tasks_data()); + } else { + RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); + } return Status::OK(); } @@ -288,6 +294,20 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { return Status::OK(); } +Status VMetaScanner::_build_active_be_tasks_data() { + std::vector<std::string> filter_columns; + for (const auto& slot : _tuple_desc->slots()) { + filter_columns.emplace_back(slot->col_name_lower_case()); + } + + std::vector<TRow> ret = + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_statistics( + filter_columns); + _batch_data = std::move(ret); + + return Status::OK(); +} + Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request"; diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 59bd55dc2d8..25cb9345311 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -91,6 +91,8 @@ private: TFetchSchemaTableDataRequest* request); Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); + Status _build_active_be_tasks_data(); + bool _meta_eos; TupleId _tuple_id; TUserIdentity _user_identity; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index ac1b31fceac..acdeb683f26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -17,6 +17,7 @@ package org.apache.doris.catalog; +import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks; import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; @@ -59,7 +60,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(MvInfos.class, "mv_infos"), tableValued(Jobs.class, "jobs"), tableValued(Tasks.class, "tasks"), - tableValued(WorkloadGroups.class, "workload_groups") + tableValued(WorkloadGroups.class, "workload_groups"), + tableValued(ActiveBeTasks.class, "active_be_tasks") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java similarity index 77% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java index f8dcaa4a7ec..5737f52a2b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java @@ -22,18 +22,18 @@ import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.expressions.Properties; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; import org.apache.doris.nereids.types.coercion.AnyDataType; -import org.apache.doris.tablefunction.ActiveQueriesTableValuedFunction; +import org.apache.doris.tablefunction.ActiveBeTasksTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionIf; import java.util.Map; /** - * queries tvf + * stands be running tasks status, currently main including select/streamload/broker load/insert select */ -public class ActiveQueries extends TableValuedFunction { +public class ActiveBeTasks extends TableValuedFunction { - public ActiveQueries(Properties properties) { - super("active_queries", properties); + public ActiveBeTasks(Properties properties) { + super("active_be_tasks", properties); } @Override @@ -45,15 +45,14 @@ public class ActiveQueries extends TableValuedFunction { protected TableValuedFunctionIf toCatalogFunction() { try { Map<String, String> arguments = getTVFProperties().getMap(); - return new ActiveQueriesTableValuedFunction(arguments); + return new ActiveBeTasksTableValuedFunction(arguments); } catch (Throwable t) { - throw new AnalysisException("Can not build FrontendsTableValuedFunction by " + throw new AnalysisException("Can not build ActiveBeTasksTableValuedFunction by " + this + ": " + t.getMessage(), t); } } - @Override public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { - return visitor.visitQueries(this, context); + return visitor.visitActiveBeTasks(this, context); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java index f8dcaa4a7ec..1e15b674249 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java @@ -47,7 +47,7 @@ public class ActiveQueries extends TableValuedFunction { Map<String, String> arguments = getTVFProperties().getMap(); return new ActiveQueriesTableValuedFunction(arguments); } catch (Throwable t) { - throw new AnalysisException("Can not build FrontendsTableValuedFunction by " + throw new AnalysisException("Can not build ActiveQueriesTableValuedFunction by " + this + ": " + t.getMessage(), t); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index fba34d48168..36561e5b12c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.expressions.visitor; +import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks; import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; @@ -102,4 +103,8 @@ public interface TableValuedFunctionVisitor<R, C> { default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) { return visitTableValuedFunction(workloadGroups, context); } + + default R visitActiveBeTasks(ActiveBeTasks beTasks, C context) { + return visitTableValuedFunction(beTasks, context); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java similarity index 53% copy from fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java copy to fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java index 27f65ed7680..99a8ba4886f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java @@ -23,74 +23,50 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.AnalysisException; import org.apache.doris.thrift.TMetaScanRange; import org.apache.doris.thrift.TMetadataType; -import org.apache.doris.thrift.TQueriesMetadataParams; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import java.util.List; import java.util.Map; -public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunction { - public static final String NAME = "active_queries"; +public class ActiveBeTasksTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "active_be_tasks"; private static final ImmutableList<Column> SCHEMA = ImmutableList.of( - new Column("BeHost", ScalarType.createStringType()), - new Column("BePort", PrimitiveType.BIGINT), + new Column("BeId", PrimitiveType.BIGINT), + new Column("FeHost", ScalarType.createStringType()), new Column("QueryId", ScalarType.createStringType()), - new Column("StartTime", ScalarType.createStringType()), - new Column("QueryTimeMs", PrimitiveType.BIGINT), - new Column("WorkloadGroupId", PrimitiveType.BIGINT), - new Column("QueryCpuTimeMs", PrimitiveType.BIGINT), + new Column("TaskTimeMs", PrimitiveType.BIGINT), + new Column("TaskCpuTimeMs", PrimitiveType.BIGINT), new Column("ScanRows", PrimitiveType.BIGINT), new Column("ScanBytes", PrimitiveType.BIGINT), new Column("BePeakMemoryBytes", PrimitiveType.BIGINT), new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT), new Column("ShuffleSendBytes", PrimitiveType.BIGINT), - new Column("ShuffleSendRows", PrimitiveType.BIGINT), - new Column("Database", ScalarType.createStringType()), - new Column("FrontendInstance", ScalarType.createStringType()), - new Column("Sql", ScalarType.createStringType())); - - private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; - - static { - ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder(); - for (int i = 0; i < SCHEMA.size(); i++) { - builder.put(SCHEMA.get(i).getName().toLowerCase(), i); - } - COLUMN_TO_INDEX = builder.build(); - } - - public static Integer getColumnIndexFromColumnName(String columnName) { - return COLUMN_TO_INDEX.get(columnName.toLowerCase()); - } + new Column("ShuffleSendRows", PrimitiveType.BIGINT)); - public ActiveQueriesTableValuedFunction(Map<String, String> params) throws AnalysisException { + public ActiveBeTasksTableValuedFunction(Map<String, String> params) throws AnalysisException { if (params.size() != 0) { - throw new AnalysisException("Queries table-valued-function does not support any params"); + throw new AnalysisException("ActiveBeTasks table-valued-function does not support any params"); } } @Override public TMetadataType getMetadataType() { - return TMetadataType.QUERIES; + return TMetadataType.ACTIVE_BE_TASKS; } @Override public TMetaScanRange getMetaScanRange() { TMetaScanRange metaScanRange = new TMetaScanRange(); - metaScanRange.setMetadataType(TMetadataType.QUERIES); - TQueriesMetadataParams queriesMetadataParams = new TQueriesMetadataParams(); - queriesMetadataParams.setClusterName(""); - queriesMetadataParams.setRelayToOtherFe(true); - metaScanRange.setQueriesParams(queriesMetadataParams); + metaScanRange.setMetadataType(TMetadataType.ACTIVE_BE_TASKS); return metaScanRange; } @Override public String getTableName() { - return "QueriesTableValuedFunction"; + return "ActiveBeTasksTableValuedFunction"; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java index 27f65ed7680..41dd5484dd5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java @@ -68,7 +68,7 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio public ActiveQueriesTableValuedFunction(Map<String, String> params) throws AnalysisException { if (params.size() != 0) { - throw new AnalysisException("Queries table-valued-function does not support any params"); + throw new AnalysisException("ActiveQueries table-valued-function does not support any params"); } } @@ -90,7 +90,7 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio @Override public String getTableName() { - return "QueriesTableValuedFunction"; + return "ActiveQueriesTableValuedFunction"; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index f9fb76a9666..4b755a97bf8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -78,6 +78,8 @@ public abstract class TableValuedFunctionIf { return new ActiveQueriesTableValuedFunction(params); case WorkloadSchedPolicyTableValuedFunction.NAME: return new WorkloadSchedPolicyTableValuedFunction(params); + case ActiveBeTasksTableValuedFunction.NAME: + return new ActiveBeTasksTableValuedFunction(params); default: throw new AnalysisException("Could not find table function " + funcName); } diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 11ec1093da0..a1e414384d3 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -711,7 +711,8 @@ enum TMetadataType { JOBS, TASKS, QUERIES, - WORKLOAD_SCHED_POLICY + WORKLOAD_SCHED_POLICY, + ACTIVE_BE_TASKS, } enum TIcebergQueryType { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org