This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 83734a1220bf99261779bbc5037708a7e1d78df0 Author: jasonmfehr <[email protected]> AuthorDate: Thu Mar 28 10:43:03 2024 -0700 IMPALA-12944: Fixes Workload Management Test Flakiness The custom cluster workload management tests are flaky because the tests can actually run before the completed queries table has been fully created by the Impala startup process. The table create sql runs asynchronously during startup and thus can take longer to finish than the custom cluster tests take to execute. This change adds checks at the beginning of each test to ensure the completed queries table sql has finished before any of the test code runs. Change-Id: I428702a210e024db95808dc2518da497426922f8 Reviewed-on: http://gerrit.cloudera.org:8080/21221 Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/common/impala_test_suite.py | 16 ++++++++--- tests/custom_cluster/test_query_log.py | 49 +++++++++++++++++++++------------- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 72b0e1fa3..40020e3d0 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -1267,14 +1267,16 @@ class ImpalaTestSuite(BaseTestSuite): """ Convenience wrapper around assert_log_contains for impalad logs. """ - self.assert_log_contains("impalad", level, line_regex, expected_count, timeout_s) + return self.assert_log_contains( + "impalad", level, line_regex, expected_count, timeout_s) def assert_catalogd_log_contains(self, level, line_regex, expected_count=1, timeout_s=6): """ Convenience wrapper around assert_log_contains for catalogd logs. """ - self.assert_log_contains("catalogd", level, line_regex, expected_count, timeout_s) + return self.assert_log_contains( + "catalogd", level, line_regex, expected_count, timeout_s) def assert_log_contains(self, daemon, level, line_regex, expected_count=1, timeout_s=6): """ @@ -1287,6 +1289,9 @@ class ImpalaTestSuite(BaseTestSuite): make sure that log buffering has been disabled, for example by adding '-logbuflevel=-1' to the daemon startup options or set timeout_s to a value higher than the log flush interval. + + Returns the result of the very last call to line_regex.search or None if + expected_count is 0 or the line_regex did not match any lines. """ pattern = re.compile(line_regex) start_time = time.time() @@ -1300,10 +1305,13 @@ class ImpalaTestSuite(BaseTestSuite): log_file_path = os.path.join(log_dir, daemon + "." + level) # Resolve symlinks to make finding the file easier. log_file_path = os.path.realpath(log_file_path) + last_re_result = None with open(log_file_path) as log_file: for line in log_file: - if pattern.search(line): + re_result = pattern.search(line) + if re_result: found += 1 + last_re_result = re_result if expected_count == -1: assert found > 0, "Expected at least one line in file %s matching regex '%s'"\ ", but found none." % (log_file_path, line_regex) @@ -1312,7 +1320,7 @@ class ImpalaTestSuite(BaseTestSuite): "Expected %d lines in file %s matching regex '%s', but found %d lines. "\ "Last line was: \n%s" %\ (expected_count, log_file_path, line_regex, found, line) - return + return last_re_result except AssertionError as e: # Re-throw the exception to the caller only when the timeout is expired. Otherwise # sleep before retrying. diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py index e2f9a08fb..e02993217 100644 --- a/tests/custom_cluster/test_query_log.py +++ b/tests/custom_cluster/test_query_log.py @@ -56,9 +56,25 @@ class TestQueryLogTableBase(CustomClusterTestSuite): cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', cls.PROTOCOL_BEESWAX, cls.PROTOCOL_HS2)) - def get_client(self, protocol): + def get_client(self, protocol, query_table_name=""): """Retrieves the default Impala client for the specified protocol. This client is - automatically closed after the test completes.""" + automatically closed after the test completes. Also ensures the completed queries + table has been successfully created by checking the logs to verify the create + table sql has finished.""" + if query_table_name == "": + query_table_name = self.QUERY_TBL + + # These tests run very quickly and can actually complete before Impala has finished + # creating the completed queries table. Thus, to make these tests more robust, this + # code checks to make sure the table create has finished before returning. + create_re = r'\]\s+(\w+:\w+)\]\s+Analyzing query: CREATE TABLE IF NOT EXISTS {}' \ + .format(query_table_name) + create_match = self.assert_impalad_log_contains("INFO", create_re) + + finish_re = r'Query successfully unregistered: query_id={}' \ + .format(create_match.group(1)) + self.assert_impalad_log_contains("INFO", finish_re) + if protocol == self.PROTOCOL_BEESWAX: return self.client elif protocol == self.PROTOCOL_HS2: @@ -241,9 +257,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " "--query_log_write_interval_s=5 " "--shutdown_grace_period_s=10 " - "--shutdown_deadline_s=60 " - "--log_dir={0}" - .format(LOG_DIR_MAX_WRITES), + "--shutdown_deadline_s=60 ", + impala_log_dir=LOG_DIR_MAX_WRITES, catalogd_args="--enable_workload_mgmt", impalad_graceful_shutdown=True) def test_query_log_max_attempts_exceeded(self, vector): @@ -365,13 +380,14 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): def test_query_log_table_different_table(self, vector): """Asserts that the completed queries table can be renamed.""" - client = self.get_client(vector.get_value('protocol')) + client = self.get_client(vector.get_value('protocol'), + "{}.{}".format(self.WM_DB, self.OTHER_TBL)) try: res = client.execute("show tables in {0}".format(self.WM_DB)) assert res.success assert len(res.data) > 0, "could not find any tables in database {0}" \ - .format(self.DB) + .format(self.WM_DB) tbl_found = False for tbl in res.data: @@ -379,7 +395,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): tbl_found = True break assert tbl_found, "could not find table '{0}' in database '{1}'" \ - .format(self.OTHER_TBL, self.DB) + .format(self.OTHER_TBL, self.WM_DB) finally: client.execute("drop table {0}.{1} purge".format(self.WM_DB, self.OTHER_TBL)) @@ -470,6 +486,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): these operations have a type of unknown and a normally invalid sql syntax. This test asserts those queries are not written to the completed queries table since they are trivial.""" + client = self.get_client(vector.get_value('protocol')) + host, port = IMPALAD_HS2_HOST_PORT.split(":") socket = TSocket(host, port) transport = TBufferedTransport(socket) @@ -568,9 +586,6 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): finally: socket.close() - # Assert none of the queries were written to the completed queries table. - client = self.create_impala_client(protocol=vector.get_value('protocol')) - # Execute a general query and wait for it to appear in the completed queries table to # ensure there are no false positives caused by the assertion query executing before # Impala has a chance to write queued queries to the completed queries table. @@ -581,13 +596,11 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): # Force Impala to process the inserts to the completed queries table. client.execute("refresh {}".format(self.QUERY_TBL)) - try: - assert_results = client.execute("select count(*) from {} where cluster_id='{}'" - .format(self.QUERY_TBL, self.HS2_OPERATIONS_CLUSTER_ID)) - assert assert_results.success - assert assert_results.data[0] == "1" - finally: - client.close() + # Assert only the one expected query was written to the completed queries table. + assert_results = client.execute("select count(*) from {} where cluster_id='{}'" + .format(self.QUERY_TBL, self.HS2_OPERATIONS_CLUSTER_ID)) + assert assert_results.success + assert assert_results.data[0] == "1" @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " "--query_log_write_interval_s=1 "
