This is an automated email from the ASF dual-hosted git repository. arawat pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ea989dfb283d1c80d96066f163d3f709adcf411d Author: jasonmfehr <[email protected]> AuthorDate: Mon Mar 3 15:52:51 2025 -0800 IMPALA-13815: Fix Flaky Workload Management Tests The CustomClusterTestSuite.wait_for_wm_init_complete() function checks for two specific log lines to be logged by the catalog. The first line is logged when workload management initialization is complete. The second line is logged when a catalog topic update has been assembled. However, if workload management initialization is slow, then there may not be a catalog topic update assembled after the initialization completes. When this happens, an assertion fails despite the workload management tables having been properly initialized and loaded by the catalog. This patch simplifies the CustomClusterTestSuite.wait_for_wm_init_complete() function so it waits until the catalogd logs it has completed workload management initialization and then checks each coordinator's local catalog cache for the workload management tables. The following test suites passed locally and in an ASAN build. These tests all call the 'wait_for_wm_init_complete' function of CustomClusterTestSuite. 
* tests/custom_cluster/test_query_live.py * tests/custom_cluster/test_query_log.py * tests/custom_cluster/test_workload_mgmt_init.py * tests/custom_cluster/test_workload_mgmt_sql_details.py Change-Id: Ieb4c86fa79bb1df000b6241bdd31c7641d807c4f Reviewed-on: http://gerrit.cloudera.org:8080/22570 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Riza Suminto <[email protected]> --- tests/common/custom_cluster_test_suite.py | 42 ++++++++++++++----------------- tests/custom_cluster/test_query_live.py | 8 ------ 2 files changed, 19 insertions(+), 31 deletions(-) diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index ae01adfb6..8819b8735 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -37,6 +37,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.impala_cluster import ImpalaCluster from tests.util.filesystem_utils import IS_LOCAL from tests.util.retry import retry +from tests.util.workload_management import QUERY_TBL_LOG_NAME, QUERY_TBL_LIVE_NAME from time import sleep LOG = logging.getLogger(__name__) @@ -410,30 +411,25 @@ class CustomClusterTestSuite(ImpalaTestSuite): def wait_for_wm_init_complete(self, timeout_s=60): """ Waits for the catalog to report the workload management initialization process has - completed and for the catalog updates to be received by the coordinators. The input - timeout_s is used as the timeout for three separate function calls. Thus, the - theoretical max amount of time this function could wait is (timeout_s * 3). + completed and the workload management tables to be in the local catalog. The input + timeout_s is used as the timeout for multiple separate function calls. Thus, the + theoretical max amount of time this function could wait is: + timeout_s + (timeout_s * num_coordinators). 
""" - catalog_log = self.assert_log_contains_multiline("catalogd", "INFO", r'Completed ' - r'workload management initialization.*?A catalog update with \d+ entries is ' - r'assembled\. Catalog version: (\d+)', timeout_s) - - # Assert each coordinator has received a catalog update that was assembled after - # workload management completed. - for idx, _ in enumerate(self.cluster.get_all_coordinators()): - node_name = "impalad" - if idx > 0: - node_name += "_node" + str(idx) - - def assert_func(): - coord_log = self.assert_log_contains(node_name, "INFO", r'Catalog topic update ' - r'applied with version: (\d+)', timeout_s=timeout_s, expected_count=-1) - return int(coord_log.group(1)) >= int(catalog_log.group(1)) - - max_attempts = timeout_s / 3 - assert retry(func=assert_func, max_attempts=max_attempts, sleep_time_s=3, - backoff=1), "Expected a catalog topic update with version '{}' or later, but " \ - "no such update was found.".format(catalog_log.group(1)) + self.assert_catalogd_ha_contains("INFO", r'Completed workload management ' + r'initialization', timeout_s) + + for tbl in (QUERY_TBL_LIVE_NAME, QUERY_TBL_LOG_NAME): + for coord in self.cluster.get_all_coordinators(): + # Wait until table is available in the coordinator's catalog cache. 
+ def exists_func(): + catalog_objs = coord.service.read_debug_webpage("catalog?json") + return tbl in catalog_objs + + max_attempts = timeout_s / 3 + assert retry(func=exists_func, max_attempts=max_attempts, sleep_time_s=3, + backoff=1), "Did not find table '{}' in local catalog of coordinator " \ + "'{}:{}'.".format(tbl, coord.hostname, coord.get_webserver_port()) @classmethod def _stop_impala_cluster(cls): diff --git a/tests/custom_cluster/test_query_live.py b/tests/custom_cluster/test_query_live.py index 3175e544a..e12254a27 100644 --- a/tests/custom_cluster/test_query_live.py +++ b/tests/custom_cluster/test_query_live.py @@ -36,14 +36,6 @@ class TestQueryLive(CustomClusterTestSuite): super(TestQueryLive, self).setup_method(method) self.wait_for_wm_init_complete() - # Wait until sys.impala_query_live is available in the coordinator's catalog cache. - def table_exists(): - catalog_objs = self.cluster.get_first_impalad() \ - .service.read_debug_webpage("catalog?json") - return "impala_query_live" in catalog_objs - - assert retry(func=table_exists, max_attempts=5, sleep_time_s=3, backoff=1) - def assert_describe_extended(self): describe_ext_result = self.execute_query('describe extended sys.impala_query_live') # Alter can add additional event fields. Filter them out.
