This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 88eb907795b7c911c9f9ab8aa00aa4ac2a04c21d
Author: jasonmfehr <[email protected]>
AuthorDate: Thu Jul 31 16:54:47 2025 -0700

    IMPALA-14282: Workload Management Custom Cluster Tests Use New Utility Functions
    
    Consume various utility functions added as part of previous changes.
    
    Testing accomplished by running exhaustive tests in
    test_query_log.py, test_query_live.py, and test_otel_trace.py both
    locally and in jenkins.
    
    Change-Id: If42a8b5b6fdb43fb2bb37dd2a3be4668e8a5e283
    Reviewed-on: http://gerrit.cloudera.org:8080/23234
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/util/query_profile_util.py  | 11 ++++--
 tests/util/workload_management.py | 78 +++++++++++++++++----------------------
 2 files changed, 42 insertions(+), 47 deletions(-)

diff --git a/tests/util/query_profile_util.py b/tests/util/query_profile_util.py
index f781c20e7..f18f14b64 100644
--- a/tests/util/query_profile_util.py
+++ b/tests/util/query_profile_util.py
@@ -109,16 +109,21 @@ def parse_retried_query_id(profile_text):
   return retried_query_id.group(1)
 
 
-def parse_num_rows_fetched(profile_text):
+def parse_num_rows_fetched(profile_text, missing_ok=False):
   """Parses the number of rows fetched from the query profile text."""
-  num_rows_fetched = re.search(r'\n\s+\-\sNumRowsFetched:\s+(\d+)', profile_text)
+  num_rows_fetched = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)',
+      profile_text)
+  if missing_ok and num_rows_fetched is None:
+    return None
   assert num_rows_fetched is not None, "Number of Rows Fetched not found in query profile"
   return int(num_rows_fetched.group(1))
 
 
-def parse_admission_result(profile_text):
+def parse_admission_result(profile_text, missing_ok=False):
   """Parses the admission result from the query profile text."""
   admission_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text)
+  if missing_ok and admission_result is None:
+    return None
   assert admission_result is not None, "Admission Result not found in query profile"
   return admission_result.group(1)
 
diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py
index 8f29bc873..994feaed8 100644
--- a/tests/util/workload_management.py
+++ b/tests/util/workload_management.py
@@ -16,17 +16,28 @@
 # under the License.
 
 from __future__ import absolute_import, division, print_function
-
+from datetime import datetime
 import os
 import re
-import requests
-
-from datetime import datetime
 from time import sleep, time
 
 from impala_thrift_gen.SystemTables.ttypes import TQueryTableColumn
 from tests.util.assert_time import assert_time_str, convert_to_milliseconds
 from tests.util.memory import assert_byte_str, convert_to_bytes
+from tests.util.query_profile_util import (
+    parse_admission_result,
+    parse_coordinator,
+    parse_db_user,
+    parse_default_db,
+    parse_impala_query_state,
+    parse_num_rows_fetched,
+    parse_query_id,
+    parse_query_state,
+    parse_query_status,
+    parse_query_type,
+    parse_session_id,
+    parse_sql,
+)
 
 DEDICATED_COORD_SAFETY_BUFFER_BYTES = 104857600
 WM_DB = "sys"
@@ -61,16 +72,11 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
   if query_id is not None:
     assert impalad is not None
     assert raw_profile is None, "cannot specify both query_id and raw_profile"
-    resp = requests.get("http://{0}:{1}/query_profile_plain_text?query_id={2}"
-        .format(impalad.hostname, impalad.get_webserver_port(), query_id))
-    assert resp.status_code == 200, "Response code was: {0}".format(resp.status_code)
-    profile_text = resp.text
+    profile_text = impalad.service.read_query_profile_page(query_id)
   else:
     profile_text = raw_profile
     assert query_id is None, "cannot specify both raw_profile and query_id"
-    match = re.search(r'Query \(id=(.*?)\)', profile_text)
-    assert match is not None
-    query_id = match.group(1)
+    query_id = parse_query_id(profile_text)
 
   print("Query Id: {0}".format(query_id))
   profile_lines = profile_text.split("\n")
@@ -133,9 +139,7 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
   assert column_val(TQueryTableColumn.QUERY_ID) == query_id
 
   # Session ID
-  session_id = re.search(r'\n\s+Session ID:\s+(.*)\n', profile_text)
-  assert session_id is not None
-  assert column_val(TQueryTableColumn.SESSION_ID) == session_id.group(1),\
+  assert column_val(TQueryTableColumn.SESSION_ID) == parse_session_id(profile_text), \
       "session id incorrect"
 
   # Session Type
@@ -154,9 +158,8 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
     assert value == ""
 
   # Database User
-  user = re.search(r'\n\s+User:\s+(.*?)\n', profile_text)
-  assert user is not None
-  assert column_val(TQueryTableColumn.DB_USER) == user.group(1), "db user incorrect"
+  assert column_val(TQueryTableColumn.DB_USER) == parse_db_user(profile_text), \
+      "db user incorrect"
 
   # Connected Database User
   db_user = re.search(r'\n\s+Connected User:\s+(.*?)\n', profile_text)
@@ -165,46 +168,35 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
       "db user connection incorrect"
 
   # Database Name
-  default_db = re.search(r'\n\s+Default Db:\s+(.*?)\n', profile_text)
-  assert default_db is not None
-  assert column_val(TQueryTableColumn.DB_NAME) == default_db.group(1),\
+  default_db = parse_default_db(profile_text)
+  assert column_val(TQueryTableColumn.DB_NAME) == default_db, \
       "database name incorrect"
 
   # Coordinator
-  coordinator = re.search(r'\n\s+Coordinator:\s+(.*?)\n', profile_text)
-  assert coordinator is not None
-  assert column_val(TQueryTableColumn.IMPALA_COORDINATOR) == coordinator.group(1),\
+  coordinator = parse_coordinator(profile_text)
+  assert column_val(TQueryTableColumn.IMPALA_COORDINATOR) == coordinator, \
       "impala coordinator incorrect"
 
   # Query Status (can be multiple lines if the query errored)
-  query_status = re.search(r'\n\s+Query Status:\s+(.*?)\n\s+Impala Version', profile_text,
-      re.DOTALL)
-  assert query_status is not None
-  assert column_val(TQueryTableColumn.QUERY_STATUS) == query_status.group(1),\
+  assert column_val(TQueryTableColumn.QUERY_STATUS) == parse_query_status(profile_text), \
       "query status incorrect"
 
   # Query State
-  query_state = re.search(r'\n\s+Query State:\s+(.*?)\n', profile_text)
-  assert query_state is not None
-  query_state_value = query_state.group(1)
+  query_state_value = parse_query_state(profile_text)
   assert column_val(TQueryTableColumn.QUERY_STATE) == query_state_value,\
       "query state incorrect"
 
   # Impala Query End State
-  impala_query_state = re.search(r'\n\s+Impala Query State:\s+(.*?)\n', profile_text)
-  assert impala_query_state is not None
   assert column_val(TQueryTableColumn.IMPALA_QUERY_END_STATE) \
-      == impala_query_state.group(1), "impala query end state incorrect"
+      == parse_impala_query_state(profile_text), "impala query end state incorrect"
 
   # Query Type
   value = column_val(TQueryTableColumn.QUERY_TYPE)
   if query_state_value == "EXCEPTION":
     assert value == "UNKNOWN", "query type incorrect"
   else:
-    query_type = re.search(r'\n\s+Query Type:\s+(.*?)\n', profile_text)
-    assert query_type is not None
-    assert value == query_type.group(1), "query type incorrect"
-    query_type = query_type.group(1)
+    query_type = parse_query_type(profile_text)
+    assert value == query_type, "query type incorrect"
 
   # Client Network Address
   network_address = re.search(r'\n\s+Network Address:\s+(.*?)\n', profile_text)
@@ -332,13 +324,13 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
 
   # Admission Result
   value = column_val(TQueryTableColumn.ADMISSION_RESULT)
-  adm_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text)
+  adm_result = parse_admission_result(profile_text, True)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
     assert adm_result is None
     assert value == "", "admission result incorrect"
   else:
     assert adm_result is not None
-    assert value == adm_result.group(1), "admission result incorrect"
+    assert value == adm_result, "admission result incorrect"
 
   # Cluster Memory Admitted
   value = column_val(TQueryTableColumn.CLUSTER_MEMORY_ADMITTED)
@@ -390,12 +382,12 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
 
   # Rows Fetched
   value = column_val(TQueryTableColumn.NUM_ROWS_FETCHED)
-  rows_fetched = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)', profile_text)
+  rows_fetched = parse_num_rows_fetched(profile_text, True)
   if query_state_value == "EXCEPTION":
     assert rows_fetched is None
   else:
     assert rows_fetched is not None
-    assert value == rows_fetched.group(1)
+    assert value == str(rows_fetched)
 
   # Row Materialization Rate
   value = column_val(TQueryTableColumn.ROW_MATERIALIZATION_ROWS_PER_SEC)
@@ -597,9 +589,7 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
       "pernode peak memory mean incorrect"
 
   # SQL statement
-  sql_stmt = re.search(r'\n\s+Sql Statement:\s+(.*?)\n', profile_text)
-  assert sql_stmt is not None
-  assert column_val(TQueryTableColumn.SQL) == sql_stmt.group(1), "sql incorrect"
+  assert column_val(TQueryTableColumn.SQL) == parse_sql(profile_text), "sql incorrect"
 
   # Query Plan
   value = column_val(TQueryTableColumn.PLAN)

Reply via email to