This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 490f90c65e00510f21c909b367d027d2e3582f04 Author: jasonmfehr <[email protected]> AuthorDate: Mon Nov 25 13:42:59 2024 -0800 IMPALA-13536: Fix Workload Management Init Tests Issues Several problems with the workload management code and test_workload_mgmt_init.py tests have been uncovered by the Ozone tests. * test_create_on_version_1_0_0 - The test comment said it ran on 10 nodes, but the test configuration specified 1 node. The fix was to modify the test configuration to use 10 nodes. * test_create_on_version_1_1_0 - The test comment said it ran on 10 nodes, but the test configuration specified 1 node. The fix was to modify the test configuration to use 10 nodes. * test_invalid_* - All four of these tests run the same internal function to execute the test. This internal function was not waiting long enough for the expected failure to appear. The fixed internal function waits longer for the expected failure. Additionally, the @CustomClusterTestSuite.with_args decorator has a new option named 'log_symlinks', which, if set to True, will resolve all daemon log symlinks and output their actual paths to the log. Failed tests can then be easily traced to the exact log files for that test. The existing workload management tests in testdata have been expanded to also assert that the expected table properties are present. Modified tests passed on Ozone builds both with and without erasure coding enabled. 
Change-Id: Ie3f34088d1d925f30abb63471387e6fdb62b95a7 Reviewed-on: http://gerrit.cloudera.org:8080/22119 Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../QueryTest/workload-management-log-v1.0.0.test | 56 ------ .../QueryTest/workload-management-log-v1.1.0.test | 61 ------- ...=> workload-mgmt-impala_query_live-v1.0.0.test} | 5 +- ...=> workload-mgmt-impala_query_live-v1.1.0.test} | 5 +- .../workload-mgmt-impala_query_log-v1.0.0.test | 59 ++++++ .../workload-mgmt-impala_query_log-v1.1.0.test | 64 +++++++ tests/common/custom_cluster_test_suite.py | 49 ++++- tests/custom_cluster/test_workload_mgmt_init.py | 202 +++++++++------------ tests/util/workload_management.py | 11 +- 9 files changed, 265 insertions(+), 247 deletions(-) diff --git a/testdata/workloads/functional-query/queries/QueryTest/workload-management-log-v1.0.0.test b/testdata/workloads/functional-query/queries/QueryTest/workload-management-log-v1.0.0.test deleted file mode 100644 index 0009df034..000000000 --- a/testdata/workloads/functional-query/queries/QueryTest/workload-management-log-v1.0.0.test +++ /dev/null @@ -1,56 +0,0 @@ -==== ----- QUERY -describe sys.impala_query_log ----- RESULTS -'cluster_id','string','','true' -'query_id','string','','true' -'session_id','string','','true' -'session_type','string','','true' -'hiveserver2_protocol_version','string','','true' -'db_user','string','','true' -'db_user_connection','string','','true' -'db_name','string','','true' -'impala_coordinator','string','','true' -'query_status','string','','true' -'query_state','string','','true' -'impala_query_end_state','string','','true' -'query_type','string','','true' -'network_address','string','','true' -'start_time_utc','timestamp','','true' -'total_time_ms','decimal(18,3)','','true' -'query_opts_config','string','','true' -'resource_pool','string','','true' -'per_host_mem_estimate','bigint','','true' -'dedicated_coord_mem_estimate','bigint','','true' 
-'per_host_fragment_instances','string','','true' -'backends_count','int','','true' -'admission_result','string','','true' -'cluster_memory_admitted','bigint','','true' -'executor_group','string','','true' -'executor_groups','string','','true' -'exec_summary','string','','true' -'num_rows_fetched','bigint','','true' -'row_materialization_rows_per_sec','bigint','','true' -'row_materialization_time_ms','decimal(18,3)','','true' -'compressed_bytes_spilled','bigint','','true' -'event_planning_finished','decimal(18,3)','','true' -'event_submit_for_admission','decimal(18,3)','','true' -'event_completed_admission','decimal(18,3)','','true' -'event_all_backends_started','decimal(18,3)','','true' -'event_rows_available','decimal(18,3)','','true' -'event_first_row_fetched','decimal(18,3)','','true' -'event_last_row_fetched','decimal(18,3)','','true' -'event_unregister_query','decimal(18,3)','','true' -'read_io_wait_total_ms','decimal(18,3)','','true' -'read_io_wait_mean_ms','decimal(18,3)','','true' -'bytes_read_cache_total','bigint','','true' -'bytes_read_total','bigint','','true' -'pernode_peak_mem_min','bigint','','true' -'pernode_peak_mem_max','bigint','','true' -'pernode_peak_mem_mean','bigint','','true' -'sql','string','','true' -'plan','string','','true' -'tables_queried','string','','true' ----- TYPES -string,string,string,string -==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/workload-management-log-v1.1.0.test b/testdata/workloads/functional-query/queries/QueryTest/workload-management-log-v1.1.0.test deleted file mode 100644 index ae88dc53a..000000000 --- a/testdata/workloads/functional-query/queries/QueryTest/workload-management-log-v1.1.0.test +++ /dev/null @@ -1,61 +0,0 @@ -==== ----- QUERY -describe sys.impala_query_log ----- RESULTS -'cluster_id','string','','true' -'query_id','string','','true' -'session_id','string','','true' -'session_type','string','','true' 
-'hiveserver2_protocol_version','string','','true' -'db_user','string','','true' -'db_user_connection','string','','true' -'db_name','string','','true' -'impala_coordinator','string','','true' -'query_status','string','','true' -'query_state','string','','true' -'impala_query_end_state','string','','true' -'query_type','string','','true' -'network_address','string','','true' -'start_time_utc','timestamp','','true' -'total_time_ms','decimal(18,3)','','true' -'query_opts_config','string','','true' -'resource_pool','string','','true' -'per_host_mem_estimate','bigint','','true' -'dedicated_coord_mem_estimate','bigint','','true' -'per_host_fragment_instances','string','','true' -'backends_count','int','','true' -'admission_result','string','','true' -'cluster_memory_admitted','bigint','','true' -'executor_group','string','','true' -'executor_groups','string','','true' -'exec_summary','string','','true' -'num_rows_fetched','bigint','','true' -'row_materialization_rows_per_sec','bigint','','true' -'row_materialization_time_ms','decimal(18,3)','','true' -'compressed_bytes_spilled','bigint','','true' -'event_planning_finished','decimal(18,3)','','true' -'event_submit_for_admission','decimal(18,3)','','true' -'event_completed_admission','decimal(18,3)','','true' -'event_all_backends_started','decimal(18,3)','','true' -'event_rows_available','decimal(18,3)','','true' -'event_first_row_fetched','decimal(18,3)','','true' -'event_last_row_fetched','decimal(18,3)','','true' -'event_unregister_query','decimal(18,3)','','true' -'read_io_wait_total_ms','decimal(18,3)','','true' -'read_io_wait_mean_ms','decimal(18,3)','','true' -'bytes_read_cache_total','bigint','','true' -'bytes_read_total','bigint','','true' -'pernode_peak_mem_min','bigint','','true' -'pernode_peak_mem_max','bigint','','true' -'pernode_peak_mem_mean','bigint','','true' -'sql','string','','true' -'plan','string','','true' -'tables_queried','string','','true' -'select_columns','string','','true' 
-'where_columns','string','','true' -'join_columns','string','','true' -'aggregate_columns','string','','true' -'orderby_columns','string','','true' ----- TYPES -string,string,string,string -==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/workload-management-live-v1.0.0.test b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_live-v1.0.0.test similarity index 90% rename from testdata/workloads/functional-query/queries/QueryTest/workload-management-live-v1.0.0.test rename to testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_live-v1.0.0.test index 8d4ef0c13..cb63ee3fe 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/workload-management-live-v1.0.0.test +++ b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_live-v1.0.0.test @@ -1,6 +1,6 @@ ==== ---- QUERY -describe sys.impala_query_live +describe formatted sys.impala_query_live ---- RESULTS 'cluster_id','string','' 'query_id','string','' @@ -51,6 +51,9 @@ describe sys.impala_query_live 'sql','string','' 'plan','string','' 'tables_queried','string','' +---- RESULTS: VERIFY_IS_SUBSET +'','schema_version ','1.0.0 ' +'','wm_schema_version ','1.0.0 ' ---- TYPES string,string,string ==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/workload-management-live-v1.1.0.test b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_live-v1.1.0.test similarity index 91% rename from testdata/workloads/functional-query/queries/QueryTest/workload-management-live-v1.1.0.test rename to testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_live-v1.1.0.test index 7a58f3f76..f7c0a948e 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/workload-management-live-v1.1.0.test +++ 
b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_live-v1.1.0.test @@ -1,6 +1,6 @@ ==== ---- QUERY -describe sys.impala_query_live +describe formatted sys.impala_query_live ---- RESULTS 'cluster_id','string','' 'query_id','string','' @@ -56,6 +56,9 @@ describe sys.impala_query_live 'join_columns','string','' 'aggregate_columns','string','' 'orderby_columns','string','' +---- RESULTS: VERIFY_IS_SUBSET +'','schema_version ','1.0.0 ' +'','wm_schema_version ','1.1.0 ' ---- TYPES string,string,string ==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_log-v1.0.0.test b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_log-v1.0.0.test new file mode 100644 index 000000000..4b4f3d5dd --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_log-v1.0.0.test @@ -0,0 +1,59 @@ +==== +---- QUERY +describe formatted sys.impala_query_log +---- RESULTS +'cluster_id','string',NULL +'query_id','string',NULL +'session_id','string',NULL +'session_type','string',NULL +'hiveserver2_protocol_version','string',NULL +'db_user','string',NULL +'db_user_connection','string',NULL +'db_name','string',NULL +'impala_coordinator','string',NULL +'query_status','string',NULL +'query_state','string',NULL +'impala_query_end_state','string',NULL +'query_type','string',NULL +'network_address','string',NULL +'start_time_utc','timestamp',NULL +'total_time_ms','decimal(18,3)',NULL +'query_opts_config','string',NULL +'resource_pool','string',NULL +'per_host_mem_estimate','bigint',NULL +'dedicated_coord_mem_estimate','bigint',NULL +'per_host_fragment_instances','string',NULL +'backends_count','int',NULL +'admission_result','string',NULL +'cluster_memory_admitted','bigint',NULL +'executor_group','string',NULL +'executor_groups','string',NULL +'exec_summary','string',NULL +'num_rows_fetched','bigint',NULL 
+'row_materialization_rows_per_sec','bigint',NULL +'row_materialization_time_ms','decimal(18,3)',NULL +'compressed_bytes_spilled','bigint',NULL +'event_planning_finished','decimal(18,3)',NULL +'event_submit_for_admission','decimal(18,3)',NULL +'event_completed_admission','decimal(18,3)',NULL +'event_all_backends_started','decimal(18,3)',NULL +'event_rows_available','decimal(18,3)',NULL +'event_first_row_fetched','decimal(18,3)',NULL +'event_last_row_fetched','decimal(18,3)',NULL +'event_unregister_query','decimal(18,3)',NULL +'read_io_wait_total_ms','decimal(18,3)',NULL +'read_io_wait_mean_ms','decimal(18,3)',NULL +'bytes_read_cache_total','bigint',NULL +'bytes_read_total','bigint',NULL +'pernode_peak_mem_min','bigint',NULL +'pernode_peak_mem_max','bigint',NULL +'pernode_peak_mem_mean','bigint',NULL +'sql','string',NULL +'plan','string',NULL +'tables_queried','string',NULL +---- RESULTS: VERIFY_IS_SUBSET +'','schema_version ','1.0.0 ' +'','wm_schema_version ','1.0.0 ' +---- TYPES +string,string,string +==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_log-v1.1.0.test b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_log-v1.1.0.test new file mode 100644 index 000000000..c69e537bd --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_log-v1.1.0.test @@ -0,0 +1,64 @@ +==== +---- QUERY +describe formatted sys.impala_query_log +---- RESULTS +'cluster_id','string',NULL +'query_id','string',NULL +'session_id','string',NULL +'session_type','string',NULL +'hiveserver2_protocol_version','string',NULL +'db_user','string',NULL +'db_user_connection','string',NULL +'db_name','string',NULL +'impala_coordinator','string',NULL +'query_status','string',NULL +'query_state','string',NULL +'impala_query_end_state','string',NULL +'query_type','string',NULL +'network_address','string',NULL +'start_time_utc','timestamp',NULL 
+'total_time_ms','decimal(18,3)',NULL +'query_opts_config','string',NULL +'resource_pool','string',NULL +'per_host_mem_estimate','bigint',NULL +'dedicated_coord_mem_estimate','bigint',NULL +'per_host_fragment_instances','string',NULL +'backends_count','int',NULL +'admission_result','string',NULL +'cluster_memory_admitted','bigint',NULL +'executor_group','string',NULL +'executor_groups','string',NULL +'exec_summary','string',NULL +'num_rows_fetched','bigint',NULL +'row_materialization_rows_per_sec','bigint',NULL +'row_materialization_time_ms','decimal(18,3)',NULL +'compressed_bytes_spilled','bigint',NULL +'event_planning_finished','decimal(18,3)',NULL +'event_submit_for_admission','decimal(18,3)',NULL +'event_completed_admission','decimal(18,3)',NULL +'event_all_backends_started','decimal(18,3)',NULL +'event_rows_available','decimal(18,3)',NULL +'event_first_row_fetched','decimal(18,3)',NULL +'event_last_row_fetched','decimal(18,3)',NULL +'event_unregister_query','decimal(18,3)',NULL +'read_io_wait_total_ms','decimal(18,3)',NULL +'read_io_wait_mean_ms','decimal(18,3)',NULL +'bytes_read_cache_total','bigint',NULL +'bytes_read_total','bigint',NULL +'pernode_peak_mem_min','bigint',NULL +'pernode_peak_mem_max','bigint',NULL +'pernode_peak_mem_mean','bigint',NULL +'sql','string',NULL +'plan','string',NULL +'tables_queried','string',NULL +'select_columns','string',NULL +'where_columns','string',NULL +'join_columns','string',NULL +'aggregate_columns','string',NULL +'orderby_columns','string',NULL +---- RESULTS: VERIFY_IS_SUBSET +'','schema_version ','1.0.0 ' +'','wm_schema_version ','1.1.0 ' +---- TYPES +string,string,string +==== \ No newline at end of file diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index aa9f0d4e4..582b2d211 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -27,6 +27,7 @@ import pipes import pytest import subprocess +from glob import glob from 
impala_py_lib.helpers import find_all_files, is_core_dump from re import search from signal import SIGRTMIN @@ -79,6 +80,9 @@ TMP_DIR_PLACEHOLDERS = 'tmp_dir_placeholders' EXPECT_STARTUP_FAIL = 'expect_startup_fail' # If True, add '--logbuflevel=-1' into all impala daemon args. DISABLE_LOG_BUFFERING = 'disable_log_buffering' +# If True, resolves the actual files for all the log symlinks and outputs the resolved +# paths to stderr. +LOG_SYMLINKS = 'log_symlinks' # Args that accept additional formatting to supply temporary dir path. ACCEPT_FORMATTING = set([IMPALAD_ARGS, CATALOGD_ARGS, IMPALA_LOG_DIR]) @@ -149,7 +153,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): num_exclusive_coordinators=None, kudu_args=None, statestored_timeout_s=None, impalad_timeout_s=None, expect_cores=None, reset_ranger=False, impalad_graceful_shutdown=False, tmp_dir_placeholders=[], - expect_startup_fail=False, disable_log_buffering=False): + expect_startup_fail=False, disable_log_buffering=False, log_symlinks=False): """Records arguments to be passed to a cluster by adding them to the decorated method's func_dict""" args = dict() @@ -191,6 +195,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): args[EXPECT_STARTUP_FAIL] = True if disable_log_buffering: args[DISABLE_LOG_BUFFERING] = True + if log_symlinks: + args[LOG_SYMLINKS] = True def decorate(obj): """If obj is a class, set SHARED_CLUSTER_ARGS for setup/teardown_class. Otherwise @@ -372,11 +378,18 @@ class CustomClusterTestSuite(ImpalaTestSuite): self.cluster_teardown(method.__name__, method.__dict__) def wait_for_wm_init_complete(self, timeout_s=120): - """Waits for the catalog to report the workload management initialization process - has completed.""" + """ + Waits for the catalog to report the workload management initialization process has + completed and for the catalog updates to be received by the coordinators. 
+ """ self.assert_catalogd_log_contains("INFO", r'Completed workload management ' r'initialization', timeout_s=timeout_s) + ret = self.assert_catalogd_log_contains("INFO", r'A catalog update with \d+ entries ' + r'is assembled. Catalog version: (\d+)', timeout_s=10, expected_count=-1) + self.assert_impalad_log_contains("INFO", r'Catalog topic update applied with ' + r'version: {}'.format(ret.group(1)), timeout_s=30) + @classmethod def _stop_impala_cluster(cls): # TODO: Figure out a better way to handle case where processes are just starting @@ -431,6 +444,30 @@ class CustomClusterTestSuite(ImpalaTestSuite): check_call([script_run_ranger]) check_call([script_setup_ranger]) + @classmethod + def _log_symlinks(cls, logdir=None, log=None, + patterns=["*.INFO", "*.WARNING", "*.ERROR", "*.FATAL"]): + """ + Resolves all symlinks in the specified logdir and print out their actual paths. + If the 'logdir' parameter is None, will use the value in 'cls.impala_log_dir'. + If the 'log' parameter is None, will call the 'print' function to output the resolved + paths, otherwise will call 'log.info' to output the resolved paths. + """ + if logdir is None: + logdir = cls.impala_log_dir + + file_list = "Log Files for Test:\n" + for pattern in patterns: + matching_files = glob(os.path.join(logdir, pattern)) + for matching_file in sorted(matching_files): + file_list += " * {} - {}\n".format(matching_file.split(os.path.sep)[-1], + os.path.realpath(matching_file)) + + if log is None: + print(file_list) + else: + log.info(file_list) + @classmethod def _start_impala_cluster(cls, options, @@ -447,7 +484,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): statestored_timeout_s=60, impalad_timeout_s=60, ignore_pid_on_log_rotation=False, - wait_for_backends=True): + wait_for_backends=True, + log_symlinks=False): cls.impala_log_dir = impala_log_dir # We ignore TEST_START_CLUSTER_ARGS here. 
Custom cluster tests specifically test that # certain custom startup arguments work and we want to keep them independent of dev @@ -492,6 +530,9 @@ class CustomClusterTestSuite(ImpalaTestSuite): try: check_call(cmd + options, close_fds=True) finally: + if log_symlinks: + cls._log_symlinks(log=LOG) + # Failure tests expect cluster to be initialised even if start-impala-cluster fails. cls.cluster = ImpalaCluster.get_e2e_test_cluster() statestored = cls.cluster.statestored diff --git a/tests/custom_cluster/test_workload_mgmt_init.py b/tests/custom_cluster/test_workload_mgmt_init.py index 7963a0661..55b4fdcf0 100644 --- a/tests/custom_cluster/test_workload_mgmt_init.py +++ b/tests/custom_cluster/test_workload_mgmt_init.py @@ -52,9 +52,9 @@ class TestWorkloadManagementInitBase(CustomClusterTestSuite): def setup_method(self, method): super(TestWorkloadManagementInitBase, self).setup_method(method) - def restart_cluster(self, schema_version="", wait_for_init_complete=True, + def restart_cluster(self, vector, schema_version="", wait_for_init_complete=True, cluster_size=3, additional_impalad_opts="", wait_for_backends=True, - additional_catalogd_opts="", expect_startup_err=False): + additional_catalogd_opts="", expect_startup_err=False, log_symlinks=False): """Wraps the existing custom cluster _start_impala_cluster function to restart the Impala cluster. Specifies coordinator/catalog startup flags to enable workload management and set the schema version. 
If wait_for_init_complete is True, this @@ -78,11 +78,14 @@ class TestWorkloadManagementInitBase(CustomClusterTestSuite): self._start_impala_cluster(options=[coord_opts, catalog_opts], cluster_size=cluster_size, expected_num_impalads=cluster_size, - num_coordinators=num_coords, wait_for_backends=wait_for_backends) + num_coordinators=num_coords, wait_for_backends=wait_for_backends, + log_symlinks=log_symlinks) except CalledProcessError as e: if not expect_startup_err: raise e + self.client = self.create_impala_client(protocol=vector.get_value("protocol")) + if wait_for_init_complete: self.wait_for_wm_init_complete() @@ -93,7 +96,7 @@ class TestWorkloadManagementInitBase(CustomClusterTestSuite): assert expected_val == "" or should_exist, "Cannot specify both the expected_val " \ "and should_exist properties." - res = self.create_impala_client().execute("show create table {}".format(tbl_name)) + res = self.client.execute("show create table {}".format(tbl_name)) assert res.success if should_exist: @@ -116,43 +119,15 @@ class TestWorkloadManagementInitBase(CustomClusterTestSuite): """Asserts a given regex is found in the catalog log file for each workload management table. The regex is passed the fully qualified table name using python string substitution.""" - for table in (self.QUERY_TBL_LOG, self.QUERY_TBL_LOG): + for table in (self.QUERY_TBL_LOG, self.QUERY_TBL_LIVE): self.assert_catalogd_log_contains("INFO", line_regex.format(table)) - def check_schema_version(self, schema_version): - """Asserts that all workload management tables are at the specified schema version.""" - for tbl_name in (self.QUERY_TBL_LOG, self.QUERY_TBL_LIVE): - client = self.create_impala_client() - - # Ensure the sys db exists - assert client.execute("describe database {}".format(self.WM_DB)).success - - # Ensure the schema_version table property has been properly set. 
- self.assert_table_prop(tbl_name, "schema_version", "1.0.0") - self.assert_table_prop(tbl_name, "wm_schema_version", schema_version) - - # Assert the expected columns are in the table. - res = client.execute("describe extended {}".format(tbl_name)) - assert res.success - - found = False - version_1_1_0_fields_found = False - - for line in res.data: - if re.search(r"schema_version\s+{}".format(schema_version), line): - found = True - elif re.search("_columns", line): - version_1_1_0_fields_found = True - - assert found, "did not find expected table schema '{}' on table '{}'" \ - .format(schema_version, tbl_name) - - if schema_version == "1.0.0": - assert not version_1_1_0_fields_found, "found fields that are part of version " \ - "'1.1.0' schema but expected schema version was '1.0.0'" - elif schema_version == "1.1.0": - assert version_1_1_0_fields_found, "did not find expected fields that are part " \ - "of version 1.1.0 schema" + def check_schema(self, schema_ver, vector, multiple_impalad=False): + """Asserts that all workload management tables have the correct columns and are at the + specified schema version.""" + for tbl_name in (self.QUERY_TBL_LOG_NAME, self.QUERY_TBL_LIVE_NAME): + self.run_test_case('QueryTest/workload-mgmt-{}-v{}'.format(tbl_name, schema_ver), + vector, self.WM_DB, multiple_impalad=multiple_impalad) class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): @@ -169,45 +144,35 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.1.0", disable_log_buffering=True) - def test_no_upgrade(self): + def test_no_upgrade(self, vector): """Tests that no upgrade happens when starting a cluster where the workload management tables are already at version 1.1.0.""" - self.restart_cluster("1.1.0") - self.check_schema_version("1.1.0") + self.restart_cluster(vector, schema_version="1.1.0", log_symlinks=True) + 
self.check_schema("1.1.0", vector) self.assert_catalogd_log_contains("INFO", r"Workload management table .*? will be " r"upgraded", expected_count=0) - @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, + @CustomClusterTestSuite.with_args(cluster_size=10, disable_log_buffering=True, + log_symlinks=True, impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.0.0", catalogd_args="--enable_workload_mgmt " "--workload_mgmt_schema_version=1.0.0 " "--workload_mgmt_drop_tables=impala_query_log,impala_query_live") def test_create_on_version_1_0_0(self, vector): """Asserts that workload management tables are properly created on version 1.0.0 using - a 10 node cluster when no tables exist. - Cluster sizes of 1 are used to speed up the initial setup.""" - self.check_schema_version("1.0.0") + a 10 node cluster when no tables exist.""" + self.check_schema("1.0.0", vector, multiple_impalad=True) - # Asserts the correct create table sql was used. - self.client = self.create_impala_client() - self.run_test_case('QueryTest/workload-management-log-v1.0.0', vector, self.WM_DB) - self.run_test_case('QueryTest/workload-management-live-v1.0.0', vector, self.WM_DB) - - @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, + @CustomClusterTestSuite.with_args(cluster_size=10, disable_log_buffering=True, + log_symlinks=True, impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt " "--workload_mgmt_drop_tables=impala_query_log,impala_query_live") def test_create_on_version_1_1_0(self, vector): """Asserts that workload management tables are properly created on version 1.1.0 using - a 10 node cluster when no tables exist. - Cluster sizes of 1 are used to speed up the initial setup.""" - self.check_schema_version("1.1.0") - - # Asserts the correct create table sql was used. 
- self.client = self.create_impala_client() - self.run_test_case('QueryTest/workload-management-log-v1.1.0', vector, self.WM_DB) - self.run_test_case('QueryTest/workload-management-live-v1.1.0', vector, self.WM_DB) + a 10 node cluster when no tables exist.""" + self.check_schema("1.1.0", vector, multiple_impalad=True) @CustomClusterTestSuite.with_args(cluster_size=1, impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.0.0", @@ -216,56 +181,50 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): "--workload_mgmt_drop_tables=impala_query_log,impala_query_live", disable_log_buffering=True) def test_upgrade_1_0_0_to_1_1_0(self, vector): - """Asserts that an upgrade from version 1.0.0 to 1.1.0 succeeds on a 10 node cluster - when starting with no existing workload management tables.""" + """Asserts that an upgrade from version 1.0.0 to 1.1.0 succeeds when starting with no + existing workload management tables.""" # Veriy the initial table create on version 1.0.0 succeeded. - self.check_schema_version("1.0.0") + self.check_schema("1.0.0", vector) self.assert_log_contains("catalogd", "WARNING", r"Target schema version '1.0.0' is " r"not the latest schema version '\d+\.\d+\.\d+'") - self.restart_cluster("1.1.0", cluster_size=10) - self.check_schema_version("1.1.0") + self.restart_cluster(vector, schema_version="1.1.0", cluster_size=1, + log_symlinks=True) # Assert the upgrade process ran. self.assert_catalogd_all_tables(r"Workload management table '{}' is at version " r"'1.0.0' and will be upgraded") - self.wait_for_wm_init_complete() - - # Asserts the correct create table sql was used. 
- self.client = self.create_impala_client() - self.run_test_case('QueryTest/workload-management-log-v1.1.0', vector, self.WM_DB) - self.run_test_case('QueryTest/workload-management-live-v1.1.0', vector, self.WM_DB) + self.check_schema("1.1.0", vector) @CustomClusterTestSuite.with_args(cluster_size=1, impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt", disable_log_buffering=True) - def test_log_table_newer_schema_version(self): + def test_log_table_newer_schema_version(self, vector): """Asserts a catalog startup flag version that is older than the workload management table schema version will write only the fields associated with the startup flag version.""" - self.restart_cluster("1.0.0", cluster_size=1, - additional_impalad_opts="--query_log_write_interval_s=15") + self.restart_cluster(vector, schema_version="1.0.0", cluster_size=1, + log_symlinks=True, additional_impalad_opts="--query_log_write_interval_s=15") self.assert_catalogd_log_contains("WARNING", "Target schema version '1.0.0' is not " "the latest schema version '1.1.0'") # The workload management tables will be on schema version 1.1.0. - self.check_schema_version("1.1.0") + self.check_schema("1.1.0", vector) # The workload management processing will be running on schema version 1.0.0. self.assert_catalogd_all_tables(r"Target schema version '1.0.0' of the '{}' table is " r"lower than the actual schema version") # Run a query and ensure it does not populate version 1.1.0 fields. - client = self.create_impala_client() - res = client.execute("select * from functional.alltypes") + res = self.client.execute("select * from functional.alltypes") assert res.success impalad = self.cluster.get_first_impalad() # Check the live queries table first. 
- assert_query(self.QUERY_TBL_LIVE, client, impalad=impalad, query_id=res.query_id, + assert_query(self.QUERY_TBL_LIVE, self.client, impalad=impalad, query_id=res.query_id, expected_overrides={ TQueryTableColumn.SELECT_COLUMNS: "", TQueryTableColumn.WHERE_COLUMNS: "", @@ -276,7 +235,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): # Check the query log table. impalad.service.wait_for_metric_value( "impala-server.completed-queries.written", 2, 60) - assert_query(self.QUERY_TBL_LOG, client, impalad=impalad, query_id=res.query_id, + assert_query(self.QUERY_TBL_LOG, self.client, impalad=impalad, query_id=res.query_id, expected_overrides={ TQueryTableColumn.SELECT_COLUMNS: "NULL", TQueryTableColumn.WHERE_COLUMNS: "NULL", @@ -285,6 +244,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): TQueryTableColumn.ORDERBY_COLUMNS: "NULL"}) @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, + log_symlinks=True, impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt " "--query_log_table_props=\"foo=bar,foo1=bar1\" " @@ -297,93 +257,95 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.assert_table_prop(self.QUERY_TBL_LIVE, "foo", "bar") @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, + log_symlinks=True, impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt " "--workload_mgmt_drop_tables=impala_query_log,impala_query_live") - def test_create_from_scratch(self): + def test_create_from_scratch(self, vector): """Tests the conditions that exist when workload management is first started by deleteing the workload management tables and the sys db and restarting.""" - assert self.create_impala_client().execute("drop database {} cascade" + assert self.client.execute("drop database {} cascade" .format(self.WM_DB)).success - self.restart_cluster() - self.check_schema_version("1.1.0") + self.restart_cluster(vector, 
log_symlinks=True) + self.check_schema("1.1.0", vector) - def _run_invalid_table_prop_test(self, table, prop_name, expect_success=False): + def _run_invalid_table_prop_test(self, table, prop_name, vector, expect_success=False): """Runs a test where one of the workload management schema version table properties on a workload management table has been reset to an invalid value.""" try: - res = self.create_impala_client().execute( + res = self.client.execute( "alter table {} set tblproperties('{}'='')".format(table, prop_name)) assert res.success self.assert_catalogd_log_contains("INFO", "Finished execDdl request: ALTER_TABLE " "{}".format(table)) tmp_dir = self.get_tmp_dir('invalid_schema') - self.restart_cluster(wait_for_init_complete=False, cluster_size=1, - wait_for_backends=False, expect_startup_err=True, + self.restart_cluster(vector, wait_for_init_complete=False, cluster_size=1, + wait_for_backends=False, expect_startup_err=True, log_symlinks=True, additional_catalogd_opts="--minidump_path={}".format(tmp_dir), additional_impalad_opts="--minidump_path={}".format(tmp_dir)) if not expect_success: self.wait_for_log_exists("catalogd", "FATAL", 30) self.assert_catalogd_log_contains("FATAL", "could not parse version string '' " - "found on the '{}' property of table '{}'".format(prop_name, table)) + "found on the '{}' property of table '{}'".format(prop_name, table), + timeout_s=60) else: self.wait_for_wm_init_complete() assert len(os.listdir("{}/catalogd".format(tmp_dir))) == 0, \ "Found minidumps but none should exist." 
finally: - self.restart_cluster(cluster_size=1, + self.restart_cluster(vector, cluster_size=1, additional_catalogd_opts="--workload_mgmt_drop_tables=impala_query_log," "impala_query_live") - @CustomClusterTestSuite.with_args(cluster_size=1, + @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}", tmp_dir_placeholders=['invalid_schema'], disable_log_buffering=True) - def test_invalid_schema_version_log_table_prop(self): + def test_invalid_schema_version_log_table_prop(self, vector): """Tests that startup succeeds when the 'schema_version' table property on the sys.impala_query_log table contains an invalid value but the wm_schema_version table property contains a valid value.""" - self._run_invalid_table_prop_test(self.QUERY_TBL_LOG, "schema_version", True) + self._run_invalid_table_prop_test(self.QUERY_TBL_LOG, "schema_version", vector, True) - @CustomClusterTestSuite.with_args(cluster_size=1, + @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}", tmp_dir_placeholders=['invalid_schema'], disable_log_buffering=True) - def test_invalid_wm_schema_version_log_table_prop(self): + def test_invalid_wm_schema_version_log_table_prop(self, vector): """Tests that startup fails when the 'wm_schema_version' table property on the sys.impala_query_log table contains an invalid value.""" - self._run_invalid_table_prop_test(self.QUERY_TBL_LOG, "wm_schema_version") + self._run_invalid_table_prop_test(self.QUERY_TBL_LOG, "wm_schema_version", vector) - @CustomClusterTestSuite.with_args(cluster_size=1, + @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", 
catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}", tmp_dir_placeholders=['invalid_schema'], disable_log_buffering=True) - def test_invalid_schema_version_live_table_prop(self): + def test_invalid_schema_version_live_table_prop(self, vector): """Tests that startup succeeds when the 'schema_version' table property on the sys.impala_query_live table contains an invalid value but the wm_schema_version table property contains a valid value.""" - self._run_invalid_table_prop_test(self.QUERY_TBL_LIVE, "schema_version", True) + self._run_invalid_table_prop_test(self.QUERY_TBL_LIVE, "schema_version", vector, True) - @CustomClusterTestSuite.with_args(cluster_size=1, + @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}", tmp_dir_placeholders=['invalid_schema'], disable_log_buffering=True) - def test_invalid_wm_schema_version_live_table_prop(self): + def test_invalid_wm_schema_version_live_table_prop(self, vector): """Tests that startup fails when the 'wm_schema_version' table property on the sys.impala_query_live table contains an invalid value.""" - self._run_invalid_table_prop_test(self.QUERY_TBL_LIVE, "wm_schema_version") + self._run_invalid_table_prop_test(self.QUERY_TBL_LIVE, "wm_schema_version", vector) @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt") - def test_upgrade_to_latest_from_previous_binary(self): + def test_upgrade_to_latest_from_previous_binary(self, vector): """Simulated an upgrade situation from workload management tables created by previous builds of Impala.""" @@ -396,29 +358,28 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): create_sql = f.read() assert self.client.execute(create_sql).success - self.restart_cluster(cluster_size=1, + 
self.restart_cluster(vector, cluster_size=1, log_symlinks=True, additional_impalad_opts="--query_log_write_interval_s=30") - self.check_schema_version("1.1.0") + self.check_schema("1.1.0", vector) # Run a query and ensure it does not populate version 1.1.0 fields. - client = self.create_impala_client() - res = client.execute("select * from functional.alltypes") + res = self.client.execute("select * from functional.alltypes") assert res.success impalad = self.cluster.get_first_impalad() # Check the live queries table first. - assert_query(self.QUERY_TBL_LIVE, client, impalad=impalad, query_id=res.query_id) + assert_query(self.QUERY_TBL_LIVE, self.client, impalad=impalad, query_id=res.query_id) # Check the query log table. impalad.service.wait_for_metric_value( "impala-server.completed-queries.written", 2, 60) - assert_query(self.QUERY_TBL_LOG, client, impalad=impalad, query_id=res.query_id) + assert_query(self.QUERY_TBL_LOG, self.client, impalad=impalad, query_id=res.query_id) @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt") - def test_start_at_1_0_0(self): + def test_start_at_1_0_0(self, vector): """Tests the situation where workload management tables were created by the original workload management code, and the current code is started at workload management schema version 1.0.0 (even though that version is not the latest).""" @@ -432,7 +393,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): create_sql = f.read() assert self.client.execute(create_sql).success - self.restart_cluster("1.0.0", + self.restart_cluster(vector, schema_version="1.0.0", log_symlinks=True, additional_impalad_opts="--query_log_write_interval_s=15") for table in (self.QUERY_TBL_LOG, self.QUERY_TBL_LIVE): @@ -440,12 +401,11 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.assert_table_prop(table, "wm_schema_version", 
should_exist=False) # Run a query and ensure it does not populate version 1.1.0 fields. - client = self.create_impala_client() - res = client.execute("select * from functional.alltypes") + res = self.client.execute("select * from functional.alltypes") assert res.success # Check the live queries table first. - live_results = client.execute("select * from {} where query_id='{}'".format( + live_results = self.client.execute("select * from {} where query_id='{}'".format( self.QUERY_TBL_LIVE, res.query_id)) assert live_results.success assert len(live_results.data) == 1, "did not find query in '{}' table '{}'".format( @@ -457,7 +417,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): # Check the query log table. self.cluster.get_first_impalad().service.wait_for_metric_value( "impala-server.completed-queries.written", 2, 60) - log_results = client.execute("select * from {} where query_id='{}'".format( + log_results = self.client.execute("select * from {} where query_id='{}'".format( self.QUERY_TBL_LOG, res.query_id)) assert log_results.success assert len(log_results.data) == 1, "did not find query in '{}' table '{}'".format( @@ -470,7 +430,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): catalogd_args="--enable_workload_mgmt", statestored_args="--use_subscriber_id_as_catalogd_priority=true", start_args="--enable_statestored_ha", - disable_log_buffering=True) + disable_log_buffering=True, log_symlinks=True) def test_statestore_ha(self): """Asserts workload management initialization completes successfully when statestore ha is enabled.""" @@ -489,7 +449,7 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase): def setup_method(self, method): super(TestWorkloadManagementInitNoWait, self).setup_method(method) - @CustomClusterTestSuite.with_args(cluster_size=1, + @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --query_log_write_interval_s=3", 
catalogd_args="--enable_workload_mgmt " "--workload_mgmt_drop_tables=impala_query_log,impala_query_live " @@ -518,7 +478,7 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase): impalad.wait_for_metric_value("impala-server.completed-queries.written", 1, 15) @CustomClusterTestSuite.with_args(cluster_size=1, expect_startup_fail=True, - impalad_timeout_s=60, + impalad_timeout_s=60, log_symlinks=True, impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=foo " "--minidump_path={minidumps}", catalogd_args="--enable_workload_mgmt --workload_mgmt_schema_version=foo " @@ -536,7 +496,7 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase): r"version 'foo'") @CustomClusterTestSuite.with_args(cluster_size=1, expect_startup_fail=True, - impalad_timeout_s=60, + impalad_timeout_s=60, log_symlinks=True, impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=0.0.1 " "--minidump_path={minidumps}", catalogd_args="--enable_workload_mgmt --workload_mgmt_schema_version=0.0.1 " @@ -555,7 +515,7 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase): @CustomClusterTestSuite.with_args(start_args="--enable_catalogd_ha", statestored_args="--use_subscriber_id_as_catalogd_priority=true", - disable_log_buffering=True) + disable_log_buffering=True, log_symlinks=True) def test_catalog_ha_no_workload_mgmt(self): """Asserts workload management initialization is not done on either catalogd when workload management is not enabled.""" @@ -605,7 +565,7 @@ class TestWorkloadManagementCatalogHA(TestWorkloadManagementInitBase): @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt", start_args="--enable_catalogd_ha", statestored_args="--use_subscriber_id_as_catalogd_priority=true", - disable_log_buffering=True) + disable_log_buffering=True, log_symlinks=True) def test_catalog_ha_failover(self): """Asserts workload management initialization is not run a second time 
when catalogd failover happens.""" @@ -630,7 +590,7 @@ class TestWorkloadManagementCatalogHA(TestWorkloadManagementInitBase): catalogd_args="--enable_workload_mgmt", statestored_args="--use_subscriber_id_as_catalogd_priority=true", start_args="--enable_catalogd_ha --enable_statestored_ha", - disable_log_buffering=True) + disable_log_buffering=True, log_symlinks=True) def test_catalog_statestore_ha(self): """Asserts workload management initialization is only done on the active catalogd when both catalog and statestore ha is enabled.""" diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py index 33078d8e1..f5a450c59 100644 --- a/tests/util/workload_management.py +++ b/tests/util/workload_management.py @@ -218,7 +218,8 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, query_opts = re.search(r'\n\s+Query Options \(set by configuration\):\s+(.*?)\n', profile_text) assert query_opts is not None - assert value == query_opts.group(1), "query opts set by config incorrect" + assert value == query_opts.group(1).replace("'", "'"), \ + "query opts set by config incorrect" # Resource Pool value = column_val(TQueryTableColumn.RESOURCE_POOL) @@ -335,8 +336,12 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, # Executor Groups value = column_val(TQueryTableColumn.EXECUTOR_GROUPS) - exec_groups = re.search(r'\n\s+(Executor group \d+:.*?)\n\s+PlannerInfo', profile_text, - re.DOTALL) + # The following regular expression matches both named and unnamed executor groups in the + # query profile. For example, both of the following lines will match this regex: + # Executor group 1 (small): + # Executor group 1: + exec_groups = re.search(r'\n\s+(Executor group \d+(?:\s+\(\w+\))?:.*?)\n\s+PlannerInfo', + profile_text, re.DOTALL) if query_state_value == "EXCEPTION": assert exec_groups is None, "executor groups should not have been found" else:
