This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8605246f3cb200f25dc426c858a5d606514db293 Author: jasonmfehr <[email protected]> AuthorDate: Wed Feb 26 09:35:03 2025 -0800 IMPALA-13710: Fix Flaky Workload Management Custom Cluster Tests The custom cluster workload management tests occasionally fail with query not found errors. These failures are very few and far between but still cause test flakiness. This patch adds retries into the common assertion functions to ensure that a slightly slower than normal workload management insert dml does not cause test failures. Testing accomplished by the following test suites passing locally: * tests/custom_cluster/test_query_live.py * tests/custom_cluster/test_query_log.py * tests/custom_cluster/test_workload_mgmt_init.py * tests/custom_cluster/test_workload_mgmt_sql_details.py Change-Id: I38dad4819190fa44ca2933be9907d7f1273621e3 Reviewed-on: http://gerrit.cloudera.org:8080/22551 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/util/workload_management.py | 66 ++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py index f2204899d..a98c89fad 100644 --- a/tests/util/workload_management.py +++ b/tests/util/workload_management.py @@ -21,6 +21,7 @@ import re import requests from datetime import datetime +from time import sleep, time from SystemTables.ttypes import TQueryTableColumn from tests.util.assert_time import assert_time_str, convert_to_milliseconds @@ -32,6 +33,9 @@ QUERY_TBL_LOG_NAME = "impala_query_log" QUERY_TBL_LOG = "{0}.{1}".format(WM_DB, QUERY_TBL_LOG_NAME) QUERY_TBL_LIVE_NAME = "impala_query_live" QUERY_TBL_LIVE = "{0}.{1}".format(WM_DB, QUERY_TBL_LIVE_NAME) +# Time in seconds the assert_query and assert_csv_col will wait for the query to become +# available in the relevant workload management table. +ASSERT_QUERY_TIMEOUT_S = 30 def round_to_3(val): @@ -66,17 +70,30 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, print("Query Id: {0}".format(query_id)) profile_lines = profile_text.split("\n") - # Force Impala to process the inserts to the completed queries table. - if query_tbl != QUERY_TBL_LIVE: - client.execute("refresh " + query_tbl) + success = False + sql_results = None + + start_time = time() + while (time() - start_time <= ASSERT_QUERY_TIMEOUT_S): + # Force Impala to process the inserts to the completed queries table. + if query_tbl != QUERY_TBL_LIVE: + client.execute("refresh " + query_tbl) + + # Assert the query was written correctly to the query log table. + if max_row_size is not None: + client.set_configuration_option("MAX_ROW_SIZE", max_row_size) + sql_results = client.execute("select * from {0} where query_id='{1}'".format( + query_tbl, query_id)) + if sql_results.success and len(sql_results.data) == 1: + success = True + break + + # Query is not yet available in the workload management table, wait and try again. + sleep(1) + + assert success, "Did not find query '{}' in the '{}' table after multiple attempts" \ + .format(query_id, query_tbl) - # Assert the query was written correctly to the query log table. - if max_row_size is not None: - client.set_configuration_option("MAX_ROW_SIZE", max_row_size) - sql_results = client.execute("select * from {0} where query_id='{1}'".format( - query_tbl, query_id)) - assert sql_results.success - assert len(sql_results.data) == 1, "did not find query in completed queries table" # Assert the expected columns were included. assert len(sql_results.column_labels) == len(TQueryTableColumn._VALUES_TO_NAMES) @@ -675,16 +692,27 @@ def assert_csv_col(client, query_tbl, col, query_id, expected_list, db="tpcds"): print("Query Id: {0}".format(query_id)) - # Force Impala to process the inserts to the completed queries table. - if query_tbl != QUERY_TBL_LIVE: - client.execute("refresh " + query_tbl) + success = False + sql_results = None + + start_time = time() + while (time() - start_time <= ASSERT_QUERY_TIMEOUT_S): + # Force Impala to process the inserts to the completed queries table. + if query_tbl != QUERY_TBL_LIVE: + client.execute("refresh " + query_tbl) + + # Assert the query was written correctly to the query log table. + sql_results = client.execute("select * from {0} where query_id='{1}'".format( + query_tbl, query_id)) + if sql_results.success and len(sql_results.data) == 1: + success = True + break + + # Query is not yet available in the workload management table, wait and try again. + sleep(1) - # Assert the query was written correctly to the query log table. - sql_results = client.execute("select * from {0} where query_id='{1}'".format( - query_tbl, query_id)) - assert sql_results.success - assert len(sql_results.data) == 1, "did not find query '{}' in completed queries " \ - "table".format(query_id) + assert success, "Did not find query '{}' in the '{}' table after multiple attempts" \ + .format(query_id, query_tbl) data = sql_results.data[0].split("\t") actual = []
