This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new af68d8b5077 [feat](profile) support getting query progress (#51400)
af68d8b5077 is described below
commit af68d8b5077782d65ec9cadba479ed58b8789e90
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sat Jun 21 00:39:30 2025 +0800
[feat](profile) support getting query progress (#51400)
### What problem does this PR solve?
Followup #50791
Add a new FE HTTP API: `/rest/v2/manager/query/statistics/trace_id`.
This API will return the query runtime statistics corresponding to a
given trace id.
The query statistics include info such as real-time scan rows/bytes.
Internally, Doris will look up the query id by trace id across all Frontends,
and then fetch the query statistics from the BEs.
Usage pattern:
1. The user sets a custom trace id by: `set
session_context="trace_id:my_trace_id"`
2. The user executes a query in the same session
3. Start an HTTP client to fetch the query statistics in real time while
the query is running.

Also fixes a bug in `CoordinatorContext.java` (introduced in #41730) so
that the real host is obtained.
This PR also changes the column names of the `information_schema.processlist`
table to be the same as the column
names in `show processlist`.
---
.../schema_scanner/schema_processlist_scanner.cpp | 39 +++---
be/src/runtime/fragment_mgr.cpp | 9 ++
be/src/runtime/fragment_mgr.h | 2 +
be/src/runtime/runtime_query_statistics_mgr.cpp | 13 ++
be/src/runtime/runtime_query_statistics_mgr.h | 3 +-
be/src/service/backend_service.cpp | 25 ++--
.../java/org/apache/doris/catalog/SchemaTable.java | 32 ++---
.../doris/common/profile/ProfileManager.java | 68 +++++++++--
.../doris/httpv2/controller/SessionController.java | 18 +--
.../doris/httpv2/rest/manager/HttpUtils.java | 6 +
.../httpv2/rest/manager/QueryProfileAction.java | 133 +++++++++++++++++----
.../plans/commands/ShowProcessListCommand.java | 28 ++---
.../java/org/apache/doris/qe/ConnectContext.java | 6 +-
.../java/org/apache/doris/qe/ConnectPoolMgr.java | 9 ++
.../java/org/apache/doris/qe/ConnectScheduler.java | 5 +
.../org/apache/doris/qe/CoordinatorContext.java | 2 +-
.../org/apache/doris/qe/ConnectContextTest.java | 2 +-
gensrc/thrift/BackendService.thrift | 3 +
.../plugins/plugin_curl_requester.groovy | 10 +-
...e_compaction_with_variant_inverted_index.groovy | 12 +-
.../test_information_schema_timezone.groovy | 4 +-
.../suites/manager/test_manager_interface_5.groovy | 99 +++++++++++++++
.../suites/show_p0/test_show_processlist.groovy | 29 ++++-
23 files changed, 438 insertions(+), 119 deletions(-)
diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
index 7acb82ea849..69367556767 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -33,20 +33,21 @@ namespace doris {
#include "common/compile_check_begin.h"
std::vector<SchemaScanner::ColumnDesc>
SchemaProcessListScanner::_s_processlist_columns = {
- {"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false},
- {"ID", TYPE_LARGEINT, sizeof(int128_t), false},
- {"USER", TYPE_VARCHAR, sizeof(StringRef), false},
- {"HOST", TYPE_VARCHAR, sizeof(StringRef), false},
- {"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false},
- {"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false},
- {"DB", TYPE_VARCHAR, sizeof(StringRef), false},
- {"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false},
- {"TIME", TYPE_INT, sizeof(int32_t), false},
- {"STATE", TYPE_VARCHAR, sizeof(StringRef), false},
- {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
- {"INFO", TYPE_VARCHAR, sizeof(StringRef), false},
- {"FE", TYPE_VARCHAR, sizeof(StringRef), false},
- {"CLOUD_CLUSTER", TYPE_VARCHAR, sizeof(StringRef), false}};
+ {"CurrentConnected", TYPE_VARCHAR, sizeof(StringRef), false}, // 0
+ {"Id", TYPE_LARGEINT, sizeof(int128_t), false}, //
1
+ {"User", TYPE_VARCHAR, sizeof(StringRef), false}, //
2
+ {"Host", TYPE_VARCHAR, sizeof(StringRef), false}, //
3
+ {"LoginTime", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false}, //
4
+ {"Catalog", TYPE_VARCHAR, sizeof(StringRef), false}, //
5
+ {"Db", TYPE_VARCHAR, sizeof(StringRef), false}, //
6
+ {"Command", TYPE_VARCHAR, sizeof(StringRef), false}, //
7
+ {"Time", TYPE_INT, sizeof(int32_t), false}, //
8
+ {"State", TYPE_VARCHAR, sizeof(StringRef), false}, //
9
+ {"QueryId", TYPE_VARCHAR, sizeof(StringRef), false}, //
10
+ {"TraceId", TYPE_VARCHAR, sizeof(StringRef), false}, //
11
+ {"Info", TYPE_VARCHAR, sizeof(StringRef), false}, //
12
+ {"FE", TYPE_VARCHAR, sizeof(StringRef), false}, //
13
+ {"CloudCluster", TYPE_VARCHAR, sizeof(StringRef), false}}; //
14
SchemaProcessListScanner::SchemaProcessListScanner()
: SchemaScanner(_s_processlist_columns,
TSchemaTableType::SCH_PROCESSLIST) {}
@@ -62,6 +63,16 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
TShowProcessListResult tmp_ret;
RETURN_IF_ERROR(
SchemaHelper::show_process_list(fe_addr.hostname,
fe_addr.port, request, &tmp_ret));
+
+ // Check and adjust the number of columns in each row to ensure 15
columns
+ // This is compatible with newly added column "trace id". #51400
+ for (auto& row : tmp_ret.process_list) {
+ if (row.size() == 14) {
+ // Insert an empty string at position 11 (index 11) for the
TRACE_ID column
+ row.insert(row.begin() + 11, "");
+ }
+ }
+
_process_list_result.process_list.insert(_process_list_result.process_list.end(),
tmp_ret.process_list.begin(),
tmp_ret.process_list.end());
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 230cb12a110..90ec55ac0b5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1382,4 +1382,13 @@ Status FragmentMgr::get_realtime_exec_status(const
TUniqueId& query_id,
return Status::OK();
}
+Status FragmentMgr::get_query_statistics(const TUniqueId& query_id,
TQueryStatistics* query_stats) {
+ if (query_stats == nullptr) {
+ return Status::InvalidArgument("query_stats is nullptr");
+ }
+
+ return
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_query_statistics(
+ print_id(query_id), query_stats);
+}
+
} // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 4eab31ff3a2..e6f388f73e1 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -184,6 +184,8 @@ public:
Status get_realtime_exec_status(const TUniqueId& query_id,
TReportExecStatusParams* exec_status);
+ // get the query statistics of with a given query id
+ Status get_query_statistics(const TUniqueId& query_id, TQueryStatistics*
query_stats);
std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 79db2cfec56..772925dcb7e 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -511,6 +511,19 @@ void
RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
}
}
+Status RuntimeQueryStatisticsMgr::get_query_statistics(const std::string&
query_id,
+ TQueryStatistics*
query_stats) {
+ std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
+
+ auto resource_ctx = _resource_contexts_map.find(query_id);
+ if (resource_ctx == _resource_contexts_map.end()) {
+ return Status::InternalError("failed to find query with id {}",
query_id);
+ }
+
+ resource_ctx->second->to_thrift_query_statistics(query_stats);
+ return Status::OK();
+}
+
void RuntimeQueryStatisticsMgr::get_tasks_resource_context(
std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs) {
std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h
b/be/src/runtime/runtime_query_statistics_mgr.h
index ef80b6fa5b9..6b2e4545390 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -55,6 +55,7 @@ public:
// used for backend_active_tasks
void get_active_be_tasks_block(vectorized::Block* block);
+ Status get_query_statistics(const std::string& query_id, TQueryStatistics*
query_stats);
// used for MemoryReclamation
void
get_tasks_resource_context(std::vector<std::shared_ptr<ResourceContext>>&
resource_ctxs);
@@ -95,4 +96,4 @@ private:
std::unique_ptr<ThreadPool> _thread_pool;
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 114d99fe264..33fa4401391 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -1321,19 +1321,28 @@ void
BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse
std::unique_ptr<TReportExecStatusParams> report_exec_status_params =
std::make_unique<TReportExecStatusParams>();
- Status st =
ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
- request.id, report_exec_status_params.get());
+ std::unique_ptr<TQueryStatistics> query_stats =
std::make_unique<TQueryStatistics>();
- if (!st.ok()) {
- response.__set_status(st.to_thrift());
- return;
+ std::string req_type = request.__isset.req_type ? request.req_type :
"profile";
+ Status st;
+ if (req_type == "stats") {
+ st =
ExecEnv::GetInstance()->fragment_mgr()->get_query_statistics(request.id,
+
query_stats.get());
+ if (st.ok()) {
+ response.__set_query_stats(*query_stats);
+ }
+ } else {
+ // default is "profile"
+ st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
+ request.id, report_exec_status_params.get());
+ if (st.ok()) {
+
response.__set_report_exec_status_params(*report_exec_status_params);
+ }
}
report_exec_status_params->__set_query_id(TUniqueId());
report_exec_status_params->__set_done(false);
-
- response.__set_status(Status::OK().to_thrift());
- response.__set_report_exec_status_params(*report_exec_status_params);
+ response.__set_status(st.to_thrift());
}
void BaseBackendService::get_dictionary_status(TDictionaryStatusList& result,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 3d7f9603d97..ca42d7af40f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -523,22 +523,24 @@ public class SchemaTable extends Table {
.column("IS_MUTABLE",
ScalarType.createType(PrimitiveType.BOOLEAN))
.build()))
.put("processlist",
+ // ATTN, the column name should be compatible with MySQL
+ // See:
https://dev.mysql.com/doc/refman/8.4/en/show-processlist.html
new SchemaTable(SystemIdGenerator.getNextId(),
"processlist", TableType.SCHEMA,
- builder().column("CURRENT_CONNECTED",
ScalarType.createVarchar(16))
- .column("ID",
ScalarType.createType(PrimitiveType.LARGEINT))
- .column("USER",
ScalarType.createVarchar(32))
- .column("HOST",
ScalarType.createVarchar(261))
- .column("LOGIN_TIME",
ScalarType.createType(PrimitiveType.DATETIMEV2))
- .column("CATALOG",
ScalarType.createVarchar(64))
- .column("DB", ScalarType.createVarchar(64))
- .column("COMMAND",
ScalarType.createVarchar(16))
- .column("TIME",
ScalarType.createType(PrimitiveType.INT))
- .column("STATE",
ScalarType.createVarchar(64))
- .column("QUERY_ID",
ScalarType.createVarchar(256))
- .column("INFO",
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
- .column("FE",
- ScalarType.createVarchar(64))
- .column("CLOUD_CLUSTER",
ScalarType.createVarchar(64)).build(), true))
+ builder().column("CurrentConnected",
ScalarType.createVarchar(16))
+ .column("Id",
ScalarType.createType(PrimitiveType.LARGEINT))
+ .column("User",
ScalarType.createVarchar(32))
+ .column("Host",
ScalarType.createVarchar(261))
+ .column("LoginTime",
ScalarType.createType(PrimitiveType.DATETIMEV2))
+ .column("Catalog",
ScalarType.createVarchar(64))
+ .column("Db", ScalarType.createVarchar(64))
+ .column("Command",
ScalarType.createVarchar(16))
+ .column("Time",
ScalarType.createType(PrimitiveType.INT))
+ .column("State",
ScalarType.createVarchar(64))
+ .column("QueryId",
ScalarType.createVarchar(256))
+ .column("TraceId",
ScalarType.createVarchar(256))
+ .column("Info",
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
+ .column("FE", ScalarType.createVarchar(64))
+ .column("CloudCluster",
ScalarType.createVarchar(64)).build(), true))
.put("workload_policy",
new SchemaTable(SystemIdGenerator.getNextId(),
"workload_policy", TableType.SCHEMA,
builder().column("ID",
ScalarType.createType(PrimitiveType.BIGINT))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
index 5666ca8965a..36aa9a68b25 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
@@ -32,9 +32,11 @@ import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -253,7 +255,7 @@ public class ProfileManager extends MasterDaemon {
}
private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile(
- TUniqueId queryID, TNetworkAddress targetBackend) {
+ TUniqueId queryID, String reqType, TNetworkAddress targetBackend) {
TGetRealtimeExecStatusResponse resp = null;
BackendService.Client client = null;
@@ -268,6 +270,7 @@ public class ProfileManager extends MasterDaemon {
try {
TGetRealtimeExecStatusRequest req = new
TGetRealtimeExecStatusRequest();
req.setId(queryID);
+ req.setReqType(reqType);
resp = client.getRealtimeExecStatus(req);
} catch (TException e) {
LOG.warn("Got exception when getRealtimeExecStatus, query {}
backend {}",
@@ -293,8 +296,8 @@ public class ProfileManager extends MasterDaemon {
return null;
}
- if (!resp.isSetReportExecStatusParams()) {
- LOG.warn("Invalid GetRealtimeExecStatusResponse, query {}",
+ if (!resp.isSetReportExecStatusParams() && !resp.isSetQueryStats()) {
+ LOG.warn("Invalid GetRealtimeExecStatusResponse, missing both exec
status and query stats. query {}",
DebugUtil.printId(queryID));
return null;
}
@@ -302,7 +305,7 @@ public class ProfileManager extends MasterDaemon {
return resp;
}
- private List<Future<TGetRealtimeExecStatusResponse>>
createFetchRealTimeProfileTasks(String id) {
+ private List<Future<TGetRealtimeExecStatusResponse>>
createFetchRealTimeProfileTasks(String id, String reqType) {
// For query, id is queryId, for load, id is LoadLoadingTaskId
class QueryIdAndAddress {
public TUniqueId id;
@@ -365,9 +368,8 @@ public class ProfileManager extends MasterDaemon {
}
for (QueryIdAndAddress idAndAddress : involvedBackends) {
- Callable<TGetRealtimeExecStatusResponse> task = () -> {
- return getRealtimeQueryProfile(idAndAddress.id,
idAndAddress.beAddress);
- };
+ Callable<TGetRealtimeExecStatusResponse> task = () ->
getRealtimeQueryProfile(idAndAddress.id,
+ reqType, idAndAddress.beAddress);
Future<TGetRealtimeExecStatusResponse> future =
fetchRealTimeProfileExecutor.submit(task);
futures.add(future);
}
@@ -375,8 +377,57 @@ public class ProfileManager extends MasterDaemon {
return futures;
}
+ public TQueryStatistics getQueryStatistic(String queryId) throws Exception
{
+ List<Future<TGetRealtimeExecStatusResponse>> futures =
createFetchRealTimeProfileTasks(queryId,
+ "stats");
+ List<TQueryStatistics> queryStatisticsList = Lists.newArrayList();
+ for (Future<TGetRealtimeExecStatusResponse> future : futures) {
+ try {
+ TGetRealtimeExecStatusResponse resp = future.get(5,
TimeUnit.SECONDS);
+ if (resp != null && resp.getStatus().status_code ==
TStatusCode.OK && resp.isSetQueryStats()) {
+ queryStatisticsList.add(resp.getQueryStats());
+ } else {
+ LOG.warn("Failed to get real-time query stats, id {}, resp
is {}",
+ queryId, resp == null ? "null" : resp.toString());
+ throw new Exception("Failed to get realtime query stats: "
+ resp.toString());
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to get real-time query stats, id {}, error:
{}", queryId, e.getMessage(), e);
+ throw new Exception("Failed to get realtime query stats: " +
e.getMessage());
+ }
+ }
+ Preconditions.checkState(!queryStatisticsList.isEmpty() &&
queryStatisticsList.size() == futures.size(),
+ String.format("Failed to get real-time stats, id %s, "
+ + "queryStatisticsList size %d != futures size
%d",
+ queryId, queryStatisticsList.size(), futures.size()));
+
+ TQueryStatistics summary = new TQueryStatistics();
+ for (TQueryStatistics queryStats : queryStatisticsList) {
+ // sum all the statistics
+ summary.setScanRows(summary.getScanRows() +
queryStats.getScanRows());
+ summary.setScanBytes(summary.getScanBytes() +
queryStats.getScanBytes());
+ summary.setReturnedRows(summary.getReturnedRows() +
queryStats.getReturnedRows());
+ summary.setCpuMs(summary.getCpuMs() + queryStats.getCpuMs());
+
summary.setMaxPeakMemoryBytes(Math.max(summary.getMaxPeakMemoryBytes(),
+ queryStats.getMaxPeakMemoryBytes()));
+
summary.setCurrentUsedMemoryBytes(Math.max(summary.getCurrentUsedMemoryBytes(),
+ queryStats.getCurrentUsedMemoryBytes()));
+ summary.setShuffleSendBytes(summary.getShuffleSendBytes() +
queryStats.getShuffleSendBytes());
+ summary.setShuffleSendRows(summary.getShuffleSendRows() +
queryStats.getShuffleSendRows());
+ summary.setScanBytesFromLocalStorage(
+ summary.getScanBytesFromLocalStorage() +
queryStats.getScanBytesFromLocalStorage());
+ summary.setScanBytesFromRemoteStorage(
+ summary.getScanBytesFromRemoteStorage() +
queryStats.getScanBytesFromRemoteStorage());
+ summary.setSpillWriteBytesToLocalStorage(
+ summary.getSpillWriteBytesToLocalStorage() +
queryStats.getSpillWriteBytesToLocalStorage());
+ summary.setSpillReadBytesFromLocalStorage(
+ summary.getSpillReadBytesFromLocalStorage() +
queryStats.getSpillReadBytesFromLocalStorage());
+ }
+ return summary;
+ }
+
public String getProfile(String id) {
- List<Future<TGetRealtimeExecStatusResponse>> futures =
createFetchRealTimeProfileTasks(id);
+ List<Future<TGetRealtimeExecStatusResponse>> futures =
createFetchRealTimeProfileTasks(id, "profile");
// beAddr of reportExecStatus of QeProcessorImpl is meaningless, so
assign a dummy address
// to avoid compile failing.
TNetworkAddress dummyAddr = new TNetworkAddress();
@@ -1057,3 +1108,4 @@ public class ProfileManager extends MasterDaemon {
}
}
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
index 8d9f791c0b0..e21e61b4ed9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
@@ -18,6 +18,8 @@
package org.apache.doris.httpv2.controller;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.SchemaTable;
+import org.apache.doris.catalog.Table;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
@@ -56,20 +58,8 @@ public class SessionController extends RestBaseController {
private static final Logger LOG =
LogManager.getLogger(SessionController.class);
static {
- SESSION_TABLE_HEADER.add("CurrentConnected");
- SESSION_TABLE_HEADER.add("Id");
- SESSION_TABLE_HEADER.add("User");
- SESSION_TABLE_HEADER.add("Host");
- SESSION_TABLE_HEADER.add("LoginTime");
- SESSION_TABLE_HEADER.add("Catalog");
- SESSION_TABLE_HEADER.add("Db");
- SESSION_TABLE_HEADER.add("Command");
- SESSION_TABLE_HEADER.add("Time");
- SESSION_TABLE_HEADER.add("State");
- SESSION_TABLE_HEADER.add("QueryId");
- SESSION_TABLE_HEADER.add("Info");
- SESSION_TABLE_HEADER.add("FE");
- SESSION_TABLE_HEADER.add("CloudCluster");
+ Table tbl = SchemaTable.TABLE_MAP.get("processlist");
+ tbl.getBaseSchema().stream().forEach(column ->
SESSION_TABLE_HEADER.add(column.getName()));
}
@RequestMapping(path = "/session/all", method = RequestMethod.GET)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java
index 8caab8df2d9..4330e4ace56 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Strings;
import com.google.gson.reflect.TypeToken;
@@ -57,6 +58,11 @@ public class HttpUtils {
.collect(Collectors.toList());
}
+ static boolean isCurrentFe(String ip, int port) {
+ HostInfo hostInfo = Env.getCurrentEnv().getSelfNode();
+ return hostInfo.isSame(new HostInfo(ip, port));
+ }
+
static String concatUrl(Pair<String, Integer> ipPort, String path,
Map<String, String> arguments) {
StringBuilder url = new StringBuilder("http://")
.append(ipPort.first).append(":").append(ipPort.second).append(path);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index 0886edb56fb..829ffe05b06 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -38,6 +38,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Strings;
@@ -45,8 +46,11 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
@@ -307,7 +311,31 @@ public class QueryProfileAction extends RestBaseController
{
@RequestParam(value = IS_ALL_NODE_PARA, required = false,
defaultValue = "true") boolean isAllNode) {
executeCheckPassword(request, response);
+ try {
+ String queryId = getQueryIdByTraceIdImpl(request, traceId,
isAllNode);
+ return ResponseEntityBuilder.ok(queryId);
+ } catch (Exception e) {
+ return ResponseEntityBuilder.badRequest(e.getMessage());
+ }
+ }
+
+ /**
+ * Get query id by trace id.
+ * return a non-empty query id corresponding to the trace id.
+ * Will throw an exception if the trace id is not found, or query id is
empty, or user does not have permission.
+ */
+ private String getQueryIdByTraceIdImpl(HttpServletRequest request, String
traceId, boolean isAllNode)
+ throws Exception {
+ // Get query id by trace id in current FE
+ ExecuteEnv env = ExecuteEnv.getInstance();
+ String queryId = env.getScheduler().getQueryIdByTraceId(traceId);
+ if (!Strings.isNullOrEmpty(queryId)) {
+ checkAuthByUserAndQueryId(queryId);
+ return queryId;
+ }
+
if (isAllNode) {
+ // If the query id is not found in current FE, try to get it from
other FE
String httpPath = "/rest/v2/manager/query/trace_id/" + traceId;
ImmutableMap<String, String> arguments =
ImmutableMap.<String,
String>builder().put(IS_ALL_NODE_PARA, "false").build();
@@ -315,33 +343,29 @@ public class QueryProfileAction extends
RestBaseController {
ImmutableMap<String, String> header = ImmutableMap.<String,
String>builder()
.put(NodeAction.AUTHORIZATION,
request.getHeader(NodeAction.AUTHORIZATION)).build();
for (Pair<String, Integer> ipPort : frontends) {
+ if (HttpUtils.isCurrentFe(ipPort.first, ipPort.second)) {
+ // skip current FE.
+ continue;
+ }
String url = HttpUtils.concatUrl(ipPort, httpPath, arguments);
- try {
- String responseJson = HttpUtils.doGet(url, header);
- int code =
JsonParser.parseString(responseJson).getAsJsonObject().get("code").getAsInt();
- if (code == HttpUtils.REQUEST_SUCCESS_CODE) {
- return responseJson;
+ String responseJson = HttpUtils.doGet(url, header);
+ JsonObject jObj =
JsonParser.parseString(responseJson).getAsJsonObject();
+ int code = jObj.get("code").getAsInt();
+ if (code == HttpUtils.REQUEST_SUCCESS_CODE) {
+ if (!jObj.has("data") || jObj.get("data").isJsonNull() ||
Strings.isNullOrEmpty(
+ jObj.get("data").getAsString())) {
+ throw new Exception(String.format("trace id %s not
found", traceId));
}
- } catch (Exception e) {
- LOG.warn(e);
+ return jObj.get("data").getAsString();
}
+ LOG.warn("get query id by trace id error, resp: {}",
responseJson);
+ // If the response code is not success, it means that the
trace id is not found in this FE.
+ // Continue to try the next FE.
}
- } else {
- ExecuteEnv env = ExecuteEnv.getInstance();
- String queryId = env.getScheduler().getQueryIdByTraceId(traceId);
- if (Strings.isNullOrEmpty(queryId)) {
- return ResponseEntityBuilder.badRequest("Not found");
- }
-
- try {
- checkAuthByUserAndQueryId(queryId);
- } catch (AuthenticationException e) {
- return ResponseEntityBuilder.badRequest(e.getMessage());
- }
-
- return ResponseEntityBuilder.ok(queryId);
}
- return ResponseEntityBuilder.badRequest("not found query id");
+
+ // Not found in all FE.
+ throw new Exception(String.format("trace id %s not found", traceId));
}
/**
@@ -505,4 +529,69 @@ public class QueryProfileAction extends RestBaseController
{
env.getScheduler().cancelQuery(queryId, new
Status(TStatusCode.CANCELLED, "cancel query by rest api"));
return ResponseEntityBuilder.ok();
}
+
+ /**
+ * Get real-time query statistics for with given query id.
+ * This API is used for getting the runtime query progress
+ *
+ * @param request
+ * @param response
+ * @param traceId: The user specified trace id, eg, set
session_context="trace_id:123456";
+ * @return
+ */
+ @RequestMapping(path = "/statistics/{trace_id}", method =
RequestMethod.GET)
+ public Object queryStatistics(HttpServletRequest request,
HttpServletResponse response,
+ @PathVariable("trace_id") String traceId) {
+ executeCheckPassword(request, response);
+
+ String queryId = null;
+ try {
+ queryId = getQueryIdByTraceIdImpl(request, traceId, true);
+ } catch (Exception e) {
+ return ResponseEntityBuilder.badRequest(e.getMessage());
+ }
+
+ try {
+ TQueryStatistics statistic =
ProfileManager.getInstance().getQueryStatistic(queryId);
+ return ResponseEntityBuilder.ok(new QueryStatistics(statistic));
+ } catch (Exception e) {
+ LOG.warn("get query statistics error, queryId:{}", queryId, e);
+ return ResponseEntityBuilder.badRequest(e.getMessage());
+ }
+ }
+
+ /**
+ * A class that represents the query runtime statistics.
+ */
+ @Getter
+ @Setter
+ public static class QueryStatistics {
+ public long scanRows;
+ public long scanBytes;
+ public long returnedRows;
+ public long cpuMs;
+ public long maxPeakMemoryBytes;
+ public long currentUsedMemoryBytes;
+ public long shuffleSendBytes;
+ public long shuffleSendRows;
+ public long scanBytesFromLocalStorage;
+ public long scanBytesFromRemoteStorage;
+ public long spillWriteBytesToLocalStorage;
+ public long spillReadBytesFromLocalStorage;
+
+ public QueryStatistics(TQueryStatistics queryStatistics) {
+ this.scanRows = queryStatistics.getScanRows();
+ this.scanBytes = queryStatistics.getScanBytes();
+ this.returnedRows = queryStatistics.getReturnedRows();
+ this.cpuMs = queryStatistics.getCpuMs();
+ this.maxPeakMemoryBytes = queryStatistics.getMaxPeakMemoryBytes();
+ this.currentUsedMemoryBytes =
queryStatistics.getCurrentUsedMemoryBytes();
+ this.shuffleSendBytes = queryStatistics.getShuffleSendBytes();
+ this.shuffleSendRows = queryStatistics.getShuffleSendRows();
+ this.scanBytesFromLocalStorage =
queryStatistics.getScanBytesFromLocalStorage();
+ this.scanBytesFromRemoteStorage =
queryStatistics.getScanBytesFromRemoteStorage();
+ this.spillWriteBytesToLocalStorage =
queryStatistics.getSpillWriteBytesToLocalStorage();
+ this.spillReadBytesFromLocalStorage =
queryStatistics.getSpillReadBytesFromLocalStorage();
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java
index 2fb33c45bf6..34bb18fe5e0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java
@@ -17,10 +17,9 @@
package org.apache.doris.nereids.trees.plans.commands;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.SchemaTable;
+import org.apache.doris.catalog.Table;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.FrontendsProcNode;
@@ -47,21 +46,14 @@ import java.util.Optional;
*/
public class ShowProcessListCommand extends ShowCommand {
private static final Logger LOG =
LogManager.getLogger(ShowProcessListCommand.class);
- private static final ShowResultSetMetaData PROCESSLIST_META_DATA =
ShowResultSetMetaData.builder()
- .addColumn(new Column("CurrentConnected",
ScalarType.createVarchar(16)))
- .addColumn(new Column("Id",
ScalarType.createType(PrimitiveType.BIGINT)))
- .addColumn(new Column("User", ScalarType.createVarchar(16)))
- .addColumn(new Column("Host", ScalarType.createVarchar(16)))
- .addColumn(new Column("LoginTime", ScalarType.createVarchar(16)))
- .addColumn(new Column("Catalog", ScalarType.createVarchar(16)))
- .addColumn(new Column("Db", ScalarType.createVarchar(16)))
- .addColumn(new Column("Command", ScalarType.createVarchar(16)))
- .addColumn(new Column("Time",
ScalarType.createType(PrimitiveType.INT)))
- .addColumn(new Column("State", ScalarType.createVarchar(64)))
- .addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
- .addColumn(new Column("Info", ScalarType.STRING))
- .addColumn(new Column("FE", ScalarType.createVarchar(16)))
- .addColumn(new Column("CloudCluster",
ScalarType.createVarchar(16))).build();
+ private static final ShowResultSetMetaData PROCESSLIST_META_DATA;
+
+ static {
+ Table tbl = SchemaTable.TABLE_MAP.get("processlist");
+ ShowResultSetMetaData.Builder builder =
ShowResultSetMetaData.builder();
+ tbl.getBaseSchema().stream().forEach(column ->
builder.addColumn(column));
+ PROCESSLIST_META_DATA = builder.build();
+ }
private final boolean isFull;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index c980de73b5a..78efb8a2d74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -949,6 +949,10 @@ public class ConnectContext {
}
public void setTraceId(String traceId) {
+ // When traceId is set, we need to remove the old traceId from
connectScheduler.
+ if (connectScheduler != null) {
+ connectScheduler.removeOldTraceId(this.traceId);
+ }
this.traceId = traceId;
}
@@ -1225,6 +1229,7 @@ public class ConnectContext {
row.add("" + (nowMs - startTime) / 1000);
row.add(state.toString());
row.add(DebugUtil.printId(queryId));
+ row.add(Strings.nullToEmpty(traceId));
if (state.getStateType() == QueryState.MysqlStateType.ERR) {
row.add(state.getErrorMessage());
} else if (executor != null) {
@@ -1247,7 +1252,6 @@ public class ConnectContext {
}
}
-
public void startAcceptQuery(ConnectProcessor connectProcessor) {
mysqlChannel.startAcceptQuery(this, connectProcessor);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
index ce91ef09e37..e0f311aec85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
@@ -84,6 +84,9 @@ public class ConnectPoolMgr {
if (conns != null) {
conns.decrementAndGet();
}
+ if (ctx.traceId() != null) {
+ traceId2QueryId.remove(ctx.traceId());
+ }
numberConnection.decrementAndGet();
}
}
@@ -155,6 +158,12 @@ public class ConnectPoolMgr {
return queryId == null ? "" : DebugUtil.printId(queryId);
}
+ public void removeTraceId(String traceId) {
+ if (traceId != null) {
+ traceId2QueryId.remove(traceId);
+ }
+ }
+
public Map<Integer, ConnectContext> getConnectionMap() {
return connectionMap;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index c8d27a23db6..32ea481fa9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -138,6 +138,11 @@ public class ConnectScheduler {
return queryId;
}
+ public void removeOldTraceId(String traceId) {
+ connectPoolMgr.removeTraceId(traceId);
+ flightSqlConnectPoolMgr.removeTraceId(traceId);
+ }
+
public Map<Integer, ConnectContext> getConnectionMap() {
Map<Integer, ConnectContext> map = Maps.newConcurrentMap();
map.putAll(connectPoolMgr.getConnectionMap());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index c9266cf6add..0fbcb5690fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -403,7 +403,7 @@ public class CoordinatorContext {
List<AssignedJob> instanceJobs = pipelinePlan.getInstanceJobs();
for (AssignedJob instanceJob : instanceJobs) {
DistributedPlanWorker worker = instanceJob.getAssignedWorker();
- backends.put(new TNetworkAddress(worker.address(),
worker.port()), worker.id());
+ backends.put(new TNetworkAddress(worker.host(),
worker.port()), worker.id());
}
}
return backends;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
index 1c576053113..84bc45b19f4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
@@ -102,7 +102,7 @@ public class ConnectContextTest {
// Thread info
Assert.assertNotNull(ctx.toThreadInfo(false));
List<String> row = ctx.toThreadInfo(false).toRow(101, 1000,
Optional.empty());
- Assert.assertEquals(14, row.size());
+ Assert.assertEquals(15, row.size());
Assert.assertEquals("Yes", row.get(0));
Assert.assertEquals("101", row.get(1));
Assert.assertEquals("testUser", row.get(2));
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index 16d1afda43e..6754e5f9f9f 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -344,11 +344,14 @@ struct TPublishTopicResult {
struct TGetRealtimeExecStatusRequest {
// maybe query id or other unique id
1: optional Types.TUniqueId id
+ 2: optional string req_type // "stats" or "profile"
}
struct TGetRealtimeExecStatusResponse {
1: optional Status.TStatus status
2: optional FrontendService.TReportExecStatusParams
report_exec_status_params
+ // query_stats is for getting real-time query statistics of a certain query
+ 3: optional FrontendService.TQueryStatistics query_stats
}
struct TDictionaryStatus {
diff --git a/regression-test/plugins/plugin_curl_requester.groovy
b/regression-test/plugins/plugin_curl_requester.groovy
index 05c43bbeebf..6fb76589fff 100644
--- a/regression-test/plugins/plugin_curl_requester.groovy
+++ b/regression-test/plugins/plugin_curl_requester.groovy
@@ -112,7 +112,7 @@ Suite.metaClass.http_client = { String method, String url
/* param */ ->
logger.info("Added 'http_client' function to Suite")
-Suite.metaClass.curl = { String method, String url, String body = null,
Integer timeoutSec = 10 /* param */->
+Suite.metaClass.curl = { String method, String url, String body = null,
Integer timeoutSec = 10, String user = "", String pwd = ""->
Suite suite = delegate as Suite
if (method != "GET" && method != "POST") {
throw new Exception(String.format("invalid curl method: %s", method))
@@ -125,11 +125,15 @@ Suite.metaClass.curl = { String method, String url,
String body = null, Integer
Integer retryCount = 0; // Current retry count
Integer sleepTime = 5000; // Sleep time in milliseconds
+ String auth = "";
+ if (!user.equals("")) {
+ auth = String.format("-u%s:%s", user, pwd).toString();
+ }
String cmd
if (method == "POST" && body != null) {
- cmd = String.format("curl --max-time %d -X %s -H
Content-Type:application/json -d %s %s", timeoutSec, method, body,
url).toString()
+ cmd = String.format("curl %s --max-time %d -X %s -H
Content-Type:application/json -d %s %s", auth, timeoutSec, method, body,
url).toString()
} else {
- cmd = String.format("curl --max-time %d -X %s %s", timeoutSec, method,
url).toString()
+ cmd = String.format("curl %s --max-time %d -X %s %s", auth,
timeoutSec, method, url).toString()
}
logger.info("curl cmd: " + cmd)
diff --git
a/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
index aa312ac0b0c..37e7c3c4bbe 100644
---
a/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
+++
b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
@@ -63,10 +63,10 @@ suite("test_single_compaction_with_variant_inverted", "p2,
nonConcurrent") {
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ",
disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
if (!disableAutoCompaction) {
- return "Success, " + out
+ return "Success, " + out2
}
- assertEquals(code, 0)
- return out
+ assertEquals(code2, 0)
+ return out2
}
def triggerSingleCompaction = { be_host, be_http_port, tablet_id ->
@@ -84,10 +84,10 @@ suite("test_single_compaction_with_variant_inverted", "p2,
nonConcurrent") {
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ",
disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
if (!disableAutoCompaction) {
- return "Success, " + out
+ return "Success, " + out3
}
- assertEquals(code, 0)
- return out
+ assertEquals(code3, 0)
+ return out3
}
def waitForCompaction = { be_host, be_http_port, tablet_id ->
boolean running = true
diff --git
a/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy
b/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy
index 17b42303f7f..3e3e168c054 100644
---
a/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy
+++
b/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy
@@ -88,7 +88,7 @@ suite("test_information_schema_timezone",
"p0,external,hive,kerberos,external_do
// 4. processlist
List<List<Object>> processlist_res_1 = sql """
- select LOGIN_TIME from information_schema.processlist where INFO
like "%information_schema.processlist%"
+ select LOGINTIME from information_schema.processlist where INFO
like "%information_schema.processlist%"
"""
logger.info("processlist_res_1 = " + processlist_res_1);
@@ -171,7 +171,7 @@ suite("test_information_schema_timezone",
"p0,external,hive,kerberos,external_do
// 4. processlist
List<List<Object>> processlist_res_2 = sql """
- select LOGIN_TIME from information_schema.processlist where INFO
like "%information_schema.processlist%"
+ select LOGINTIME from information_schema.processlist where INFO
like "%information_schema.processlist%"
"""
logger.info("processlist_res_2 = " + processlist_res_2);
assertEquals(true, isEightHoursDiff(processlist_res_1[0][0],
processlist_res_2[0][0]))
diff --git a/regression-test/suites/manager/test_manager_interface_5.groovy
b/regression-test/suites/manager/test_manager_interface_5.groovy
new file mode 100644
index 00000000000..5c44ae3d2d4
--- /dev/null
+++ b/regression-test/suites/manager/test_manager_interface_5.groovy
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+import java.time.LocalDateTime
+import java.time.Duration
+import java.time.format.DateTimeFormatter
+
+suite('test_manager_interface_5',"p0") {
+    // /rest/v2/manager/query/trace_id/{trace_id}
+    // /rest/v2/manager/query/statistics/{trace_id}
+ def test_trace_id = {
+ def futures = []
+ futures.add( thread {
+ try{
+ sql """set
session_context="trace_id:test_manager_interface_5_trace_id""""
+ sql """ select * from numbers("number" = "9910") as a join
numbers('number'="18880094567") as b on a.number = b.number; """
+ }catch(Exception e){
+
+ }
+ })
+
+ futures.add( thread {
+ // test trace id in processlist
+ sleep(500);
+ List<List<Object>> result = sql_return_maparray """ show
processlist """
+ def queryid = ""
+ def x = 0
+ logger.info("result = ${result}")
+ for( int i =0 ;i < result.size();i++ ){
+ if (result[i]["Info"].contains("18880094567")) {
+ queryid = result[i]["QueryId"]
+
assertTrue(result[i]["TraceId"].equals("test_manager_interface_5_trace_id"))
+ x = 1
+ break;
+ }
+ }
+ assertTrue(x == 1)
+
+ result = sql_return_maparray """ select * from
information_schema.processlist """
+ def queryid2 = ""
+ def x2 = 0
+ logger.info("result = ${result}")
+ for( int i =0 ;i < result.size();i++ ){
+ if (result[i]["Info"].contains("18880094567")) {
+ queryid2 = result[i]["QueryId"]
+
assertTrue(result[i]["TraceId"].equals("test_manager_interface_5_trace_id"))
+ x2 = 1
+ break;
+ }
+ }
+ assertTrue(x2 == 1)
+
+ // test trace id in cancel query and get realtime query statistics
+ // 1. test get query id by trace id
+ def fes = sql_return_maparray "show frontends"
+ logger.info("frontends: ${fes}")
+ def fe = fes[0]
+ def url =
"http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/query/trace_id/test_manager_interface_5_trace_id?is_all_node=true"
+ def (code, out, err) = curl("GET", url, null, 10,
context.config.jdbcUser, context.config.jdbcPassword)
+ println "${out}"
+ assertTrue(code == 0)
+ def getQueryId = parseJson(out).get("data");
+ assertEquals(queryid2, getQueryId);
+
+ // 2. test get realtime query statistics by trace id
+ url =
"http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/query/statistics/test_manager_interface_5_trace_id?is_all_node=true"
+ (code, out, err) = curl("GET", url, null, 10,
context.config.jdbcUser, context.config.jdbcPassword)
+ println "${out}"
+ assertTrue(code == 0)
+ def stats = parseJson(out).get("data");
+ assertTrue(stats.containsKey("cpuMs"));
+
+ // 3. test cancel query by query id
+ url =
"http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/query/kill/${queryid2}?is_all_node=true"
+ (code, out, err) = curl("POST", url, null, 10,
context.config.jdbcUser, context.config.jdbcPassword)
+ println "${out}"
+ assertTrue(code == 0)
+ })
+ futures.each { it.get() }
+ }
+ test_trace_id();
+}
diff --git a/regression-test/suites/show_p0/test_show_processlist.groovy
b/regression-test/suites/show_p0/test_show_processlist.groovy
index b093261e7b2..e34db3b2524 100644
--- a/regression-test/suites/show_p0/test_show_processlist.groovy
+++ b/regression-test/suites/show_p0/test_show_processlist.groovy
@@ -21,20 +21,39 @@ suite("test_show_processlist") {
sql """set show_all_fe_connection = false;"""
def result = sql """show processlist;"""
logger.info("result:${result}")
- assertTrue(result[0].size() == 14)
+ assertTrue(result[0].size() == 15)
sql """set show_all_fe_connection = true;"""
result = sql """show processlist;"""
logger.info("result:${result}")
- assertTrue(result[0].size() == 14)
+ assertTrue(result[0].size() == 15)
sql """set show_all_fe_connection = false;"""
def url1 = "http://${context.config.feHttpAddress}/rest/v1/session"
result = Http.GET(url1, true)
logger.info("result:${result}")
- assertTrue(result["data"]["column_names"].size() == 14);
+ assertTrue(result["data"]["column_names"].size() == 15);
def url2 = "http://${context.config.feHttpAddress}/rest/v1/session/all"
result = Http.GET(url2, true)
logger.info("result:${result}")
- assertTrue(result["data"]["column_names"].size() == 14);
-}
\ No newline at end of file
+ assertTrue(result["data"]["column_names"].size() == 15);
+
+ result = sql """select * from information_schema.processlist"""
+ logger.info("result:${result}")
+ assertTrue(result[0].size() == 15)
+
+
+ def result1 = connect('root', context.config.jdbcPassword,
context.config.jdbcUrl) {
+ // execute sql with admin user
+ sql 'select 99 + 1'
+ sql 'set session_context="trace_id:test_show_processlist_trace_id"'
+ def result2 = sql """select * from information_schema.processlist"""
+ def found = false;
+ for (def row in result2) {
+ if (row[11].equals("test_show_processlist_trace_id")) {
+ found = true;
+ }
+ }
+ assertTrue(found)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]