This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit b3b2dbaca3054051b444b935a59b6bc5ac6eb53b Author: jasonmfehr <[email protected]> AuthorDate: Thu Feb 20 14:06:00 2025 -0800 IMPALA-13772: Fix Workload Management DMLs Timeouts The insert DMLs executed by workload management to add rows to the completed queries Iceberg table time out after 10 seconds because that is the default FETCH_ROWS_TIMEOUT_MS value. If the DML queues up in admission control, this timeout will quickly cause the DML to be cancelled. The fix is to set the FETCH_ROWS_TIMEOUT_MS query option to 0 for the workload management insert DMLs. Even though the workload management DMLs do not retrieve any rows, the FETCH_ROWS_TIMEOUT_MS value still applies because the internal server functions call into the client request state's ExecQueryOrDmlRequest() function which starts query execution and immediately returns. Then, the BlockOnWait function in impala-server.cc is called. This function times out based on the FETCH_ROWS_TIMEOUT_MS value. A new coordinator startup flag 'query_log_dml_exec_timeout_s' is added to specify the EXEC_TIME_LIMIT_S query option on the workload management insert DML statements. This flag ensures the DMLs will time out if they do not complete in a reasonable timeframe. While adding the new coordinator startup flag, a bug in the internal-server code was discovered. This bug caused a return status of 'ok' even when the query exec time limit was reached and the query was cancelled. This bug has also been fixed. Testing: 1. Added new custom cluster test that simulates a busy cluster where the workload management DML queues for longer than 10 seconds. 2. Existing tests in test_query_log and test_admission_controller passed. 3. One internal-server-test ctest was modified to assert for a returned status of error when a query is cancelled. 4. Added a new custom cluster test that asserts the workload management DML is cancelled based on the value of the new coordinator startup flag. 
Change-Id: I0cc7fbce40eadfb253d8cff5cbb83e2ad63a979f Reviewed-on: http://gerrit.cloudera.org:8080/22511 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/internal-server-test.cc | 2 +- be/src/service/internal-server.cc | 7 +- be/src/service/workload-management-worker.cc | 7 + be/src/workload_mgmt/workload-management-flags.cc | 43 ++-- fe/src/test/resources/fair-scheduler-one-query.xml | 14 ++ fe/src/test/resources/llama-site-one-query.xml | 24 +++ tests/common/cluster_config.py | 27 +++ tests/custom_cluster/test_admission_controller.py | 35 +--- tests/custom_cluster/test_query_log.py | 232 ++++++++++++++++----- tests/custom_cluster/test_workload_mgmt_init.py | 85 ++++---- .../test_workload_mgmt_sql_details.py | 14 +- tests/util/workload_management.py | 9 +- 12 files changed, 343 insertions(+), 156 deletions(-) diff --git a/be/src/service/internal-server-test.cc b/be/src/service/internal-server-test.cc index 7721a3093..866b4bf69 100644 --- a/be/src/service/internal-server-test.cc +++ b/be/src/service/internal-server-test.cc @@ -326,7 +326,7 @@ TEST(InternalServerTest, RetryFailedQuery) { orig_query_id = query_id; Status wait_status = fixture->WaitForResults(query_id); - ASSERT_OK(wait_status); + ASSERT_FALSE(wait_status.ok()); EXPECT_TRUE(orig_query_id != query_id); diff --git a/be/src/service/internal-server.cc b/be/src/service/internal-server.cc index 4460b6677..dca054af0 100644 --- a/be/src/service/internal-server.cc +++ b/be/src/service/internal-server.cc @@ -161,6 +161,11 @@ Status ImpalaServer::SubmitAndWait(const string& user_name, const string& sql, RETURN_IF_ERROR(OpenSession(user_name, new_session_id, query_opts)); RETURN_IF_ERROR(SubmitQuery(sql, new_session_id, new_query_id, persist_in_db)); + if (const auto& debug_action = query_opts.find(TImpalaQueryOptions::DEBUG_ACTION); + debug_action != query_opts.end()) { + RETURN_IF_ERROR(DebugAction(debug_action->second, 
"INTERNAL_SERVER_AFTER_SUBMIT")); + } + return WaitForResults(new_query_id); } // ImpalaServer::SubmitAndWait @@ -180,7 +185,7 @@ Status ImpalaServer::WaitForResults(TUniqueId& query_id) { return Status::Expected("query timed out waiting for results"); } - return Status::OK(); + return query_handle->query_status(); } // ImpalaServer::WaitForResults Status ImpalaServer::SubmitQuery(const string& sql, const TUniqueId& session_id, diff --git a/be/src/service/workload-management-worker.cc b/be/src/service/workload-management-worker.cc index 8d9dbdc18..62d4126b8 100644 --- a/be/src/service/workload-management-worker.cc +++ b/be/src/service/workload-management-worker.cc @@ -67,6 +67,7 @@ using kudu::Version; DECLARE_bool(enable_workload_mgmt); DECLARE_int32(query_log_write_interval_s); +DECLARE_int32(query_log_dml_exec_timeout_s); DECLARE_int32(query_log_max_insert_attempts); DECLARE_int32(query_log_max_queued); DECLARE_int32(query_log_shutdown_timeout_s); @@ -671,6 +672,12 @@ void ImpalaServer::WorkloadManagementWorker(const Version& target_schema_version if (!FLAGS_query_log_request_pool.empty()) { insert_query_opts[TImpalaQueryOptions::REQUEST_POOL] = FLAGS_query_log_request_pool; } + insert_query_opts[TImpalaQueryOptions::FETCH_ROWS_TIMEOUT_MS] = "0"; + insert_query_opts[TImpalaQueryOptions::EXEC_TIME_LIMIT_S] = + std::to_string(FLAGS_query_log_dml_exec_timeout_s); + if (!FLAGS_debug_actions.empty()) { + insert_query_opts[TImpalaQueryOptions::DEBUG_ACTION] = FLAGS_debug_actions; + } while (true) { // Exit this thread if a shutdown was initiated. 
diff --git a/be/src/workload_mgmt/workload-management-flags.cc b/be/src/workload_mgmt/workload-management-flags.cc index c70493281..d582e0a41 100644 --- a/be/src/workload_mgmt/workload-management-flags.cc +++ b/be/src/workload_mgmt/workload-management-flags.cc @@ -25,6 +25,13 @@ using namespace std; static regex alphanum_underscore_dash("^[A-Za-z0-9\\-_]+$"); +// Validator function asserting the value of a flag is greater than or equal to 0. +static const auto gt_eq_0 = [](const char* name, int32_t val) { + if (val >= 0) return true; + LOG(ERROR) << "Invalid value for --" << name << ": must be greater than or equal to 0"; + return false; +}; + DEFINE_bool(enable_workload_mgmt, false, "Specifies if Impala will automatically write completed queries in the query log " "table. If this value is set to true and then later removed, the query log table " @@ -77,7 +84,13 @@ DEFINE_validator(query_log_write_interval_s, [](const char* name, int32_t val) { DEFINE_int32_hidden(query_log_write_timeout_s, 0, "Specifies the query timeout in " "seconds for inserts to the query log table. A value less than 1 indicates to use " - "the same value as the query_log_write_interval_s flag."); + "the same value as the query_log_write_interval_s flag. The value of this flag will " + "be set in QUERY_TIMEOUT_S query option of the query log table insert DMLs."); +DEFINE_validator(query_log_write_timeout_s, gt_eq_0); + +DEFINE_int32(query_log_dml_exec_timeout_s, 120, "Value of the EXEC_TIME_LIMIT_S " + "query option on the query log table insert dmls."); +DEFINE_validator(query_log_dml_exec_timeout_s, gt_eq_0); DEFINE_int32(query_log_max_queued, 5000, "Maximum number of records that can be queued " "before they are written to the impala query log table. This flag operates " @@ -86,12 +99,7 @@ DEFINE_int32(query_log_max_queued, 5000, "Maximum number of records that can be "matter how much time has passed since the last write. 
The countdown to the next " "write (based on the time period defined in the 'query_log_write_interval_s' flag) " "is not restarted."); - -DEFINE_validator(query_log_max_queued, [](const char* name, int32_t val) { - if (val >= 0) return true; - LOG(ERROR) << "Invalid value for --" << name << ": must be greater than or equal to 0"; - return false; -}); +DEFINE_validator(query_log_max_queued, gt_eq_0); DEFINE_string_hidden(workload_mgmt_maintenance_user, "impala", "Specifies the user that " "will be used to create and update the workload management database tables."); @@ -126,36 +134,21 @@ DEFINE_int32(query_log_max_sql_length, 16777216, "Maximum length of a sql statem "longer than this value is executed, the sql inserted into the completed queries " "table will be trimmed to this length. Any characters that need escaping will have " "their backslash character counted towards this limit."); - -DEFINE_validator(query_log_max_sql_length, [](const char* name, int32_t val) { - if (val >= 0) return true; - LOG(ERROR) << "Invalid value for --" << name << ": must be greater than or equal to 0"; - return false; -}); +DEFINE_validator(query_log_max_sql_length, gt_eq_0); DEFINE_int32(query_log_max_plan_length, 16777216, "Maximum length of the sql plan that " "will be recorded in the completed queries table. If a plan has a length longer than " "this value, the plan inserted into the completed queries table will be trimmed to " "this length. 
Any characters that need escaping will have their backslash character " "counted towards this limit."); - -DEFINE_validator(query_log_max_plan_length, [](const char* name, int32_t val) { - if (val >= 0) return true; - LOG(ERROR) << "Invalid value for --" << name << ": must be greater than or equal to 0"; - return false; -}); +DEFINE_validator(query_log_max_plan_length, gt_eq_0); DEFINE_int32_hidden(query_log_shutdown_timeout_s, 30, "Number of seconds to wait for " "the queue of completed queries to be drained to the query log table before timing " "out and continuing the shutdown process. The completed queries drain process runs " "after the shutdown process completes, thus the max shutdown time is extended by the " "value specified in this flag."); - -DEFINE_validator(query_log_shutdown_timeout_s, [](const char* name, int32_t val) { - if (val >= 0) return true; - LOG(ERROR) << "Invalid value for --" << name << ": must be a positive value"; - return false; -}); +DEFINE_validator(query_log_shutdown_timeout_s, gt_eq_0); DEFINE_string(cluster_id, "", "Specifies an identifier string that uniquely represents " "this cluster. 
This identifier is included in the query log table if enabled."); diff --git a/fe/src/test/resources/fair-scheduler-one-query.xml b/fe/src/test/resources/fair-scheduler-one-query.xml new file mode 100644 index 000000000..bfe42640b --- /dev/null +++ b/fe/src/test/resources/fair-scheduler-one-query.xml @@ -0,0 +1,14 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<allocations> + <queue name="root"> + <queue name="default"> + <maxResources>200000 mb, 0 vcores</maxResources> + <aclSubmitApps>* </aclSubmitApps> + </queue> + </queue> + <queuePlacementPolicy> + <rule name="specified" create="false"/> + <rule name="default" /> + </queuePlacementPolicy> +</allocations> diff --git a/fe/src/test/resources/llama-site-one-query.xml b/fe/src/test/resources/llama-site-one-query.xml new file mode 100644 index 000000000..f2d08d1c4 --- /dev/null +++ b/fe/src/test/resources/llama-site-one-query.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<configuration> + <property> + <name>llama.am.throttling.maximum.placed.reservations.root.default</name> + <value>1</value> + </property> + <property> + <name>llama.am.throttling.maximum.queued.reservations.root.default</name> + <value>5</value> + </property> + <property> + <name>impala.admission-control.pool-queue-timeout-ms.root.default</name> + <value>30000</value> + </property> + <property> + <name>impala.admission-control.max-query-mem-limit.root.default</name> + <value>1610612736</value><!--1.5GB--> + </property> + <property> + <name>impala.admission-control.min-query-mem-limit.root.default</name> + <value>52428800</value><!--50MB--> + </property> +</configuration> \ No newline at end of file diff --git a/tests/common/cluster_config.py b/tests/common/cluster_config.py index 305d74a8c..8fcfc5412 100644 --- a/tests/common/cluster_config.py +++ b/tests/common/cluster_config.py @@ -19,12 +19,18 @@ from __future__ import absolute_import, division, print_function +import os +import shutil + from 
tests.common.custom_cluster_test_suite import CustomClusterTestSuite # Same as in tests/authorization/test_ranger.py ADMIN = "admin" +# The path to resources directory which contains the admission control config files. +RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources") + enable_authorization = CustomClusterTestSuite.with_args( # Same as IMPALAD_ARGS and CATALOGD_ARGS in tests/authorization/test_ranger.py impalad_args="--server-name=server1 --ranger_service_type=hive " @@ -63,3 +69,24 @@ admit_no_query = CustomClusterTestSuite.with_args( single_coordinator = CustomClusterTestSuite.with_args( num_exclusive_coordinators=1 ) + + +def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file, + additional_args="", make_copy=False): + """Generates impalad startup flags configuring the fair scheduler and llama site path + options and setting logging for admission control to VLOG_ROW. + + The specified fair scheduler and llama site files are copied first, and the copies + are used as the value for the relevant startup flags.""" + fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file) + llama_site_path = os.path.join(RESOURCES_DIR, llama_site_file) + if make_copy: + copy_fs_allocation_path = os.path.join(RESOURCES_DIR, "copy-" + fs_allocation_file) + copy_llama_site_path = os.path.join(RESOURCES_DIR, "copy-" + llama_site_file) + shutil.copy2(fs_allocation_path, copy_fs_allocation_path) + shutil.copy2(llama_site_path, copy_llama_site_path) + fs_allocation_path = copy_fs_allocation_path + llama_site_path = copy_llama_site_path + return ("-vmodule admission-controller=3 -fair_scheduler_allocation_path %s " + "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path, + additional_args)) diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 3110a8ce6..bdaeb72a8 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ 
b/tests/custom_cluster/test_admission_controller.py @@ -24,7 +24,6 @@ import logging import os import pytest import re -import shutil import signal import sys import threading @@ -34,7 +33,10 @@ from time import sleep, time from beeswaxd.BeeswaxService import QueryState from tests.beeswax.impala_beeswax import ImpalaBeeswaxException -from tests.common.cluster_config import impalad_admission_ctrl_flags +from tests.common.cluster_config import ( + impalad_admission_ctrl_flags, + impalad_admission_ctrl_config_args, + RESOURCES_DIR) from tests.common.custom_cluster_test_suite import ( ADMISSIOND_ARGS, IMPALAD_ARGS, @@ -49,6 +51,7 @@ from tests.common.test_dimensions import ( create_uncompressed_text_dimension) from tests.common.test_vector import ImpalaTestDimension from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session +from tests.util.workload_management import QUERY_TBL_LIVE from tests.util.web_pages_util import ( get_num_completed_backends, get_mem_admitted_backends_debug_page) @@ -139,32 +142,8 @@ STALE_TOPIC_THRESHOLD_MS = 500 INITIAL_QUEUE_REASON_REGEX = \ "Initial admission queue reason: waited [0-9]* ms, reason: .*" -# The path to resources directory which contains the admission control config files. -RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources") - # SQL statement that selects all records for the active queries table. -ACTIVE_SQL = "select * from sys.impala_query_live" - - -def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file, - additional_args="", make_copy=False): - """Generates impalad startup flags configuring the fair scheduler and llama site path - options and setting logging for admission control to VLOG_ROW. 
- - The specified fair scheduler and llama site files are copied first, and the copies - are used as the value for the relevant startup flags.""" - fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file) - llama_site_path = os.path.join(RESOURCES_DIR, llama_site_file) - if make_copy: - copy_fs_allocation_path = os.path.join(RESOURCES_DIR, "copy-" + fs_allocation_file) - copy_llama_site_path = os.path.join(RESOURCES_DIR, "copy-" + llama_site_file) - shutil.copy2(fs_allocation_path, copy_fs_allocation_path) - shutil.copy2(llama_site_path, copy_llama_site_path) - fs_allocation_path = copy_fs_allocation_path - llama_site_path = copy_llama_site_path - return ("-vmodule admission-controller=3 -fair_scheduler_allocation_path %s " - "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path, - additional_args)) +ACTIVE_SQL = "select * from {}".format(QUERY_TBL_LIVE) def log_metrics(log_prefix, metrics): @@ -1860,7 +1839,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): vector=vector, expected_frag_counts=[4, 2, 2], query="select a.test_name, b.db_user from functional.jointbl a inner join " - "sys.impala_query_live b on a.test_name = b.db_name"), + "{} b on a.test_name = b.db_name".format(QUERY_TBL_LIVE)), @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=3, diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py index 1da399077..31dcf26c8 100644 --- a/tests/custom_cluster/test_query_log.py +++ b/tests/custom_cluster/test_query_log.py @@ -19,6 +19,7 @@ from __future__ import absolute_import, division, print_function import os import string +from time import sleep, time from getpass import getuser from ImpalaService import ImpalaHiveServer2Service @@ -28,20 +29,18 @@ from TCLIService import TCLIService from thrift.transport.TSocket import TSocket from thrift.transport.TTransport import TBufferedTransport from thrift.protocol import 
TBinaryProtocol +from tests.common.cluster_config import impalad_admission_ctrl_config_args from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT from tests.common.test_vector import ImpalaTestDimension from tests.util.retry import retry -from tests.util.workload_management import assert_query -from time import sleep, time +from tests.util.workload_management import assert_query, WM_DB, QUERY_TBL_LOG class TestQueryLogTableBase(CustomClusterTestSuite): """Base class for all query log tests. Sets up the tests to use the Beeswax and HS2 client protocols.""" - WM_DB = "sys" - QUERY_TBL = "{0}.impala_query_log".format(WM_DB) PROTOCOL_BEESWAX = "beeswax" PROTOCOL_HS2 = "hs2" @@ -111,10 +110,10 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): "impala-server.completed-queries.written", 1, 60) # Force Impala to process the inserts to the completed queries table. - client.execute("refresh " + self.QUERY_TBL) + client.execute("refresh " + QUERY_TBL_LOG) res = client.execute("select length(sql),plan from {0} where query_id='{1}'" - .format(self.QUERY_TBL, query_id)) + .format(QUERY_TBL_LOG, query_id)) assert res.success assert len(res.data) == 1 @@ -146,11 +145,11 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): "impala-server.completed-queries.written", 1, 60) # Force Impala to process the inserts to the completed queries table. 
- client.execute("refresh " + self.QUERY_TBL) + client.execute("refresh " + QUERY_TBL_LOG) client.set_configuration_option("MAX_ROW_SIZE", 35000000) res = client.execute("select length(sql),plan from {0} where query_id='{1}'" - .format(self.QUERY_TBL, query_id)) + .format(QUERY_TBL_LOG, query_id)) assert res.success assert len(res.data) == 1 data = res.data[0].split("\t") @@ -182,10 +181,10 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): "impala-server.completed-queries.written", 1, 60) # Force Impala to process the inserts to the completed queries table. - client.execute("refresh " + self.QUERY_TBL) + client.execute("refresh " + QUERY_TBL_LOG) actual = client.execute("select sql from {0} where query_id='{1}'".format( - self.QUERY_TBL, res.query_id)) + QUERY_TBL_LOG, res.query_id)) assert actual.success assert len(actual.data) == 1 assert actual.data[0] == select_sql @@ -225,7 +224,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): self.cluster.get_first_impalad().service.wait_for_metric_value( "impala-server.completed-queries.written", warming_query_count + 1, 60) - data = assert_query(self.QUERY_TBL, client, "test_query_hist_2", + data = assert_query(QUERY_TBL_LOG, client, "test_query_hist_2", res.runtime_profile) # Since the assert_query function only asserts that the bytes read from cache @@ -255,7 +254,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): impalad = self.cluster.get_first_impalad() client = self.get_client(vector.get_value('protocol')) - res = client.execute("drop table {0} purge".format(self.QUERY_TBL)) + res = client.execute("drop table {0} purge".format(QUERY_TBL_LOG)) assert res.success impalad.service.wait_for_metric_value( "impala-server.completed-queries.scheduled-writes", 3, 60) @@ -270,7 +269,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): with open(os.path.join(log_dir, "impalad.ERROR")) as file: for line in file: if line.find('could not write completed query table="{0}" query_id="{1}"' - 
.format(self.QUERY_TBL, res.query_id)) >= 0: + .format(QUERY_TBL_LOG, res.query_id)) >= 0: query_count += 1 assert query_count == 1 @@ -310,7 +309,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): test_sql = "select '{0}','{1}'".format(rand_str, self.FLUSH_MAX_RECORDS_CLUSTER_ID) test_sql_assert = "select '{0}', count(*) from {1} where sql='{2}'".format( - rand_str, self.QUERY_TBL, test_sql.replace("'", r"\'")) + rand_str, QUERY_TBL_LOG, test_sql.replace("'", r"\'")) for _ in range(0, self.FLUSH_MAX_RECORDS_QUERY_COUNT): res = client.execute(test_sql) @@ -332,12 +331,12 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1, 60) # Force Impala to process the inserts to the completed queries table. - client.execute("refresh " + self.QUERY_TBL) + client.execute("refresh " + QUERY_TBL_LOG) # This query will remain queued due to the long write interval and max queued # records limit not being reached. res = client.execute(r"select count(*) from {0} where sql like 'select \'{1}\'%'" - .format(self.QUERY_TBL, rand_str)) + .format(QUERY_TBL_LOG, rand_str)) assert res.success assert 1 == len(res.data) assert str(self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1) == res.data[0] @@ -376,7 +375,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol')) try: assert client2 is not None - assert_query(self.QUERY_TBL, client2, "", + assert_query(QUERY_TBL_LOG, client2, "", res.runtime_profile) finally: client2.close() @@ -406,7 +405,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol')) try: assert client2 is not None - assert_query(self.QUERY_TBL, client2, "", + assert_query(QUERY_TBL_LOG, client2, "", res.runtime_profile) finally: client2.close() @@ -416,8 +415,6 @@ class TestQueryLogOtherTable(TestQueryLogTableBase): """Tests to assert that query_log_table_name works with 
non-default value.""" OTHER_TBL = "completed_queries_table_{0}".format(int(time())) - # Used in TestQueryLogTableBase.setup_method - QUERY_TBL = "{0}.{1}".format(TestQueryLogTableBase.WM_DB, OTHER_TBL) @classmethod def add_test_dimensions(cls): @@ -442,10 +439,10 @@ class TestQueryLogOtherTable(TestQueryLogTableBase): client = self.get_client(vector.get_value('protocol')) try: - res = client.execute("show tables in {0}".format(self.WM_DB)) + res = client.execute("show tables in {0}".format(WM_DB)) assert res.success assert len(res.data) > 0, "could not find any tables in database {0}" \ - .format(self.WM_DB) + .format(WM_DB) tbl_found = False for tbl in res.data: @@ -453,9 +450,9 @@ class TestQueryLogOtherTable(TestQueryLogTableBase): tbl_found = True break assert tbl_found, "could not find table '{0}' in database '{1}'" \ - .format(self.OTHER_TBL, self.WM_DB) + .format(self.OTHER_TBL, WM_DB) finally: - client.execute("drop table {0}.{1} purge".format(self.WM_DB, self.OTHER_TBL)) + client.execute("drop table {0}.{1} purge".format(WM_DB, self.OTHER_TBL)) class TestQueryLogTableHS2(TestQueryLogTableBase): @@ -534,7 +531,7 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): # Test the get_tables query. get_tables_req = TCLIService.TGetTablesReq() get_tables_req.sessionHandle = open_sess_resp.sessionHandle - get_tables_req.schemaName = self.WM_DB + get_tables_req.schemaName = WM_DB get_tables_resp = hs2_client.GetTables(get_tables_req) assert_resp(get_tables_resp) close_op(hs2_client, get_tables_resp) @@ -542,7 +539,7 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): # Test the get_table_types query. 
get_tbl_typ_req = TCLIService.TGetTableTypesReq() get_tbl_typ_req.sessionHandle = open_sess_resp.sessionHandle - get_tbl_typ_req.schemaName = self.WM_DB + get_tbl_typ_req.schemaName = WM_DB get_tbl_typ_resp = hs2_client.GetTableTypes(get_tbl_typ_req) assert_resp(get_tbl_typ_resp) close_op(hs2_client, get_tbl_typ_resp) @@ -591,11 +588,11 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): "impala-server.completed-queries.written", 1, 30) # Force Impala to process the inserts to the completed queries table. - client.execute("refresh {}".format(self.QUERY_TBL)) + client.execute("refresh {}".format(QUERY_TBL_LOG)) # Assert only the one expected query was written to the completed queries table. assert_results = client.execute("select count(*) from {} where cluster_id='{}'" - .format(self.QUERY_TBL, self.HS2_OPERATIONS_CLUSTER_ID)) + .format(QUERY_TBL_LOG, self.HS2_OPERATIONS_CLUSTER_ID)) assert assert_results.success assert assert_results.data[0] == "1" @@ -624,7 +621,7 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol')) try: assert client2 is not None - assert_query(self.QUERY_TBL, client2, "test_query_hist_mult", res.runtime_profile, + assert_query(QUERY_TBL_LOG, client2, "test_query_hist_mult", res.runtime_profile, max_mem_for_admission=10485760) finally: client2.close() @@ -659,7 +656,7 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol')) try: assert client2 is not None - assert_query(self.QUERY_TBL, client2, "test_query_hist_3", res.runtime_profile) + assert_query(QUERY_TBL_LOG, client2, "test_query_hist_3", res.runtime_profile) finally: client2.close() @@ -723,7 +720,7 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): try: def assert_func(): results = client2.execute("select query_id,sql from {0} where query_id in " - "('{1}','{2}','{3}')".format(self.QUERY_TBL, + 
"('{1}','{2}','{3}')".format(QUERY_TBL_LOG, sql1.query_id, sql2.query_id, sql3.query_id)) return len(results.data) == 3 @@ -767,6 +764,37 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): r"Up to '3' queries may have been lost", timeout_s=60) + @CustomClusterTestSuite.with_args( + impalad_args="--enable_workload_mgmt " + "--query_log_write_interval_s=3 " + "--query_log_dml_exec_timeout_s=1 " + "--debug_actions=INTERNAL_SERVER_AFTER_SUBMIT:SLEEP@2000", + catalogd_args="--enable_workload_mgmt", + impalad_graceful_shutdown=True, cluster_size=1, disable_log_buffering=True) + def test_exec_timeout(self, vector): + """Asserts the --query_log_dml_exec_timeout_s startup flag is added to the workload + management insert DML and the DML will be cancelled when its execution time exceeds + the value of the startup flag. Also asserts the workload management code + detects the query was cancelled and handles the DML failure properly.""" + client = self.get_client(vector.get_value('protocol')) + + # Run a query to completion so the completed queries queue has 1 entry. + assert client.execute("select 1").success + + # Helper function that waits for the workload management insert DML to start. 
+ def wait_for_insert_query(): + self.insert_query_id = _find_query_in_ui(self.cluster.get_first_impalad().service, + "in_flight_queries", _is_insert_query) + return self.insert_query_id + + assert retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, backoff=1) + self.assert_impalad_log_contains("INFO", "Expiring query {} due to execution time " + "limit of 1s.".format(self.insert_query_id)) + self.assert_impalad_log_contains("INFO", "failed to write completed queries table=\"" + "{}\"".format(QUERY_TBL_LOG)) + self.assert_impalad_log_contains("INFO", "Query {} expired due to execution time " + "limit of 1s000ms".format(self.insert_query_id)) + class TestQueryLogTableAll(TestQueryLogTableBase): """Tests to assert the query log table is correctly populated when using all the @@ -795,7 +823,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol')) try: assert client2 is not None - assert_query(self.QUERY_TBL, client2, "test_query_hist_2", res.runtime_profile) + assert_query(QUERY_TBL_LOG, client2, "test_query_hist_2", res.runtime_profile) finally: client2.close() @@ -829,7 +857,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol')) try: assert client2 is not None - assert_query(self.QUERY_TBL, client2, "test_query_hist_3", res.runtime_profile) + assert_query(QUERY_TBL_LOG, client2, "test_query_hist_3", res.runtime_profile) finally: client2.close() @@ -858,12 +886,12 @@ class TestQueryLogTableAll(TestQueryLogTableBase): 60) result = client.execute("select query_id from {0} where sql='{1}'" - .format(self.QUERY_TBL, unix_now), + .format(QUERY_TBL_LOG, unix_now), fetch_profile_after_close=True) assert result.success assert len(result.data) == 1 - assert_query(query_tbl=self.QUERY_TBL, client=client, + assert_query(query_tbl=QUERY_TBL_LOG, client=client, expected_cluster_id="test_query_hist_2", 
impalad=impalad, query_id=result.data[0]) @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " @@ -891,7 +919,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): sqls["show tables"] = False sqls["SHOW tables"] = False sqls["ShoW tables"] = False - sqls["ShoW create table {0}".format(self.QUERY_TBL)] = False + sqls["ShoW create table {0}".format(QUERY_TBL_LOG)] = False sqls["show databases"] = False sqls["SHOW databases"] = False sqls["ShoW databases"] = False @@ -901,19 +929,19 @@ class TestQueryLogTableAll(TestQueryLogTableBase): sqls["--mycomment\nshow tables"] = False sqls["/*mycomment*/ show tables"] = False sqls["/*mycomment*/ show tables"] = False - sqls["/*mycomment*/ show create table {0}".format(self.QUERY_TBL)] = False - sqls["/*mycomment*/ show files in {0}".format(self.QUERY_TBL)] = False + sqls["/*mycomment*/ show create table {0}".format(QUERY_TBL_LOG)] = False + sqls["/*mycomment*/ show files in {0}".format(QUERY_TBL_LOG)] = False sqls["/*mycomment*/ show functions"] = False sqls["/*mycomment*/ show data sources"] = False sqls["/*mycomment*/ show views"] = False - sqls["show metadata tables in {0}".format(self.QUERY_TBL)] = False + sqls["show metadata tables in {0}".format(QUERY_TBL_LOG)] = False sqls["describe database default"] = False sqls["/*mycomment*/ describe database default"] = False - sqls["describe {0}".format(self.QUERY_TBL)] = False - sqls["/*mycomment*/ describe {0}".format(self.QUERY_TBL)] = False - sqls["describe history {0}".format(self.QUERY_TBL)] = False - sqls["/*mycomment*/ describe history {0}".format(self.QUERY_TBL)] = False + sqls["describe {0}".format(QUERY_TBL_LOG)] = False + sqls["/*mycomment*/ describe {0}".format(QUERY_TBL_LOG)] = False + sqls["describe history {0}".format(QUERY_TBL_LOG)] = False + sqls["/*mycomment*/ describe history {0}".format(QUERY_TBL_LOG)] = False sqls["select 1"] = True control_queries_count = 0 @@ -935,7 +963,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): 
sql_results = None for _ in range(6): sql_results = client.execute("select * from {0} where query_id='{1}'".format( - self.QUERY_TBL, results.query_id)) + QUERY_TBL_LOG, results.query_id)) control_queries_count += 1 if sql_results.success and len(sql_results.data) == 1: break @@ -949,7 +977,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): for sql, query_id in sqls.items(): log_results = client.execute("select * from {0} where query_id='{1}'" - .format(self.QUERY_TBL, query_id)) + .format(QUERY_TBL_LOG, query_id)) assert log_results.success assert len(log_results.data) == 0, "found query in query log table: {0}".format(sql) @@ -979,7 +1007,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): self.__run_sql_inject(impalad, client, sql1_str, "closing quotes", 4) # Try a sql inject attack with terminating quote and semicolon. - sql2_str = "select 1'); drop table {0}; select('".format(self.QUERY_TBL) + sql2_str = "select 1'); drop table {0}; select('".format(QUERY_TBL_LOG) self.__run_sql_inject(impalad, client, sql2_str, "terminating semicolon", 7) # Attempt to cause an error using multiline comments. @@ -1013,12 +1041,12 @@ class TestQueryLogTableAll(TestQueryLogTableBase): "impala-server.completed-queries.written", expected_writes, 60) # Force Impala to process the inserts to the completed queries table. 
- client.execute("refresh " + self.QUERY_TBL) + client.execute("refresh " + QUERY_TBL_LOG) if expect_success: sql_verify = client.execute( "select sql from {0} where query_id='{1}'" - .format(self.QUERY_TBL, sql_result.query_id)) + .format(QUERY_TBL_LOG, sql_result.query_id)) assert sql_verify.success, test_case assert len(sql_verify.data) == 1, "did not find query '{0}' in query log " \ @@ -1030,7 +1058,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): esc_sql = sql.replace("'", "\\'") sql_verify = client.execute("select sql from {0} where sql='{1}' " "and start_time_utc > '{2}'" - .format(self.QUERY_TBL, esc_sql, start_time)) + .format(QUERY_TBL_LOG, esc_sql, start_time)) assert sql_verify.success, test_case assert len(sql_verify.data) == 1, "did not find query '{0}' in query log " \ "table for test case '{1}" \ @@ -1085,7 +1113,7 @@ class TestQueryLogTableBufferPool(TestQueryLogTableBase): client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol')) try: assert client2 is not None - data = assert_query(self.QUERY_TBL, client2, "test_query_hist_1", + data = assert_query(QUERY_TBL_LOG, client2, "test_query_hist_1", res.runtime_profile, max_mem_for_admission=10485760) finally: client2.close() @@ -1098,3 +1126,107 @@ class TestQueryLogTableBufferPool(TestQueryLogTableBase): # data that was spilled. 
assert data["COMPRESSED_BYTES_SPILLED"] != "0", "compressed bytes spilled total " \ "was zero, test did not assert anything" + + +class TestQueryLogQueuedQueries(CustomClusterTestSuite): + """Simulates a cluster that is under load and has queries that are queueing in + admission control.""" + + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_config_args( + fs_allocation_file="fair-scheduler-one-query.xml", + llama_site_file="llama-site-one-query.xml", + additional_args="--query_log_write_interval_s=5"), + impalad_graceful_shutdown=True, workload_mgmt=True, + num_exclusive_coordinators=1, cluster_size=2, + default_query_options=[('fetch_rows_timeout_ms', '1000')]) + def test_query_queued(self): + """Tests the situation where a cluster is under heavy load and the workload management + insert DML is queued in admission control and has to wait for an admission control + slot to become available. + + The overall flow is: + 1. Run a query to completion so the 'insert into sys.impala_query_log' DML runs. + 2. Start a query that will block the only available admission control slot. + 3. Wait for the 'insert into sys.impala_query_log' DML to start. + 4. Wait 2 seconds to ensure the 1 second FETCH_ROWS_TIMEOUT_MS does not cause the + insert query to fail. + 5. Fetch all rows of the original blocking query to cause it to finish and + release its admission control slot. + 6. Assert the 'insert into sys.impala_query_log' DML is in the list of completed + queries. + """ + resp = self.hs2_client.execute_async("SELECT * FROM functional.alltypes LIMIT 5") + self.wait_for_state(resp, 'FINISHED_STATE', 60, client=self.hs2_client) + self.hs2_client.close_query(resp) + + # Start a query that will consume the only available slot since its rows are not + # fetched until later. 
+ ROW_LIMIT = 5 + long_query_resp = self.hs2_client.execute_async( + "SELECT * FROM functional.alltypes LIMIT {}".format(ROW_LIMIT)) + + # Helper function that waits for the workload management insert DML to start. + def wait_for_insert_query(): + self.insert_query_id = _find_query_in_ui(self.cluster.get_first_impalad().service, + "in_flight_queries", _is_insert_query) + return self.insert_query_id + + assert retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, backoff=1) + + # Wait 2 seconds to ensure the insert into DML is not killed by the fetch rows + # timeout (set to 1 second in this test's annotations). + sleep(2) + + # Helper function that checks if a query matches the workload management insert DML + # that had to wait for the one admission control slot to become available. + def is_insert_query_queryid(query): + return query["query_id"] == self.insert_query_id + + # Assert the workload management insert DML did not get cancelled early and is still + # waiting. + assert _find_query_in_ui(self.cluster.get_first_impalad().service, + "in_flight_queries", is_insert_query_queryid), \ + "Did not find the workload management insert into query having id '{}' in the " \ + "list of in-flight queries".format(self.insert_query_id) + + # Retrieve all rows of the original blocking query to cause it to complete. + self.wait_for_state(long_query_resp, 'FINISHED_STATE', 60, client=self.hs2_client) + self.hs2_client.close_query(long_query_resp) + + # Helper function that checks if a query matches the workload management insert DML + # that had to wait for the one admission control slot to become available. + def is_insert_query_queryid_success(query): + return query["query_id"] == self.insert_query_id and query['state'] == "FINISHED" + + # Ensure the insert into DML has finished successfully. The previous row retrieval + # ended the blocking query and thus should open up the one admission control slot for + # the workload management DML to run. 
+ def find_completed_insert_query(): + return _find_query_in_ui(self.cluster.get_first_impalad().service, + "completed_queries", is_insert_query_queryid_success) + assert retry(func=find_completed_insert_query, max_attempts=10, sleep_time_s=1, + backoff=1) + + +# Helper function to determine if a query from the debug UI is a workload management +# insert DML. +def _is_insert_query(query): + return query["stmt"].lower().startswith("insert into {}".format(QUERY_TBL_LOG)) + + +def _find_query_in_ui(service, section, func): + """Calls to the debug UI's queries page and loops over all queries in the specified + section calling the provided func for each query, Returns the string id of the + first query that matches or None if no query matches.""" + assert section == "completed_queries" or section == "in_flight_queries" + + queries_json = service.get_debug_webpage_json('/queries') + query_id = None + + for query in queries_json[section]: + if func(query): + query_id = query['query_id'] + break + + return query_id diff --git a/tests/custom_cluster/test_workload_mgmt_init.py b/tests/custom_cluster/test_workload_mgmt_init.py index 4b6d64e04..76c16d7f2 100644 --- a/tests/custom_cluster/test_workload_mgmt_init.py +++ b/tests/custom_cluster/test_workload_mgmt_init.py @@ -25,21 +25,22 @@ from logging import getLogger from SystemTables.ttypes import TQueryTableColumn from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.util.workload_management import assert_query +from tests.util.workload_management import ( + assert_query, + WM_DB, + QUERY_TBL_LOG_NAME, + QUERY_TBL_LOG, + QUERY_TBL_LIVE_NAME, + QUERY_TBL_LIVE) LOG = getLogger(__name__) +QUERY_TBL_ALL = "{},{}".format(QUERY_TBL_LOG_NAME, QUERY_TBL_LIVE_NAME) class TestWorkloadManagementInitBase(CustomClusterTestSuite): """Defines common setup and methods for all workload management init tests.""" - WM_DB = "sys" - QUERY_TBL_LOG_NAME = "impala_query_log" - QUERY_TBL_LOG = "{0}.{1}".format(WM_DB, 
QUERY_TBL_LOG_NAME) - QUERY_TBL_LIVE_NAME = "impala_query_live" - QUERY_TBL_LIVE = "{0}.{1}".format(WM_DB, QUERY_TBL_LIVE_NAME) - LATEST_SCHEMA = "1.2.0" @classmethod @@ -121,15 +122,15 @@ class TestWorkloadManagementInitBase(CustomClusterTestSuite): """Asserts a given regex is found in the catalog log file for each workload management table. The regex is passed the fully qualified table name using python string substitution.""" - for table in (self.QUERY_TBL_LOG, self.QUERY_TBL_LIVE): + for table in (QUERY_TBL_LOG, QUERY_TBL_LIVE): self.assert_catalogd_log_contains("INFO", line_regex.format(table)) def check_schema(self, schema_ver, vector, multiple_impalad=False): """Asserts that all workload management tables have the correct columns and are at the specified schema version.""" - for tbl_name in (self.QUERY_TBL_LOG_NAME, self.QUERY_TBL_LIVE_NAME): + for tbl_name in (QUERY_TBL_LOG_NAME, QUERY_TBL_LIVE_NAME): self.run_test_case('QueryTest/workload-mgmt-{}-v{}'.format(tbl_name, schema_ver), - vector, self.WM_DB, multiple_impalad=multiple_impalad) + vector, WM_DB, multiple_impalad=multiple_impalad) class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): @@ -160,7 +161,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.0.0", catalogd_args="--enable_workload_mgmt " "--workload_mgmt_schema_version=1.0.0 " - "--workload_mgmt_drop_tables=impala_query_log,impala_query_live") + "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_on_version_1_0_0(self, vector): """Asserts that workload management tables are properly created on version 1.0.0 using a 10 node cluster when no tables exist.""" @@ -171,7 +172,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.1.0", catalogd_args="--enable_workload_mgmt " "--workload_mgmt_schema_version=1.1.0 " - 
"--workload_mgmt_drop_tables=impala_query_log,impala_query_live") + "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_on_version_1_1_0(self, vector): """Asserts that workload management tables are properly created on version 1.1.0 using a 10 node cluster when no tables exist.""" @@ -181,7 +182,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): log_symlinks=True, impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_drop_tables=impala_query_log,impala_query_live") + "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_on_version_1_2_0(self, vector): """Asserts that workload management tables are properly created on the latest version using a 10 node cluster when no tables exist.""" @@ -191,7 +192,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.0.0", catalogd_args="--enable_workload_mgmt " "--workload_mgmt_schema_version=1.0.0 " - "--workload_mgmt_drop_tables=impala_query_log,impala_query_live", + "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL), disable_log_buffering=True) def test_upgrade_1_0_0_to_1_1_0(self, vector): """Asserts that an upgrade from version 1.0.0 to 1.1.0 succeeds when starting with no @@ -215,7 +216,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.1.0", catalogd_args="--enable_workload_mgmt " "--workload_mgmt_schema_version=1.1.0 " - "--workload_mgmt_drop_tables=impala_query_log,impala_query_live", + "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL), disable_log_buffering=True) def test_upgrade_1_1_0_to_1_2_0(self, vector): """Asserts that an upgrade from version 1.1.0 to 1.2.0 succeeds when starting with no @@ -239,7 +240,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.0.0", catalogd_args="--enable_workload_mgmt " "--workload_mgmt_schema_version=1.0.0 " - "--workload_mgmt_drop_tables=impala_query_log,impala_query_live", + "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL), disable_log_buffering=True) def test_upgrade_1_0_0_to_1_2_0(self, vector): """Asserts that an upgrade from version 1.0.0 to 1.2.0 succeeds when starting with no @@ -285,7 +286,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): impalad = self.cluster.get_first_impalad() # Check the live queries table first. - assert_query(self.QUERY_TBL_LIVE, self.client, impalad=impalad, query_id=res.query_id, + assert_query(QUERY_TBL_LIVE, self.client, impalad=impalad, query_id=res.query_id, expected_overrides={ TQueryTableColumn.SELECT_COLUMNS: "", TQueryTableColumn.WHERE_COLUMNS: "", @@ -298,7 +299,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): # Check the query log table. impalad.service.wait_for_metric_value( "impala-server.completed-queries.written", 2, 60) - assert_query(self.QUERY_TBL_LOG, self.client, impalad=impalad, query_id=res.query_id, + assert_query(QUERY_TBL_LOG, self.client, impalad=impalad, query_id=res.query_id, expected_overrides={ TQueryTableColumn.SELECT_COLUMNS: "NULL", TQueryTableColumn.WHERE_COLUMNS: "NULL", @@ -313,24 +314,24 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt " "--query_log_table_props=\"foo=bar,foo1=bar1\" " - "--workload_mgmt_drop_tables=impala_query_log,impala_query_live") + "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_table_with_custom_props(self): """Asserts that creating workload management tables with additional properties specified adds those properties.""" - self.assert_table_prop(self.QUERY_TBL_LOG, "foo", "bar") - self.assert_table_prop(self.QUERY_TBL_LIVE, "foo", "bar") + self.assert_table_prop(QUERY_TBL_LOG, 
"foo", "bar") + self.assert_table_prop(QUERY_TBL_LIVE, "foo", "bar") @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, log_symlinks=True, impalad_args="--enable_workload_mgmt", catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_drop_tables=impala_query_log,impala_query_live") + "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_from_scratch(self, vector): """Tests the conditions that exist when workload management is first started by deleteing the workload management tables and the sys db and restarting.""" assert self.client.execute("drop database {} cascade" - .format(self.WM_DB)).success + .format(WM_DB)).success self.restart_cluster(vector, log_symlinks=True) self.check_schema(self.LATEST_SCHEMA, vector) @@ -362,8 +363,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): "Found minidumps but none should exist." finally: self.restart_cluster(vector, cluster_size=1, - additional_catalogd_opts="--workload_mgmt_drop_tables=impala_query_log," - "impala_query_live") + additional_catalogd_opts="--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", @@ -374,7 +374,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): """Tests that startup succeeds when the 'schema_version' table property on the sys.impala_query_log table contains an invalid value but the wm_schema_version table property contains a valid value.""" - self._run_invalid_table_prop_test(self.QUERY_TBL_LOG, "schema_version", vector, True) + self._run_invalid_table_prop_test(QUERY_TBL_LOG, "schema_version", vector, True) @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", @@ -384,7 +384,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): def 
test_invalid_wm_schema_version_log_table_prop(self, vector): """Tests that startup fails when the 'wm_schema_version' table property on the sys.impala_query_log table contains an invalid value.""" - self._run_invalid_table_prop_test(self.QUERY_TBL_LOG, "wm_schema_version", vector) + self._run_invalid_table_prop_test(QUERY_TBL_LOG, "wm_schema_version", vector) @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", @@ -395,7 +395,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): """Tests that startup succeeds when the 'schema_version' table property on the sys.impala_query_live table contains an invalid value but the wm_schema_version table property contains a valid value.""" - self._run_invalid_table_prop_test(self.QUERY_TBL_LIVE, "schema_version", vector, True) + self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "schema_version", vector, True) @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", @@ -405,7 +405,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): def test_invalid_wm_schema_version_live_table_prop(self, vector): """Tests that startup fails when the 'wm_schema_version' table property on the sys.impala_query_live table contains an invalid value.""" - self._run_invalid_table_prop_test(self.QUERY_TBL_LIVE, "wm_schema_version", vector) + self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "wm_schema_version", vector) @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, impalad_args="--enable_workload_mgmt", @@ -414,10 +414,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): """Simulated an upgrade situation from workload management tables created by previous builds of Impala.""" - for table in (self.QUERY_TBL_LOG, self.QUERY_TBL_LIVE): + for table in (QUERY_TBL_LOG, QUERY_TBL_LIVE): assert 
self.client.execute("drop table if exists {} purge".format(table)).success - for table in (self.QUERY_TBL_LOG_NAME, self.QUERY_TBL_LIVE_NAME): + for table in (QUERY_TBL_LOG_NAME, QUERY_TBL_LIVE_NAME): with open("{}/testdata/workload_mgmt/create_{}_table.sql" .format(os.environ["IMPALA_HOME"], table), "r") as f: create_sql = f.read() @@ -434,12 +434,12 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): impalad = self.cluster.get_first_impalad() # Check the live queries table first. - assert_query(self.QUERY_TBL_LIVE, self.client, impalad=impalad, query_id=res.query_id) + assert_query(QUERY_TBL_LIVE, self.client, impalad=impalad, query_id=res.query_id) # Check the query log table. impalad.service.wait_for_metric_value( "impala-server.completed-queries.written", 2, 60) - assert_query(self.QUERY_TBL_LOG, self.client, impalad=impalad, query_id=res.query_id) + assert_query(QUERY_TBL_LOG, self.client, impalad=impalad, query_id=res.query_id) @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, impalad_args="--enable_workload_mgmt", @@ -449,10 +449,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): workload management code, and the current code is started at workload management schema version 1.0.0 (even though that version is not the latest).""" - for table in (self.QUERY_TBL_LOG, self.QUERY_TBL_LIVE): + for table in (QUERY_TBL_LOG, QUERY_TBL_LIVE): assert self.client.execute("drop table if exists {} purge".format(table)).success - for table in (self.QUERY_TBL_LOG_NAME, self.QUERY_TBL_LIVE_NAME): + for table in (QUERY_TBL_LOG_NAME, QUERY_TBL_LIVE_NAME): with open("{}/testdata/workload_mgmt/create_{}_table.sql" .format(os.environ["IMPALA_HOME"], table), "r") as f: create_sql = f.read() @@ -461,7 +461,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.restart_cluster(vector, schema_version="1.0.0", log_symlinks=True, additional_impalad_opts="--query_log_write_interval_s=15") 
- for table in (self.QUERY_TBL_LOG, self.QUERY_TBL_LIVE): + for table in (QUERY_TBL_LOG, QUERY_TBL_LIVE): self.assert_table_prop(table, "schema_version", "1.0.0") self.assert_table_prop(table, "wm_schema_version", should_exist=False) @@ -471,10 +471,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): # Check the live queries table first. live_results = self.client.execute("select * from {} where query_id='{}'".format( - self.QUERY_TBL_LIVE, res.query_id)) + QUERY_TBL_LIVE, res.query_id)) assert live_results.success assert len(live_results.data) == 1, "did not find query in '{}' table '{}'".format( - res.query_id, self.QUERY_TBL_LIVE) + res.query_id, QUERY_TBL_LIVE) assert len(live_results.column_labels) == 49 data = live_results.data[0].split("\t") assert len(data) == len(live_results.column_labels) @@ -483,10 +483,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.cluster.get_first_impalad().service.wait_for_metric_value( "impala-server.completed-queries.written", 2, 60) log_results = self.client.execute("select * from {} where query_id='{}'".format( - self.QUERY_TBL_LOG, res.query_id)) + QUERY_TBL_LOG, res.query_id)) assert log_results.success assert len(log_results.data) == 1, "did not find query in '{}' table '{}'".format( - res.query_id, self.QUERY_TBL_LOG) + res.query_id, QUERY_TBL_LOG) assert len(log_results.column_labels) == 49 data = log_results.data[0].split("\t") assert len(data) == len(log_results.column_labels) @@ -517,8 +517,9 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase): @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, impalad_args="--enable_workload_mgmt --query_log_write_interval_s=3", catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_drop_tables=impala_query_log,impala_query_live " - "--debug_actions=CATALOG_WORKLOADMGMT_STARTUP:SLEEP@15000", + "--workload_mgmt_drop_tables={} " + "--debug_actions=CATALOG_WORKLOADMGMT_STARTUP:SLEEP@15000" 
+ .format(QUERY_TBL_ALL), disable_log_buffering=True) def test_catalog_init_delay(self): # Workload management init is slightly delayed after catalogd startup, wait for the diff --git a/tests/custom_cluster/test_workload_mgmt_sql_details.py b/tests/custom_cluster/test_workload_mgmt_sql_details.py index 89ee0bd5c..cddb18fd5 100644 --- a/tests/custom_cluster/test_workload_mgmt_sql_details.py +++ b/tests/custom_cluster/test_workload_mgmt_sql_details.py @@ -22,7 +22,7 @@ import os from SystemTables.ttypes import TQueryTableColumn from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from test_query_log import TestQueryLogTableBase -from tests.util.workload_management import assert_csv_col +from tests.util.workload_management import assert_csv_col, QUERY_TBL_LOG class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): @@ -63,27 +63,27 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): "impala-server.completed-queries.written", 1, 60) # Assert tables queried. - assert_csv_col(client, self.QUERY_TBL, TQueryTableColumn.TABLES_QUERIED, res.query_id, + assert_csv_col(client, QUERY_TBL_LOG, TQueryTableColumn.TABLES_QUERIED, res.query_id, expected_tables_queried, db) # Assert select columns. - assert_csv_col(client, self.QUERY_TBL, TQueryTableColumn.SELECT_COLUMNS, res.query_id, + assert_csv_col(client, QUERY_TBL_LOG, TQueryTableColumn.SELECT_COLUMNS, res.query_id, expected_select_cols, db) # Assert where columns. - assert_csv_col(client, self.QUERY_TBL, TQueryTableColumn.WHERE_COLUMNS, res.query_id, + assert_csv_col(client, QUERY_TBL_LOG, TQueryTableColumn.WHERE_COLUMNS, res.query_id, expected_where_cols, db) # Assert join columns. 
- assert_csv_col(client, self.QUERY_TBL, TQueryTableColumn.JOIN_COLUMNS, res.query_id, + assert_csv_col(client, QUERY_TBL_LOG, TQueryTableColumn.JOIN_COLUMNS, res.query_id, expected_join_cols, db) # Aggregate Columns - assert_csv_col(client, self.QUERY_TBL, TQueryTableColumn.AGGREGATE_COLUMNS, + assert_csv_col(client, QUERY_TBL_LOG, TQueryTableColumn.AGGREGATE_COLUMNS, res.query_id, expected_aggregate_cols, db) # OrderBy Columns - assert_csv_col(client, self.QUERY_TBL, TQueryTableColumn.ORDERBY_COLUMNS, + assert_csv_col(client, QUERY_TBL_LOG, TQueryTableColumn.ORDERBY_COLUMNS, res.query_id, expected_orderby_cols, db) def _run_test(self, vector, tpcds_query_num, expected_tables_queried, expected_select, diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py index 1e3cae8d9..f2204899d 100644 --- a/tests/util/workload_management.py +++ b/tests/util/workload_management.py @@ -27,6 +27,11 @@ from tests.util.assert_time import assert_time_str, convert_to_milliseconds from tests.util.memory import assert_byte_str, convert_to_bytes DEDICATED_COORD_SAFETY_BUFFER_BYTES = 104857600 +WM_DB = "sys" +QUERY_TBL_LOG_NAME = "impala_query_log" +QUERY_TBL_LOG = "{0}.{1}".format(WM_DB, QUERY_TBL_LOG_NAME) +QUERY_TBL_LIVE_NAME = "impala_query_live" +QUERY_TBL_LIVE = "{0}.{1}".format(WM_DB, QUERY_TBL_LIVE_NAME) def round_to_3(val): @@ -62,7 +67,7 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None, profile_lines = profile_text.split("\n") # Force Impala to process the inserts to the completed queries table. - if query_tbl != 'sys.impala_query_live': + if query_tbl != QUERY_TBL_LIVE: client.execute("refresh " + query_tbl) # Assert the query was written correctly to the query log table. @@ -671,7 +676,7 @@ def assert_csv_col(client, query_tbl, col, query_id, expected_list, db="tpcds"): print("Query Id: {0}".format(query_id)) # Force Impala to process the inserts to the completed queries table. 
- if query_tbl != 'sys.impala_query_live': + if query_tbl != QUERY_TBL_LIVE: client.execute("refresh " + query_tbl) # Assert the query was written correctly to the query log table.
