This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 28f0b7eb321ec6839808d252bf0d04775df18e1a
Author: wangbo <wan...@apache.org>
AuthorDate: Wed Mar 6 13:53:57 2024 +0800

    [Improvement](profile)Add tvf active_be_tasks() #31815
---
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 50 ++++++++++++++++++++++
 be/src/runtime/runtime_query_statistics_mgr.h      |  5 +++
 be/src/vec/exec/scan/vmeta_scanner.cpp             | 22 +++++++++-
 be/src/vec/exec/scan/vmeta_scanner.h               |  2 +
 .../doris/catalog/BuiltinTableValuedFunctions.java |  4 +-
 .../{ActiveQueries.java => ActiveBeTasks.java}     | 17 ++++----
 .../expressions/functions/table/ActiveQueries.java |  2 +-
 .../visitor/TableValuedFunctionVisitor.java        |  5 +++
 ....java => ActiveBeTasksTableValuedFunction.java} | 50 ++++++----------------
 .../ActiveQueriesTableValuedFunction.java          |  4 +-
 .../doris/tablefunction/TableValuedFunctionIf.java |  2 +
 gensrc/thrift/Types.thrift                         |  3 +-
 12 files changed, 114 insertions(+), 52 deletions(-)

diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index cb137fbe82a..ee09b0c30dc 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -199,4 +199,54 @@ void 
RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64
     }
 }
 
+std::vector<TRow> RuntimeQueryStatiticsMgr::get_active_be_tasks_statistics(
+        std::vector<std::string> filter_columns) {
+    std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
+    std::vector<TRow> table_rows;
+    int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+
+    for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+        TRow trow;
+
+        TQueryStatistics tqs;
+        qs_ctx_ptr->collect_query_statistics(&tqs);
+
+        for (auto iter = filter_columns.begin(); iter != filter_columns.end(); 
iter++) {
+            std::string col_name = *iter;
+
+            TCell tcell;
+            if (col_name == "beid") {
+                tcell.longVal = be_id;
+            } else if (col_name == "fehost") {
+                tcell.stringVal = qs_ctx_ptr->_fe_addr.hostname;
+            } else if (col_name == "queryid") {
+                tcell.stringVal = query_id;
+            } else if (col_name == "tasktimems") {
+                if (qs_ctx_ptr->_is_query_finished) {
+                    tcell.longVal = qs_ctx_ptr->_query_finish_time - 
qs_ctx_ptr->_query_start_time;
+                } else {
+                    tcell.longVal = MonotonicMillis() - 
qs_ctx_ptr->_query_start_time;
+                }
+            } else if (col_name == "taskcputimems") {
+                tcell.longVal = tqs.cpu_ms;
+            } else if (col_name == "scanrows") {
+                tcell.longVal = tqs.scan_rows;
+            } else if (col_name == "scanbytes") {
+                tcell.longVal = tqs.scan_bytes;
+            } else if (col_name == "bepeakmemorybytes") {
+                tcell.longVal = tqs.max_peak_memory_bytes;
+            } else if (col_name == "currentusedmemorybytes") {
+                tcell.longVal = tqs.current_used_memory_bytes;
+            } else if (col_name == "shufflesendbytes") {
+                tcell.longVal = tqs.shuffle_send_bytes;
+            } else if (col_name == "shufflesendRows") {
+                tcell.longVal = tqs.shuffle_send_rows;
+            }
+            trow.column_value.push_back(tcell);
+        }
+        table_rows.push_back(trow);
+    }
+    return table_rows;
+}
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h 
b/be/src/runtime/runtime_query_statistics_mgr.h
index 69b283b6d14..44badd196a3 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <gen_cpp/Data_types.h>
+
 #include <shared_mutex>
 #include <string>
 
@@ -66,6 +68,9 @@ public:
     void get_metric_map(std::string query_id,
                         std::map<WorkloadMetricType, std::string>& metric_map);
 
+    // used for tvf active_queries
+    std::vector<TRow> get_active_be_tasks_statistics(std::vector<std::string> 
filter_columns);
+
 private:
     std::shared_mutex _qs_ctx_map_lock;
     std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> 
_query_statistics_ctx_map;
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp 
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 22545fa4dce..7d438366cf2 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -34,6 +34,7 @@
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
+#include "runtime/runtime_query_statistics_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "util/thrift_rpc_helper.h"
@@ -95,7 +96,12 @@ Status VMetaScanner::prepare(RuntimeState* state, const 
VExprContextSPtrs& conju
         return Status::InternalError("Logical error, VMetaScanner do not allow 
ColumnNullable");
     }
 
-    RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
+    if (_scan_range.meta_scan_range.metadata_type == 
TMetadataType::ACTIVE_BE_TASKS) {
+        // tvf active_be_tasks fetch data in be directly, it does not need to 
request FE for data
+        RETURN_IF_ERROR(_build_active_be_tasks_data());
+    } else {
+        RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
+    }
     return Status::OK();
 }
 
@@ -288,6 +294,20 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& 
meta_scan_range) {
     return Status::OK();
 }
 
+Status VMetaScanner::_build_active_be_tasks_data() {
+    std::vector<std::string> filter_columns;
+    for (const auto& slot : _tuple_desc->slots()) {
+        filter_columns.emplace_back(slot->col_name_lower_case());
+    }
+
+    std::vector<TRow> ret =
+            
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_statistics(
+                    filter_columns);
+    _batch_data = std::move(ret);
+
+    return Status::OK();
+}
+
 Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                                      
TFetchSchemaTableDataRequest* request) {
     VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request";
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h 
b/be/src/vec/exec/scan/vmeta_scanner.h
index 59bd55dc2d8..25cb9345311 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -91,6 +91,8 @@ private:
                                          TFetchSchemaTableDataRequest* 
request);
     Status _build_queries_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                            TFetchSchemaTableDataRequest* 
request);
+    Status _build_active_be_tasks_data();
+
     bool _meta_eos;
     TupleId _tuple_id;
     TUserIdentity _user_identity;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index ac1b31fceac..acdeb683f26 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.catalog;
 
+import 
org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks;
 import 
org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
 import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
 import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
@@ -59,7 +60,8 @@ public class BuiltinTableValuedFunctions implements 
FunctionHelper {
             tableValued(MvInfos.class, "mv_infos"),
             tableValued(Jobs.class, "jobs"),
             tableValued(Tasks.class, "tasks"),
-            tableValued(WorkloadGroups.class, "workload_groups")
+            tableValued(WorkloadGroups.class, "workload_groups"),
+            tableValued(ActiveBeTasks.class, "active_be_tasks")
     );
 
     public static final BuiltinTableValuedFunctions INSTANCE = new 
BuiltinTableValuedFunctions();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java
similarity index 77%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java
index f8dcaa4a7ec..5737f52a2b9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java
@@ -22,18 +22,18 @@ import 
org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.expressions.Properties;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.coercion.AnyDataType;
-import org.apache.doris.tablefunction.ActiveQueriesTableValuedFunction;
+import org.apache.doris.tablefunction.ActiveBeTasksTableValuedFunction;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
 
 import java.util.Map;
 
 /**
- * queries tvf
+ *  stands be running tasks status, currently main including 
select/streamload/broker load/insert select
  */
-public class ActiveQueries extends TableValuedFunction {
+public class ActiveBeTasks extends TableValuedFunction {
 
-    public ActiveQueries(Properties properties) {
-        super("active_queries", properties);
+    public ActiveBeTasks(Properties properties) {
+        super("active_be_tasks", properties);
     }
 
     @Override
@@ -45,15 +45,14 @@ public class ActiveQueries extends TableValuedFunction {
     protected TableValuedFunctionIf toCatalogFunction() {
         try {
             Map<String, String> arguments = getTVFProperties().getMap();
-            return new ActiveQueriesTableValuedFunction(arguments);
+            return new ActiveBeTasksTableValuedFunction(arguments);
         } catch (Throwable t) {
-            throw new AnalysisException("Can not build 
FrontendsTableValuedFunction by "
+            throw new AnalysisException("Can not build 
ActiveBeTasksTableValuedFunction by "
                     + this + ": " + t.getMessage(), t);
         }
     }
 
-    @Override
     public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
-        return visitor.visitQueries(this, context);
+        return visitor.visitActiveBeTasks(this, context);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
index f8dcaa4a7ec..1e15b674249 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
@@ -47,7 +47,7 @@ public class ActiveQueries extends TableValuedFunction {
             Map<String, String> arguments = getTVFProperties().getMap();
             return new ActiveQueriesTableValuedFunction(arguments);
         } catch (Throwable t) {
-            throw new AnalysisException("Can not build 
FrontendsTableValuedFunction by "
+            throw new AnalysisException("Can not build 
ActiveQueriesTableValuedFunction by "
                     + this + ": " + t.getMessage(), t);
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index fba34d48168..36561e5b12c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.expressions.visitor;
 
+import 
org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks;
 import 
org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
 import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
 import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
@@ -102,4 +103,8 @@ public interface TableValuedFunctionVisitor<R, C> {
     default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) {
         return visitTableValuedFunction(workloadGroups, context);
     }
+
+    default R visitActiveBeTasks(ActiveBeTasks beTasks, C context) {
+        return visitTableValuedFunction(beTasks, context);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java
similarity index 53%
copy from 
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java
index 27f65ed7680..99a8ba4886f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java
@@ -23,74 +23,50 @@ import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataType;
-import org.apache.doris.thrift.TQueriesMetadataParams;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 
 import java.util.List;
 import java.util.Map;
 
-public class ActiveQueriesTableValuedFunction extends 
MetadataTableValuedFunction {
-    public static final String NAME = "active_queries";
+public class ActiveBeTasksTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "active_be_tasks";
 
     private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
-            new Column("BeHost", ScalarType.createStringType()),
-            new Column("BePort", PrimitiveType.BIGINT),
+            new Column("BeId", PrimitiveType.BIGINT),
+            new Column("FeHost", ScalarType.createStringType()),
             new Column("QueryId", ScalarType.createStringType()),
-            new Column("StartTime", ScalarType.createStringType()),
-            new Column("QueryTimeMs", PrimitiveType.BIGINT),
-            new Column("WorkloadGroupId", PrimitiveType.BIGINT),
-            new Column("QueryCpuTimeMs", PrimitiveType.BIGINT),
+            new Column("TaskTimeMs", PrimitiveType.BIGINT),
+            new Column("TaskCpuTimeMs", PrimitiveType.BIGINT),
             new Column("ScanRows", PrimitiveType.BIGINT),
             new Column("ScanBytes", PrimitiveType.BIGINT),
             new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
             new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
             new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
-            new Column("ShuffleSendRows", PrimitiveType.BIGINT),
-            new Column("Database", ScalarType.createStringType()),
-            new Column("FrontendInstance", ScalarType.createStringType()),
-            new Column("Sql", ScalarType.createStringType()));
-
-    private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
-
-    static {
-        ImmutableMap.Builder<String, Integer> builder = new 
ImmutableMap.Builder();
-        for (int i = 0; i < SCHEMA.size(); i++) {
-            builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
-        }
-        COLUMN_TO_INDEX = builder.build();
-    }
-
-    public static Integer getColumnIndexFromColumnName(String columnName) {
-        return COLUMN_TO_INDEX.get(columnName.toLowerCase());
-    }
+            new Column("ShuffleSendRows", PrimitiveType.BIGINT));
 
-    public ActiveQueriesTableValuedFunction(Map<String, String> params) throws 
AnalysisException {
+    public ActiveBeTasksTableValuedFunction(Map<String, String> params) throws 
AnalysisException {
         if (params.size() != 0) {
-            throw new AnalysisException("Queries table-valued-function does 
not support any params");
+            throw new AnalysisException("ActiveBeTasks table-valued-function 
does not support any params");
         }
     }
 
     @Override
     public TMetadataType getMetadataType() {
-        return TMetadataType.QUERIES;
+        return TMetadataType.ACTIVE_BE_TASKS;
     }
 
     @Override
     public TMetaScanRange getMetaScanRange() {
         TMetaScanRange metaScanRange = new TMetaScanRange();
-        metaScanRange.setMetadataType(TMetadataType.QUERIES);
-        TQueriesMetadataParams queriesMetadataParams = new 
TQueriesMetadataParams();
-        queriesMetadataParams.setClusterName("");
-        queriesMetadataParams.setRelayToOtherFe(true);
-        metaScanRange.setQueriesParams(queriesMetadataParams);
+        metaScanRange.setMetadataType(TMetadataType.ACTIVE_BE_TASKS);
         return metaScanRange;
     }
 
     @Override
     public String getTableName() {
-        return "QueriesTableValuedFunction";
+        return "ActiveBeTasksTableValuedFunction";
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
index 27f65ed7680..41dd5484dd5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
@@ -68,7 +68,7 @@ public class ActiveQueriesTableValuedFunction extends 
MetadataTableValuedFunctio
 
     public ActiveQueriesTableValuedFunction(Map<String, String> params) throws 
AnalysisException {
         if (params.size() != 0) {
-            throw new AnalysisException("Queries table-valued-function does 
not support any params");
+            throw new AnalysisException("ActiveQueries table-valued-function 
does not support any params");
         }
     }
 
@@ -90,7 +90,7 @@ public class ActiveQueriesTableValuedFunction extends 
MetadataTableValuedFunctio
 
     @Override
     public String getTableName() {
-        return "QueriesTableValuedFunction";
+        return "ActiveQueriesTableValuedFunction";
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index f9fb76a9666..4b755a97bf8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -78,6 +78,8 @@ public abstract class TableValuedFunctionIf {
                 return new ActiveQueriesTableValuedFunction(params);
             case WorkloadSchedPolicyTableValuedFunction.NAME:
                 return new WorkloadSchedPolicyTableValuedFunction(params);
+            case ActiveBeTasksTableValuedFunction.NAME:
+                return new ActiveBeTasksTableValuedFunction(params);
             default:
                 throw new AnalysisException("Could not find table function " + 
funcName);
         }
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 11ec1093da0..a1e414384d3 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -711,7 +711,8 @@ enum TMetadataType {
   JOBS,
   TASKS,
   QUERIES,
-  WORKLOAD_SCHED_POLICY
+  WORKLOAD_SCHED_POLICY,
+  ACTIVE_BE_TASKS,
 }
 
 enum TIcebergQueryType {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to