This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 88eb907795b7c911c9f9ab8aa00aa4ac2a04c21d Author: jasonmfehr <[email protected]> AuthorDate: Thu Jul 31 16:54:47 2025 -0700 IMPALA-14282: Workload Management Custom Cluster Tests Use New Utility Functions Consume various utility functions added as part of previous changes. Testing accomplished by running exhaustive tests in test_query_log.py, test_query_live.py, and test_otel_trace.py both locally and in jenkins. Change-Id: If42a8b5b6fdb43fb2bb37dd2a3be4668e8a5e283 Reviewed-on: http://gerrit.cloudera.org:8080/23234 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/util/query_profile_util.py | 11 ++++-- tests/util/workload_management.py | 78 +++++++++++++++++---------------------- 2 files changed, 42 insertions(+), 47 deletions(-) diff --git a/tests/util/query_profile_util.py b/tests/util/query_profile_util.py index f781c20e7..f18f14b64 100644 --- a/tests/util/query_profile_util.py +++ b/tests/util/query_profile_util.py @@ -109,16 +109,21 @@ def parse_retried_query_id(profile_text): return retried_query_id.group(1) -def parse_num_rows_fetched(profile_text): +def parse_num_rows_fetched(profile_text, missing_ok=False): """Parses the number of rows fetched from the query profile text.""" - num_rows_fetched = re.search(r'\n\s+\-\sNumRowsFetched:\s+(\d+)', profile_text) + num_rows_fetched = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)', + profile_text) + if missing_ok and num_rows_fetched is None: + return None assert num_rows_fetched is not None, "Number of Rows Fetched not found in query profile" return int(num_rows_fetched.group(1)) -def parse_admission_result(profile_text): +def parse_admission_result(profile_text, missing_ok=False): """Parses the admission result from the query profile text.""" admission_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text) + if missing_ok and admission_result is None: + return None assert admission_result is not None, "Admission Result not found in query profile" return admission_result.group(1) diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py index 8f29bc873..994feaed8 100644 --- a/tests/util/workload_management.py +++ b/tests/util/workload_management.py @@ -16,17 +16,28 @@ # under the License. from __future__ import absolute_import, division, print_function - +from datetime import datetime import os import re -import requests - -from datetime import datetime from time import sleep, time from impala_thrift_gen.SystemTables.ttypes import TQueryTableColumn from tests.util.assert_time import assert_time_str, convert_to_milliseconds from tests.util.memory import assert_byte_str, convert_to_bytes +from tests.util.query_profile_util import ( + parse_admission_result, + parse_coordinator, + parse_db_user, + parse_default_db, + parse_impala_query_state, + parse_num_rows_fetched, + parse_query_id, + parse_query_state, + parse_query_status, + parse_query_type, + parse_session_id, + parse_sql, +) DEDICATED_COORD_SAFETY_BUFFER_BYTES = 104857600 WM_DB = "sys" @@ -61,16 +72,11 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, if query_id is not None: assert impalad is not None assert raw_profile is None, "cannot specify both query_id and raw_profile" - resp = requests.get("http://{0}:{1}/query_profile_plain_text?query_id={2}" - .format(impalad.hostname, impalad.get_webserver_port(), query_id)) - assert resp.status_code == 200, "Response code was: {0}".format(resp.status_code) - profile_text = resp.text + profile_text = impalad.service.read_query_profile_page(query_id) else: profile_text = raw_profile assert query_id is None, "cannot specify both raw_profile and query_id" - match = re.search(r'Query \(id=(.*?)\)', profile_text) - assert match is not None - query_id = match.group(1) + query_id = parse_query_id(profile_text) print("Query Id: {0}".format(query_id)) profile_lines = profile_text.split("\n") @@ -133,9 +139,7 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, assert column_val(TQueryTableColumn.QUERY_ID) == query_id # Session ID - session_id = re.search(r'\n\s+Session ID:\s+(.*)\n', profile_text) - assert session_id is not None - assert column_val(TQueryTableColumn.SESSION_ID) == session_id.group(1),\ + assert column_val(TQueryTableColumn.SESSION_ID) == parse_session_id(profile_text), \ "session id incorrect" # Session Type @@ -154,9 +158,8 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, assert value == "" # Database User - user = re.search(r'\n\s+User:\s+(.*?)\n', profile_text) - assert user is not None - assert column_val(TQueryTableColumn.DB_USER) == user.group(1), "db user incorrect" + assert column_val(TQueryTableColumn.DB_USER) == parse_db_user(profile_text), \ + "db user incorrect" # Connected Database User db_user = re.search(r'\n\s+Connected User:\s+(.*?)\n', profile_text) @@ -165,46 +168,35 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, "db user connection incorrect" # Database Name - default_db = re.search(r'\n\s+Default Db:\s+(.*?)\n', profile_text) - assert default_db is not None - assert column_val(TQueryTableColumn.DB_NAME) == default_db.group(1),\ + default_db = parse_default_db(profile_text) + assert column_val(TQueryTableColumn.DB_NAME) == default_db, \ "database name incorrect" # Coordinator - coordinator = re.search(r'\n\s+Coordinator:\s+(.*?)\n', profile_text) - assert coordinator is not None - assert column_val(TQueryTableColumn.IMPALA_COORDINATOR) == coordinator.group(1),\ + coordinator = parse_coordinator(profile_text) + assert column_val(TQueryTableColumn.IMPALA_COORDINATOR) == coordinator, \ "impala coordinator incorrect" # Query Status (can be multiple lines if the query errored) - query_status = re.search(r'\n\s+Query Status:\s+(.*?)\n\s+Impala Version', profile_text, - re.DOTALL) - assert query_status is not None - assert column_val(TQueryTableColumn.QUERY_STATUS) == query_status.group(1),\ + assert column_val(TQueryTableColumn.QUERY_STATUS) == parse_query_status(profile_text), \ "query status incorrect" # Query State - query_state = re.search(r'\n\s+Query State:\s+(.*?)\n', profile_text) - assert query_state is not None - query_state_value = query_state.group(1) + query_state_value = parse_query_state(profile_text) assert column_val(TQueryTableColumn.QUERY_STATE) == query_state_value,\ "query state incorrect" # Impala Query End State - impala_query_state = re.search(r'\n\s+Impala Query State:\s+(.*?)\n', profile_text) - assert impala_query_state is not None assert column_val(TQueryTableColumn.IMPALA_QUERY_END_STATE) \ - == impala_query_state.group(1), "impala query end state incorrect" + == parse_impala_query_state(profile_text), "impala query end state incorrect" # Query Type value = column_val(TQueryTableColumn.QUERY_TYPE) if query_state_value == "EXCEPTION": assert value == "UNKNOWN", "query type incorrect" else: - query_type = re.search(r'\n\s+Query Type:\s+(.*?)\n', profile_text) - assert query_type is not None - assert value == query_type.group(1), "query type incorrect" - query_type = query_type.group(1) + query_type = parse_query_type(profile_text) + assert value == query_type, "query type incorrect" # Client Network Address network_address = re.search(r'\n\s+Network Address:\s+(.*?)\n', profile_text) @@ -332,13 +324,13 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, # Admission Result value = column_val(TQueryTableColumn.ADMISSION_RESULT) - adm_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text) + adm_result = parse_admission_result(profile_text, True) if query_state_value == "EXCEPTION" or query_type == "DDL": assert adm_result is None assert value == "", "admission result incorrect" else: assert adm_result is not None - assert value == adm_result.group(1), "admission result incorrect" + assert value == adm_result, "admission result incorrect" # Cluster Memory Admitted value = column_val(TQueryTableColumn.CLUSTER_MEMORY_ADMITTED) @@ -390,12 +382,12 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, # Rows Fetched value = column_val(TQueryTableColumn.NUM_ROWS_FETCHED) - rows_fetched = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)', profile_text) + rows_fetched = parse_num_rows_fetched(profile_text, True) if query_state_value == "EXCEPTION": assert rows_fetched is None else: assert rows_fetched is not None - assert value == rows_fetched.group(1) + assert value == str(rows_fetched) # Row Materialization Rate value = column_val(TQueryTableColumn.ROW_MATERIALIZATION_ROWS_PER_SEC) @@ -597,9 +589,7 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, "pernode peak memory mean incorrect" # SQL statement - sql_stmt = re.search(r'\n\s+Sql Statement:\s+(.*?)\n', profile_text) - assert sql_stmt is not None - assert column_val(TQueryTableColumn.SQL) == sql_stmt.group(1), "sql incorrect" + assert column_val(TQueryTableColumn.SQL) == parse_sql(profile_text), "sql incorrect" # Query Plan value = column_val(TQueryTableColumn.PLAN)
