This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit f28a32fbc351919aec6d8b0d357f62c9b2521a40 Author: Riza Suminto <[email protected]> AuthorDate: Sat Mar 29 06:23:25 2025 -0700 IMPALA-13916: Change BaseTestSuite.default_test_protocol to HS2 This is the final patch to move all Impala e2e and custom cluster tests to use the HS2 protocol by default. Only beeswax-specific tests still run against the beeswax protocol by default. We can remove them once Impala officially removes beeswax support. HS2 error message formatting in impala-hs2-server.cc is adjusted a bit to match the formatting in impala-beeswax-server.cc. Move TestWebPageAndCloseSession from webserver/test_web_pages.py to custom_cluster/test_web_pages.py to disable glog log buffering. Testing: - Pass exhaustive tests, except for some known and unrelated flaky tests. Change-Id: I42e9ceccbba1e6853f37e68f106265d163ccae28 Reviewed-on: http://gerrit.cloudera.org:8080/22845 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Jason Fehr <[email protected]> --- be/src/service/impala-hs2-server.cc | 4 +- tests/common/base_test_suite.py | 5 +- tests/common/impala_service.py | 19 +++--- tests/common/impala_test_suite.py | 3 +- tests/conftest.py | 2 +- tests/custom_cluster/test_coordinators.py | 2 +- tests/custom_cluster/test_data_cache.py | 31 +++++----- tests/custom_cluster/test_executor_groups.py | 6 +- .../test_frontend_connection_limit.py | 2 +- tests/custom_cluster/test_hs2_fault_injection.py | 2 +- tests/custom_cluster/test_query_retries.py | 6 ++ tests/custom_cluster/test_web_pages.py | 40 +++++++++++-- tests/failure/test_failpoints.py | 43 ++++++------- tests/metadata/test_ddl.py | 2 +- tests/metadata/test_event_processing.py | 11 ++-- tests/metadata/test_explain.py | 70 ++++++++++------------ tests/metadata/test_recursive_listing.py | 19 +++--- tests/performance/query_exec_functions.py | 29 +++++++-- tests/query_test/test_beeswax.py | 9 +-- tests/query_test/test_errorlog.py | 11 ++-- tests/query_test/test_insert.py | 6 +- 
tests/query_test/test_mem_usage_scaling.py | 6 +- tests/query_test/test_result_spooling.py | 9 +-- tests/query_test/test_runtime_filters.py | 10 ++-- tests/query_test/test_tablesample.py | 8 ++- tests/webserver/test_web_pages.py | 63 +++++++------------ 26 files changed, 230 insertions(+), 188 deletions(-) diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index ba35ca395..c18e83a86 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -1169,7 +1169,9 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) { } // Report analysis errors - ss << join(query_handle->GetAnalysisWarnings(), "\n"); + for (const string& warning : query_handle->GetAnalysisWarnings()) { + ss << warning << "\n"; + } // Report queuing reason if the admission controller queued the query. const string* admission_result = query_handle->summary_profile()->GetInfoString( AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT); diff --git a/tests/common/base_test_suite.py b/tests/common/base_test_suite.py index 84dc26241..5612dbb02 100644 --- a/tests/common/base_test_suite.py +++ b/tests/common/base_test_suite.py @@ -19,10 +19,11 @@ from __future__ import absolute_import, division, print_function import logging -from tests.common.test_vector import BEESWAX, ImpalaTestMatrix +from tests.common.test_vector import HS2, ImpalaTestMatrix LOG = logging.getLogger('base_test_suite') + # Base class for tests. 
class BaseTestSuite(object): ImpalaTestMatrix = ImpalaTestMatrix() @@ -40,4 +41,4 @@ class BaseTestSuite(object): @classmethod def default_test_protocol(cls): """See documentation in ImpalaTestSuite.default_test_protocol().""" - return BEESWAX + return HS2 diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index cff62f5b1..ae9707ff8 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -272,7 +272,8 @@ class ImpaladService(BaseImpalaService): def get_num_known_live_executors(self, timeout=30, interval=1, include_shutting_down=True): - return self.get_num_known_live_backends(timeout=timeout, interval=interval, + return self.get_num_known_live_backends(timeout=timeout, + interval=interval, include_shutting_down=include_shutting_down, only_executors=True) @@ -305,7 +306,8 @@ class ImpaladService(BaseImpalaService): def get_queries_json(self, timeout=30, interval=1): """Return the full JSON from the /queries page.""" - return self.get_debug_webpage_json('queries', timeout=timeout, interval=interval) + return json.loads( + self.read_debug_webpage('queries?json', timeout=timeout, interval=interval)) def get_query_locations(self): # Returns a dictionary of the format <host_address, num_of_queries_running_there> @@ -396,9 +398,9 @@ class ImpaladService(BaseImpalaService): return self.read_debug_webpage( "query_profile?query_id=%s&raw" % (query_id), timeout=timeout, interval=interval) - def get_query_status(self, query_id): + def get_query_status(self, query_id, timeout=10, interval=1): """Gets the 'Query Status' section of the query's runtime profile.""" - page = self.read_query_profile_page(query_id) + page = self.read_query_profile_page(query_id, timeout=timeout, interval=interval) status_line =\ next((x for x in page.split('\n') if re.search('Query Status:', x)), None) return status_line.split('Query Status:')[1].strip() @@ -430,10 +432,11 @@ class ImpaladService(BaseImpalaService): query_status = "" while (time() 
- start_time < timeout): try: - query_status = self.get_query_status(query_id) + query_status = self.get_query_status( + query_id, timeout=timeout, interval=interval) if query_status is None: - assert False, "Could not find 'Query Status' section in profile of "\ - "query with id %s:\n%s" % (query_id) + raise Exception("Could not find 'Query Status' section in profile of " + "query with id {}".format(query_id)) except Exception: pass if expected_content in query_status: @@ -455,6 +458,7 @@ class ImpaladService(BaseImpalaService): def create_beeswax_client(self, use_kerberos=False): """Creates a new beeswax client connection to the impalad. DEPRECATED: Use create_hs2_client() instead.""" + LOG.warning('beeswax protocol is deprecated.') client = create_connection('%s:%d' % (self.hostname, self.beeswax_port), use_kerberos, BEESWAX) client.connect() @@ -514,6 +518,7 @@ class ImpaladService(BaseImpalaService): if protocol == HS2_HTTP: port = self.hs2_http_port if protocol == BEESWAX: + LOG.warning('beeswax protocol is deprecated.') port = self.beeswax_port client = create_connection('%s:%d' % (self.hostname, port), protocol=protocol) client.connect() diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 996822483..0ae933bbd 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -901,7 +901,8 @@ class ImpalaTestSuite(BaseTestSuite): assert result.success, "Query failed: {0}".format(result.data) # Decode the results read back if the data is stored with a specific encoding. - if encoding: result.data = [row.decode(encoding) for row in result.data] + if encoding and result.data: + result.data = [row.decode(encoding) for row in result.data] # Replace $NAMENODE in the expected results with the actual namenode URI. 
if 'RESULTS' in test_section: # Combining 'RESULTS' with 'DML_RESULTS" is currently unsupported because diff --git a/tests/conftest.py b/tests/conftest.py index edf6a4a2c..9c3154488 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -56,7 +56,7 @@ DEFAULT_KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', '127.0.0.1') DEFAULT_KUDU_MASTER_PORT = os.getenv('KUDU_MASTER_PORT', '7051') DEFAULT_METASTORE_SERVER = 'localhost:9083' DEFAULT_NAMENODE_ADDR = None -DEFAULT_TEST_PROTOCOL = os.getenv('DEFAULT_TEST_PROTOCOL', BEESWAX) +DEFAULT_TEST_PROTOCOL = os.getenv('DEFAULT_TEST_PROTOCOL', HS2) if FILESYSTEM == 'isilon': DEFAULT_NAMENODE_ADDR = "{node}:{port}".format(node=os.getenv("ISILON_NAMENODE"), port=ISILON_WEBHDFS_PORT) diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py index e95cc0c0e..84cc9cbd7 100644 --- a/tests/custom_cluster/test_coordinators.py +++ b/tests/custom_cluster/test_coordinators.py @@ -50,7 +50,7 @@ class TestCoordinators(CustomClusterTestSuite): # Verify that Beeswax and HS2 client connections can't be established at a worker node beeswax_client = None try: - beeswax_client = worker.service.create_hs2_client() + beeswax_client = worker.service.create_beeswax_client() except Exception as e: LOG.info("Caught exception {0}".format(e)) finally: diff --git a/tests/custom_cluster/test_data_cache.py b/tests/custom_cluster/test_data_cache.py index 36bb03576..73a868ae8 100644 --- a/tests/custom_cluster/test_data_cache.py +++ b/tests/custom_cluster/test_data_cache.py @@ -23,6 +23,7 @@ import pytest from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster +from tests.common.test_vector import HS2 @SkipIf.is_buggy_el6_kernel @@ -216,7 +217,8 @@ class TestDataCache(CustomClusterTestSuite): QUERY = "select * from tpch_parquet.lineitem" # Execute a query, record the total bytes and the number of entries of cache before # cache dump. 
- self.execute_query(QUERY) + with self.create_impala_client(protocol=HS2) as client1: + client1.execute(QUERY) assert self.get_data_cache_metric('hit-bytes') == 0 assert self.get_data_cache_metric('hit-count') == 0 total_bytes = self.get_data_cache_metric('total-bytes') @@ -242,8 +244,8 @@ class TestDataCache(CustomClusterTestSuite): assert self.get_data_cache_metric('num-entries') == num_entries # Reconnect to the service and execute the query, expecting some cache hits. - self.client.connect() - self.execute_query(QUERY) + with self.create_impala_client(protocol=HS2) as client2: + client2.execute(QUERY) assert self.get_data_cache_metric('hit-bytes') > 0 assert self.get_data_cache_metric('hit-count') > 0 if test_reduce_size: @@ -285,21 +287,22 @@ class TestDataCache(CustomClusterTestSuite): QUERY = "select * from tpch_parquet.lineitem" # Execute the query asynchronously, wait a short while, and do gracefully shutdown # immediately to test the race between cache writes and setting cache read-only. - handle = self.execute_query_async(QUERY) - sleep(1) - impalad = self.cluster.impalads[0] - impalad.kill(SIGRTMIN) - self.client.fetch(QUERY, handle) - self.client.close_query(handle) - impalad.wait_for_exit() - impalad.start() - impalad.service.wait_for_num_known_live_backends(1) + with self.create_impala_client(protocol=HS2) as client1: + handle = client1.execute_async(QUERY) + sleep(1) + impalad = self.cluster.impalads[0] + impalad.kill(SIGRTMIN) + client1.fetch(QUERY, handle) + client1.close_query(handle) + impalad.wait_for_exit() + impalad.start() + impalad.service.wait_for_num_known_live_backends(1) # We hope that in this case, the cache is still properly dumped and loaded, # and then the same query is executed to expect some cache hits. 
self.assert_impalad_log_contains('INFO', 'Partition 0 load successfully.') - self.client.connect() - self.execute_query(QUERY) + with self.create_impala_client(protocol=HS2) as client2: + client2.execute(QUERY) assert self.get_data_cache_metric('hit-bytes') > 0 assert self.get_data_cache_metric('hit-count') > 0 diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index 48df14ad4..a13f56782 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -678,11 +678,11 @@ class TestExecutorGroups(CustomClusterTestSuite): # Predicates to assert that a certain join type was picked. def assert_broadcast_join(): ret = self.execute_query_expect_success(self.client, QUERY) - assert ":EXCHANGE [BROADCAST]" in str(ret) + assert ":EXCHANGE [BROADCAST]" in str(ret.data) def assert_hash_join(): ret = self.execute_query_expect_success(self.client, QUERY) - assert ":EXCHANGE [HASH(b.id)]" in str(ret) + assert ":EXCHANGE [HASH(b.id)]" in str(ret.data) # Without any executors we default to a hash join. assert_hash_join() @@ -721,7 +721,7 @@ class TestExecutorGroups(CustomClusterTestSuite): # Predicate to assert that the planner decided on a hash join. def assert_hash_join(): ret = self.execute_query_expect_success(self.client, QUERY) - assert ":EXCHANGE [HASH(b.id)]" in str(ret) + assert ":EXCHANGE [HASH(b.id)]" in str(ret.data) # Without any executors we default to a hash join. 
assert_hash_join() diff --git a/tests/custom_cluster/test_frontend_connection_limit.py b/tests/custom_cluster/test_frontend_connection_limit.py index 845bd80f1..5a5e0ca60 100644 --- a/tests/custom_cluster/test_frontend_connection_limit.py +++ b/tests/custom_cluster/test_frontend_connection_limit.py @@ -44,7 +44,7 @@ class TestFrontendConnectionLimit(CustomClusterTestSuite): def _connect_and_query(self, query, impalad): with impalad.service.create_hs2_client() as client: - client.execute(query) + self.execute_query_expect_success(client, query) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( diff --git a/tests/custom_cluster/test_hs2_fault_injection.py b/tests/custom_cluster/test_hs2_fault_injection.py index 1b4ca8632..0cfc124ce 100644 --- a/tests/custom_cluster/test_hs2_fault_injection.py +++ b/tests/custom_cluster/test_hs2_fault_injection.py @@ -410,7 +410,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite): self.custom_hs2_http_client.wait_to_finish(query_handle) self.transport.enable_fault(502, "Injected Fault", 0.50) warning_log = self.custom_hs2_http_client.get_warning_log(query_handle) - assert warning_log == 'WARNINGS: JOIN hint not recognized: foo' + assert 'WARNINGS: JOIN hint not recognized: foo' in warning_log self.close_query(query_handle) output = capsys.readouterr()[1].splitlines() assert output[1][TS_LEN:] == self.__expect_msg_retry("GetLog") diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py index c94bc53b5..4cda46275 100644 --- a/tests/custom_cluster/test_query_retries.py +++ b/tests/custom_cluster/test_query_retries.py @@ -46,6 +46,7 @@ from tests.common.skip import ( SkipIfNotHdfsMinicluster, ) from tests.common.test_dimensions import add_mandatory_exec_option +from tests.common.test_vector import BEESWAX # The BE krpc port of the impalad to simulate rpc or disk errors in tests. 
FAILED_KRPC_PORT = 27001 @@ -69,6 +70,11 @@ def _get_disk_fail_action(port): @SkipIfEC.parquet_file_size class TestQueryRetries(CustomClusterTestSuite): + @classmethod + def default_test_protocol(cls): + # Retry mechanism is slightly different between beeswax vs hs2 protocol. + return BEESWAX + # A query that shuffles a lot of data. Useful when testing query retries since it # ensures that a query fails during a TransmitData RPC. The RPC failure will cause the # target impalad to be blacklisted and the query to be retried. The query also has to diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py index 76c3cca6c..b6f77e115 100644 --- a/tests/custom_cluster/test_web_pages.py +++ b/tests/custom_cluster/test_web_pages.py @@ -28,7 +28,7 @@ from tests.common.custom_cluster_test_suite import ( DEFAULT_CLUSTER_SIZE, CustomClusterTestSuite) from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION -from tests.common.skip import SkipIfFS +from tests.common.skip import SkipIfFS, SkipIfDockerizedCluster from tests.shell.util import run_impala_shell_cmd SMALL_QUERY_LOG_SIZE_IN_BYTES = 40 * 1024 @@ -192,12 +192,9 @@ class TestWebPage(CustomClusterTestSuite): shell_messages = ["Query submitted at: ", "(Coordinator: ", "Query state can be monitored at: "] query_shell_arg = '--query=select * from functional.alltypes' - # hs2 + # protocol is set inside vector results = run_impala_shell_cmd(vector, [query_shell_arg]) self._validate_shell_messages(results.stderr, shell_messages, should_exist=False) - # beeswax - results = run_impala_shell_cmd(vector, ['--protocol=beeswax', query_shell_arg]) - self._validate_shell_messages(results.stderr, shell_messages, should_exist=False) # Even though webserver url is not exposed, it is still accessible. 
page = requests.get('http://localhost:25000') assert page.status_code == requests.codes.ok @@ -542,3 +539,36 @@ class TestWebPage(CustomClusterTestSuite): self._test_catalog_tables_stats_after_describe("functional.alltypes", 24) self._test_catalog_tables_stats_after_describe( "functional_parquet.iceberg_lineitem_sixblocks", 4) + + +class TestWebPageAndCloseSession(CustomClusterTestSuite): + ROOT_URL = "http://localhost:{0}/" + + @SkipIfDockerizedCluster.daemon_logs_not_exposed + @CustomClusterTestSuite.with_args(disable_log_buffering=True) + def test_display_src_socket_in_query_cause(self): + # Execute a long running query then cancel it from the WebUI. + # Check the runtime profile and the INFO logs for the cause message. + query = "select sleep(10000)" + handle = self.execute_query_async(query) + query_id = self.client.handle_id(handle) + cancel_query_url = "{0}cancel_query?query_id={1}".format(self.ROOT_URL.format + ("25000"), query_id) + text_profile_url = "{0}query_profile_plain_text?query_id={1}".format(self.ROOT_URL + .format("25000"), query_id) + requests.get(cancel_query_url) + response = requests.get(text_profile_url) + cancel_status = "Cancelled from Impala's debug web interface by user: " \ + "'anonymous' at" + assert cancel_status in response.text + self.assert_impalad_log_contains("INFO", "Cancelled from Impala\'s debug web " + "interface by user: 'anonymous' at", expected_count=-1, timeout_s=30) + # Session closing from the WebUI does not produce the cause message in the profile, + # so we will skip checking the runtime profile. 
+ results = self.execute_query("select current_session()") + session_id = results.data[0] + close_session_url = "{0}close_session?session_id={1}".format(self.ROOT_URL.format + ("25000"), session_id) + requests.get(close_session_url) + self.assert_impalad_log_contains("INFO", "Session closed from Impala\'s debug " + "web interface by user: 'anonymous' at", expected_count=-1, timeout_s=30) diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py index 8ef2c29f9..ba0df0b74 100644 --- a/tests/failure/test_failpoints.py +++ b/tests/failure/test_failpoints.py @@ -43,20 +43,22 @@ FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT', MT_DOP_VALUES = [0, 4] # Queries should cover all exec nodes. +# {db} will be replaced by target database for specific table_format in test vector. QUERIES = [ - "select * from alltypes", - "select count(*) from alltypessmall", - "select count(int_col) from alltypessmall group by id", - "select 1 from alltypessmall a join alltypessmall b on a.id = b.id", - "select 1 from alltypessmall a join alltypessmall b on a.id != b.id", - "select 1 from alltypessmall order by id", - "select 1 from alltypessmall order by id limit 100", - "select * from alltypessmall union all select * from alltypessmall", - "select row_number() over (partition by int_col order by id) from alltypessmall", - "select c from (select id c from alltypessmall order by id limit 10) v where c = 1", + "select * from {db}.alltypes", + "select count(*) from {db}.alltypessmall", + "select count(int_col) from {db}.alltypessmall group by id", + "select 1 from {db}.alltypessmall a join {db}.alltypessmall b on a.id = b.id", + "select 1 from {db}.alltypessmall a join {db}.alltypessmall b on a.id != b.id", + "select 1 from {db}.alltypessmall order by id", + "select 1 from {db}.alltypessmall order by id limit 100", + "select * from {db}.alltypessmall union all select * from {db}.alltypessmall", + "select row_number() over (partition by int_col order by id) from 
{db}.alltypessmall", + """select c from (select id c from {db}.alltypessmall order by id limit 10) v + where c = 1""", """SELECT STRAIGHT_JOIN * - FROM alltypes t1 - JOIN /*+broadcast*/ alltypesagg t2 ON t1.id = t2.id + FROM {db}.alltypes t1 + JOIN /*+broadcast*/ {db}.alltypesagg t2 ON t1.id = t2.id WHERE t2.int_col < 1000""" ] @@ -93,11 +95,8 @@ class TestFailpoints(ImpalaTestSuite): # killer on machines with 30GB RAM. This makes the test run in 4 minutes instead of 1-2. @pytest.mark.execute_serially def test_failpoints(self, vector): - with self.change_database(self.client, vector.get_table_format()): - self.__run_failpoints(vector) - - def __run_failpoints(self, vector): - query = vector.get_value('query') + db_name = ImpalaTestSuite.get_db_name_from_format(vector.get_table_format()) + query = vector.get_value('query').format(db=db_name) action = vector.get_value('action') location = vector.get_value('location') vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop') @@ -201,7 +200,9 @@ class TestFailpoints(ImpalaTestSuite): handle = self.execute_query_async(query, vector.get_value('exec_option')) LOG.info('Sleeping') sleep(3) - cancel_result = self.client.cancel(handle) - self.client.close_query(handle) - assert cancel_result.status_code == 0,\ - 'Unexpected status code from cancel request: %s' % cancel_result + try: + self.client.cancel(handle) + except IMPALA_CONNECTION_EXCEPTION as e: + assert False, 'Unexpected exception from cancel request: {}'.format(str(e)) + finally: + self.client.close_query(handle) diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index e54f0fc43..867fe6528 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -808,7 +808,7 @@ class TestDdlStatements(TestDdlBase): # paths, converts them to integers, and checks that wehave all the ones we # expect. 
PARTITION_RE = re.compile("p=([0-9]+)") - assert list(map(int, PARTITION_RE.findall(str(partitions)))) == \ + assert list(map(int, PARTITION_RE.findall(str(partitions.data)))) == \ list(range(MAX_PARTITION_UPDATES_PER_RPC + 2)) def test_create_alter_tbl_properties(self, unique_database): diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py index dde3a06b9..cc297477a 100644 --- a/tests/metadata/test_event_processing.py +++ b/tests/metadata/test_event_processing.py @@ -641,23 +641,24 @@ class TestEventSyncWaiting(ImpalaTestSuite): self.run_stmt_in_hive("insert into {} select 0,0".format(tbl)) res = self.execute_query_expect_success(client, insert_stmt) # Result rows are "partition_name: num_rows_inserted" for each modified partitions - assert res.data == ['p=0: 1'] + assert 'Partition: p=0\nNumModifiedRows: 1\n' in res.runtime_profile # Insert one row into the same partition in Hive and use the table in INSERT in Impala self.run_stmt_in_hive("insert into {} select 1,0".format(tbl)) res = self.execute_query_expect_success(client, insert_stmt) - assert res.data == ['p=0: 2'] + assert 'Partition: p=0\nNumModifiedRows: 2\n' in res.runtime_profile # Add another new partition in Hive and use the table in INSERT in Impala self.run_stmt_in_hive("insert into {} select 2,2".format(tbl)) res = self.execute_query_expect_success(client, insert_stmt) - assert res.data == ['p=0: 2', 'p=2: 1'] + assert 'Partition: p=0\nNumModifiedRows: 2\n' in res.runtime_profile + assert 'Partition: p=2\nNumModifiedRows: 1\n' in res.runtime_profile # Drop one partition in Hive and use the table in INSERT in Impala self.run_stmt_in_hive("alter table {} drop partition(p=0)".format(tbl)) res = self.execute_query_expect_success(client, insert_stmt) - assert res.data == ['p=2: 1'] + assert 'Partition: p=2\nNumModifiedRows: 1\n' in res.runtime_profile # Truncate the table in Hive and use it in INSERT in Impala self.run_stmt_in_hive("truncate table 
{}".format(tbl)) res = self.execute_query_expect_success(client, insert_stmt) - assert len(res.data) == 0 + assert 'NumModifiedRows:' not in res.runtime_profile def test_txn(self, vector, unique_database): client = self.create_impala_client_from_vector(vector) diff --git a/tests/metadata/test_explain.py b/tests/metadata/test_explain.py index 799e022d1..4e43b7a98 100644 --- a/tests/metadata/test_explain.py +++ b/tests/metadata/test_explain.py @@ -22,8 +22,13 @@ import re from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIfLocal, SkipIfNotHdfsMinicluster, SkipIfEC +from tests.common.test_dimensions import ( + create_exec_option_dimension, + create_uncompressed_text_dimension, +) from tests.util.filesystem_utils import WAREHOUSE + # Tests the different explain levels [0-3] on a few queries. # TODO: Clean up this test to use an explain level test dimension and appropriate # result sub-sections for the expected explain plans. @@ -35,12 +40,10 @@ class TestExplain(ImpalaTestSuite): @classmethod def add_test_dimensions(cls): super(TestExplain, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').file_format == 'text' and\ - v.get_value('table_format').compression_codec == 'none' and\ - v.get_value('exec_option')['batch_size'] == 0 and\ - v.get_value('exec_option')['disable_codegen'] == False and\ - v.get_value('exec_option')['num_nodes'] != 1) + cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( + cluster_sizes=[0], batch_sizes=[0], disable_codegen_options=[False])) + cls.ImpalaTestMatrix.add_dimension( + create_uncompressed_text_dimension(cls.get_workload())) @SkipIfNotHdfsMinicluster.plans def test_explain_level0(self, vector): @@ -72,7 +75,7 @@ class TestExplain(ImpalaTestSuite): @staticmethod def check_row_size_and_cardinality(query_result, expected_row_size=None, expected_cardinality=None): - regex = re.compile('tuple-ids=.+ row-size=(\d+)B cardinality=(.*)') 
+ regex = re.compile(r'tuple-ids=.+ row-size=(\d+)B cardinality=(.*)') found_match = False for res in query_result: m = regex.match(res.strip()) @@ -85,7 +88,7 @@ class TestExplain(ImpalaTestSuite): assert m.groups()[1] == expected_cardinality assert found_match, query_result - def test_explain_validate_cardinality_estimates(self, vector, unique_database): + def test_explain_validate_cardinality_estimates(self, unique_database): # Tests that the cardinality estimates are correct for partitioned tables. # TODO Cardinality estimation tests should eventually be part of the planner tests. # TODO Remove this test @@ -98,17 +101,17 @@ class TestExplain(ImpalaTestSuite): # All partitions are filtered out, cardinality should be 0. result = self.execute_query("explain select * from %s.%s where year = 1900" % ( - db_name, tbl_name), query_options={'explain_level':3}) + db_name, tbl_name), query_options={'explain_level': 3}) check_cardinality(result.data, '0') # Half of the partitions are filtered out, cardinality should be 3650. result = self.execute_query("explain select * from %s.%s where year = 2010" % ( - db_name, tbl_name), query_options={'explain_level':3}) + db_name, tbl_name), query_options={'explain_level': 3}) check_cardinality(result.data, '3.65K') # None of the partitions are filtered out, cardinality should be 7300. result = self.execute_query("explain select * from %s.%s" % (db_name, tbl_name), - query_options={'explain_level':3}) + query_options={'explain_level': 3}) check_cardinality(result.data, '7.30K') # Create a partitioned table with a mixed set of available stats, @@ -122,11 +125,11 @@ class TestExplain(ImpalaTestSuite): "alter table %s set tblproperties('numRows'='100')" % mixed_tbl) # Should fall back to table-level cardinality when partitions lack stats. 
result = self.execute_query("explain select * from %s" % mixed_tbl, - query_options={'explain_level':3}) + query_options={'explain_level': 3}) check_cardinality(result.data, '100') # Should fall back to table-level cardinality, even for a subset of partitions, result = self.execute_query("explain select * from %s where p = 1" % mixed_tbl, - query_options={'explain_level':3}) + query_options={'explain_level': 3}) check_cardinality(result.data, '100') # Set the number of rows at the table level to -1. self.execute_query( @@ -137,17 +140,17 @@ class TestExplain(ImpalaTestSuite): # Use partition stats when availabe. Row counts for partitions without # stats are estimated. result = self.execute_query("explain select * from %s" % mixed_tbl, - query_options={'explain_level':3}) + query_options={'explain_level': 3}) check_cardinality(result.data, '51') # Set the number of rows at the table level back to 100. self.execute_query( "alter table %s set tblproperties('numRows'='100')" % mixed_tbl) # Fall back to table-level stats when no selected partitions have stats. result = self.execute_query("explain select * from %s where p = 2" % mixed_tbl, - query_options={'explain_level':3}) + query_options={'explain_level': 3}) check_cardinality(result.data, '100') - def test_explain_row_size_estimates(self, vector, unique_database): + def test_explain_row_size_estimates(self, unique_database): """ Tests that EXPLAIN returns the expected row sizes with and without stats. 
Planner tests is probably a more logical place for this, but covering string avg_size @@ -187,47 +190,40 @@ class TestExplain(ImpalaTestSuite): class TestExplainEmptyPartition(ImpalaTestSuite): - TEST_DB_NAME = "imp_1708" - - def setup_method(self, method): - self.cleanup_db(self.TEST_DB_NAME) - self.execute_query("create database if not exists {0} location '{1}/{0}.db'" - .format(self.TEST_DB_NAME, WAREHOUSE)) - - def teardown_method(self, method): - self.cleanup_db(self.TEST_DB_NAME) @SkipIfLocal.hdfs_client - def test_non_empty_partition_0_rows(self): + def test_non_empty_partition_0_rows(self, unique_database): """Regression test for IMPALA-1708: if a partition has 0 rows but > 0 files after COMPUTE STATS, don't warn the user about missing stats. The files are probably corrupted, or used for something else.""" self.client.execute("SET EXPLAIN_LEVEL=3") self.client.execute("CREATE TABLE %s.empty_partition (col int) " - "partitioned by (p int)" % self.TEST_DB_NAME) + "partitioned by (p int)" % unique_database) self.client.execute( - "ALTER TABLE %s.empty_partition ADD PARTITION (p=NULL)" % self.TEST_DB_NAME) + "ALTER TABLE %s.empty_partition ADD PARTITION (p=NULL)" % unique_database) # Put an empty file in the partition so we have > 0 files, but 0 rows self.filesystem_client.create_file( "{1}/{0}.db/empty_partition/p=__HIVE_DEFAULT_PARTITION__/empty". 
- format(self.TEST_DB_NAME, WAREHOUSE), "") - self.client.execute("REFRESH %s.empty_partition" % self.TEST_DB_NAME) - self.client.execute("COMPUTE STATS %s.empty_partition" % self.TEST_DB_NAME) + format(unique_database, WAREHOUSE), "") + self.client.execute("REFRESH %s.empty_partition" % unique_database) + self.client.execute("COMPUTE STATS %s.empty_partition" % unique_database) assert "NULL\t0\t1" in str( - self.client.execute("SHOW PARTITIONS %s.empty_partition" % self.TEST_DB_NAME)) + self.client.execute( + "SHOW PARTITIONS %s.empty_partition" % unique_database).get_data()) assert "missing relevant table and/or column statistics" not in str( self.client.execute( - "EXPLAIN SELECT * FROM %s.empty_partition" % self.TEST_DB_NAME)) + "EXPLAIN SELECT * FROM %s.empty_partition" % unique_database).get_data()) # Now add a partition with some data (so it gets selected into the scan), to check # that its lack of stats is correctly identified self.client.execute( - "ALTER TABLE %s.empty_partition ADD PARTITION (p=1)" % self.TEST_DB_NAME) + "ALTER TABLE %s.empty_partition ADD PARTITION (p=1)" % unique_database) self.filesystem_client.create_file( - "{1}/{0}.db/empty_partition/p=1/rows".format(self.TEST_DB_NAME, WAREHOUSE), "1") - self.client.execute("REFRESH %s.empty_partition" % self.TEST_DB_NAME) + "{1}/{0}.db/empty_partition/p=1/rows".format(unique_database, WAREHOUSE), "1") + self.client.execute("REFRESH %s.empty_partition" % unique_database) explain_result = str( - self.client.execute("EXPLAIN SELECT * FROM %s.empty_partition" % self.TEST_DB_NAME)) + self.client.execute( + "EXPLAIN SELECT * FROM %s.empty_partition" % unique_database).get_data()) assert "missing relevant table and/or column statistics" in explain_result # Also test IMPALA-1530 - adding the number of partitions missing stats assert "partitions: 1/2 " in explain_result diff --git a/tests/metadata/test_recursive_listing.py b/tests/metadata/test_recursive_listing.py index 698013364..8b00cc273 100644 --- 
a/tests/metadata/test_recursive_listing.py +++ b/tests/metadata/test_recursive_listing.py @@ -16,7 +16,7 @@ import pytest import requests import time -from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION +from tests.common.impala_connection import ERROR, FINISHED from tests.common.impala_test_suite import ImpalaTestSuite, LOG from tests.common.test_dimensions import create_uncompressed_text_dimension from tests.common.skip import SkipIfLocal, SkipIfFS @@ -41,8 +41,8 @@ class TestRecursiveListing(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension( create_uncompressed_text_dimension(cls.get_workload())) cls.ImpalaTestMatrix.add_constraint(lambda v: - (v.get_value('table_format').file_format == 'text' and - v.get_value('table_format').compression_codec == 'none')) + (v.get_value('table_format').file_format == 'text' + and v.get_value('table_format').compression_codec == 'none')) def _show_files(self, table): files = self.client.execute("show files in {0}".format(table)) @@ -65,7 +65,7 @@ class TestRecursiveListing(ImpalaTestSuite): # Create the table self.execute_query_expect_success(self.client, - ("create table {tbl} (a string) {partclause} " + + ("create table {tbl} (a string) {partclause} " "stored as textfile location '{loc}'").format( tbl=fq_tbl_name, partclause=(partitioned and "partitioned by (p int)" or ""), @@ -119,7 +119,7 @@ class TestRecursiveListing(ImpalaTestSuite): assert len(self._get_rows(fq_tbl_name)) == 3 # Test that disabling recursive listings makes the nested files disappear. 
- self.execute_query_expect_success(self.client, ("alter table {0} set tblproperties(" + + self.execute_query_expect_success(self.client, ("alter table {0} set tblproperties(" "'impala.disable.recursive.listing'='true')").format(fq_tbl_name)) self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name)) assert len(self._show_files(fq_tbl_name)) == 1 @@ -130,7 +130,7 @@ class TestRecursiveListing(ImpalaTestSuite): assert len(self._show_files(fq_tbl_name)) == 1 assert len(self._get_rows(fq_tbl_name)) == 1 # Re-enable. - self.execute_query_expect_success(self.client, ("alter table {0} set tblproperties(" + + self.execute_query_expect_success(self.client, ("alter table {0} set tblproperties(" "'impala.disable.recursive.listing'='false')").format(fq_tbl_name)) self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name)) assert len(self._show_files(fq_tbl_name)) == 3 @@ -210,10 +210,7 @@ class TestRecursiveListing(ImpalaTestSuite): LOG.info("removing staging dir " + large_dir) self.filesystem_client.delete_file_dir(large_dir, recursive=True) LOG.info("removed staging dir " + large_dir) - try: - self.client.fetch(refresh_stmt, handle) - assert not refresh_should_fail, "REFRESH should fail" - except IMPALA_CONNECTION_EXCEPTION as e: - assert refresh_should_fail, "unexpected exception " + str(e) + expected_state = ERROR if refresh_should_fail else FINISHED + self.client.wait_for_impala_state(handle, expected_state, 10) finally: requests.get(self.reset_log_level_url) diff --git a/tests/performance/query_exec_functions.py b/tests/performance/query_exec_functions.py index 0da147146..ed7fc1b9a 100644 --- a/tests/performance/query_exec_functions.py +++ b/tests/performance/query_exec_functions.py @@ -21,7 +21,7 @@ import logging import re from datetime import datetime from impala.dbapi import connect -from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient, ImpalaBeeswaxResult +from tests.beeswax.impala_beeswax import 
ImpalaBeeswaxClient from sys import maxsize from tests.performance.query import HiveQueryResult, ImpalaQueryResult from tests.util.shell_util import exec_process @@ -34,6 +34,7 @@ DEFAULT_HIVE_HS2_PORT = 10000 LOG = logging.getLogger('query_exec_functions') + def get_hs2_hive_cursor(hiveserver, user=None, use_kerberos=False, database=None, execOptions=None): host, port = hiveserver, DEFAULT_HIVE_HS2_PORT @@ -52,6 +53,7 @@ def get_hs2_hive_cursor(hiveserver, user=None, use_kerberos=False, LOG.error("Error Connecting: {0}".format(str(e))) return cursor + def execute_using_hive_hs2(query, query_config): exec_result = HiveQueryResult(query, query_config=query_config) plugin_runner = query_config.plugin_runner @@ -80,6 +82,7 @@ def execute_using_hive_hs2(query, query_config): if plugin_runner: plugin_runner.run_plugins_post(scope="Query") return exec_result + def get_hs2_impala_cursor(impalad, use_kerberos=False, database=None): """Get a cursor to an impalad @@ -107,6 +110,7 @@ def get_hs2_impala_cursor(impalad, use_kerberos=False, database=None): LOG.error("Error connecting: {0}".format(str(e))) return cursor + def execute_using_impala_hs2(query, query_config): """Executes a sql query against Impala using the hs2 interface. @@ -140,6 +144,7 @@ def execute_using_impala_hs2(query, query_config): if plugin_runner: plugin_runner.run_plugins_post(scope="Query") return exec_result + def establish_beeswax_connection(query_config): """Establish a connection to the user specified impalad. @@ -148,7 +153,10 @@ def establish_beeswax_connection(query_config): Returns: ImpalaBeeswaxClient is the connection suceeds, None otherwise. + + DEPRECATED: use hs2 instead of beeswax. 
""" + LOG.warning('beeswax protocol is deprecated.') use_kerberos = query_config.use_kerberos user = query_config.user password = query_config.password @@ -170,6 +178,7 @@ def establish_beeswax_connection(query_config): LOG.error("Error connecting: {0}".format(str(e))) return client + def execute_using_impala_beeswax(query, query_config): """Executes a query using beeswax. @@ -181,6 +190,8 @@ def execute_using_impala_beeswax(query, query_config): Returns: ImpalaQueryResult + + DEPRECATED: use hs2 instead of beeswax. """ # Create a client object to talk to impalad @@ -204,6 +215,7 @@ def execute_using_impala_beeswax(query, query_config): if plugin_runner: plugin_runner.run_plugins_post(context=context, scope="Query") return construct_exec_result(result, exec_result) + def build_context(query, query_config): """Build context based on query config for plugin_runner. @@ -221,6 +233,7 @@ def build_context(query, query_config): context['query'] = query return context + def construct_exec_result(result, exec_result): """ Transform an ImpalaBeeswaxResult object to a ImpalaQueryResult object. @@ -241,22 +254,24 @@ def construct_exec_result(result, exec_result): setattr(exec_result, attr, getattr(result, attr)) return exec_result + def execute_using_jdbc(query, query_config): """Executes a query using JDBC""" query_string = query.query_str + ';' if query.db: query_string = 'use %s; %s' % (query.db, query_string) cmd = query_config.jdbc_client_cmd + " -q \"%s\"" % query_string - return run_query_capture_results(cmd, query, exit_on_error=False) + return run_query_capture_results(cmd, query) -def parse_jdbc_query_results(stdout, stderr, query): + +def parse_jdbc_query_results(stdout, query): """ Parse query execution results for the Impala JDBC client Parses the query execution details (avg time, stddev) from the output of the Impala JDBC test client. 
""" - jdbc_result_regex = 'row\(s\) in (\d*).(\d*)s' + jdbc_result_regex = r'row\(s\) in (\d*).(\d*)s' time_taken = 0.0 for line in stdout.split('\n'): match = re.search(jdbc_result_regex, line) @@ -266,6 +281,7 @@ def parse_jdbc_query_results(stdout, stderr, query): result_data = re.findall(r'\[START\]----\n(.*?)\n----\[END\]', stdout, re.DOTALL)[0] return create_exec_result(time_taken, result_data, query) + def create_exec_result(time_taken, result_data, query): exec_result = HiveQueryResult(query) if result_data: @@ -275,7 +291,8 @@ def create_exec_result(time_taken, result_data, query): exec_result.success = True return exec_result -def run_query_capture_results(cmd, query, exit_on_error): + +def run_query_capture_results(cmd, query): """ Runs the given query command and returns the execution result. @@ -300,7 +317,7 @@ def run_query_capture_results(cmd, query, exit_on_error): exec_result.query_error = msg return exec_result # The command completed - exec_result = parse_jdbc_query_results(stdout, stderr, query) + exec_result = parse_jdbc_query_results(stdout, query) exec_result.query = query exec_result.start_time = start_time return exec_result diff --git a/tests/query_test/test_beeswax.py b/tests/query_test/test_beeswax.py index a0c240ec1..8fcac557c 100644 --- a/tests/query_test/test_beeswax.py +++ b/tests/query_test/test_beeswax.py @@ -18,6 +18,7 @@ from __future__ import absolute_import, division, print_function from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.test_vector import BEESWAX class TestBeeswax(ImpalaTestSuite): @@ -27,13 +28,13 @@ class TestBeeswax(ImpalaTestSuite): and different users.""" USER1 = "user1" USER2 = "user2" - client1 = self.client + client1 = self.create_impala_client(protocol=BEESWAX) different_user_client = None unset_user_client = None try: - same_user_client = self.create_impala_client(protocol='beeswax') - different_user_client = 
self.create_impala_client(protocol='beeswax') - unset_user_client = self.create_impala_client(protocol='beeswax') + same_user_client = self.create_impala_client(protocol=BEESWAX) + different_user_client = self.create_impala_client(protocol=BEESWAX) + unset_user_client = self.create_impala_client(protocol=BEESWAX) # Unauthenticated Beewax only sets user once the query is run. result = client1.execute("select effective_user()", user=USER1) diff --git a/tests/query_test/test_errorlog.py b/tests/query_test/test_errorlog.py index 7d2efe230..faa4e66c3 100644 --- a/tests/query_test/test_errorlog.py +++ b/tests/query_test/test_errorlog.py @@ -59,10 +59,13 @@ class TestErrorLogs(ImpalaTestSuite): # large enough to further guarantee at least one cleared error maps to be sent to # coordinator. sleep(30) - cancel_result = self.client.cancel(handle) - self.client.close_query(handle) - assert cancel_result.status_code == 0,\ - 'Unexpected status code from cancel request: %s' % cancel_result + try: + self.client.cancel(handle) + except IMPALA_CONNECTION_EXCEPTION as ex: + assert False, 'Unexpected exception from cancel request: {}'.format( + str(ex)) + finally: + self.client.close_query(handle) # As long as impala did not crash we are good. 
except IMPALA_CONNECTION_EXCEPTION: return diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py index aab02f329..0a639c2cf 100644 --- a/tests/query_test/test_insert.py +++ b/tests/query_test/test_insert.py @@ -307,7 +307,8 @@ class TestInsertPartKey(ImpalaTestSuite): "%2F%3A%3D%3F%5C%7B%5B%5D%23%5E" res = self.execute_query( "insert into {} partition(p='{}') values (0)".format(tbl, special_characters)) - assert res.data[0] == part_dir + ": 1" + assert part_dir in res.runtime_profile + assert 'NumModifiedRows: 1\n' in res.runtime_profile res = self.client.execute("select p from {}".format(tbl)) assert res.data[0] == part_value res = self.execute_query("show partitions " + tbl) @@ -341,7 +342,8 @@ class TestInsertPartKey(ImpalaTestSuite): "\\u001F\\\"\\u007F\'%*\\/:=?\\\\{[]#^" res = self.execute_query( "insert into {} values (0, '{}')".format(tbl, special_characters)) - assert res.data[0] == part_dir + ": 1" + assert part_dir in res.runtime_profile + assert 'NumModifiedRows: 1\n' in res.runtime_profile res = self.client.execute("select p from {}".format(tbl)) assert res.data[0] == part_value res = self.execute_query("show partitions " + tbl) diff --git a/tests/query_test/test_mem_usage_scaling.py b/tests/query_test/test_mem_usage_scaling.py index 18c0d21db..60ae1fe4a 100644 --- a/tests/query_test/test_mem_usage_scaling.py +++ b/tests/query_test/test_mem_usage_scaling.py @@ -123,10 +123,12 @@ class TestLowMemoryLimits(ImpalaTestSuite): try: self.run_test_case(tpch_query, new_vector) except IMPALA_CONNECTION_EXCEPTION as e: - if not expects_error: raise + if not expects_error: + assert False, "Not expecting error, but got {}".format(str(e)) found_expected_error = False for error_msg in MEM_LIMIT_ERROR_MSGS: - if error_msg in str(e): found_expected_error = True + if error_msg in str(e): + found_expected_error = True assert found_expected_error, str(e) diff --git a/tests/query_test/test_result_spooling.py 
b/tests/query_test/test_result_spooling.py index 52ffdc06e..924425137 100644 --- a/tests/query_test/test_result_spooling.py +++ b/tests/query_test/test_result_spooling.py @@ -19,7 +19,6 @@ from __future__ import absolute_import, division, print_function import pytest import re import time -import threading from time import sleep from tests.common.errors import Timeout @@ -177,15 +176,13 @@ class TestResultSpooling(ImpalaTestSuite): # Regex to look for in the runtime profile. get_wait_time_regex = "RowBatchGetWaitTime: [1-9]" - # Execute the query, start a thread to fetch results, wait for the query to finish, + # Execute the query asynchronously, fetch results, wait for the query to finish, # and then validate that RowBatchGetWaitTime shows a non-zero value in the profile. handle = self.execute_query_async(query, vector.get_value('exec_option')) try: - thread = threading.Thread(target=lambda: - self.create_impala_client().fetch(query, handle)) - thread.start() + self.client.wait_for_admission_control(handle, 60) + self.client.fetch(query, handle) self.client.wait_for_impala_state(handle, FINISHED, 10) - thread.join() assert re.search(get_wait_time_regex, self.client.get_runtime_profile(handle)) finally: self.client.close_query(handle) diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py index ba4169ff6..2e2a7f1cd 100644 --- a/tests/query_test/test_runtime_filters.py +++ b/tests/query_test/test_runtime_filters.py @@ -112,16 +112,14 @@ class TestRuntimeFilters(ImpalaTestSuite): get woken up and exit promptly when the query is cancelled.""" # Make sure the cluster is quiesced before we start this test self._verify_no_fragments_running() - with self.change_database(self.client, vector.get_value('table_format')): - self.__run_wait_time_cancellation(vector) - - def __run_wait_time_cancellation(self, vector): # Set up a query where a scan (plan node 0, scanning alltypes) will wait # indefinitely for a filter to arrive. 
The filter arrival is delayed # by adding a wait to the scan of alltypestime (plan node 0). + db_name = ImpalaTestSuite.get_db_name_from_format(vector.get_table_format()) QUERY = """select straight_join * - from alltypes t1 - join /*+shuffle*/ alltypestiny t2 on t1.id = t2.id""" + from {db}.alltypes t1 + join /*+shuffle*/ {db}.alltypestiny t2 + on t1.id = t2.id""".format(db=db_name) self.client.set_configuration(vector.get_exec_option_dict()) self.client.set_configuration_option("DEBUG_ACTION", "1:OPEN:WAIT") self.client.set_configuration_option("RUNTIME_FILTER_WAIT_TIME_MS", "10000000") diff --git a/tests/query_test/test_tablesample.py b/tests/query_test/test_tablesample.py index 97a9adb01..e15e59088 100644 --- a/tests/query_test/test_tablesample.py +++ b/tests/query_test/test_tablesample.py @@ -51,19 +51,21 @@ class TestTableSample(ImpalaTestSuite): def __run_tablesample(self, vector): repeatable = vector.get_value('repeatable') filtered = vector.get_value('filtered') + db_name = ImpalaTestSuite.get_db_name_from_format(vector.get_table_format()) where_clause = "" if filtered: where_clause = "where month between 1 and 6" - result = self.client.execute("select count(*) from alltypes %s" % where_clause) + result = self.client.execute("select count(*) from {}.alltypes {}".format( + db_name, where_clause)) baseline_count = int(result.data[0]) prev_count = None for perc in [5, 20, 50, 100]: rep_sql = "" if repeatable: rep_sql = " repeatable(1)" - sql_stmt = "select count(*) from alltypes tablesample system(%s)%s %s" \ - % (perc, rep_sql, where_clause) + sql_stmt = "select count(*) from {}.alltypes tablesample system({}){} {}".format( + db_name, perc, rep_sql, where_clause) handle = self.client.execute_async(sql_stmt) # IMPALA-6352: flaky test, possibly due to a hung thread. Wait for 500 sec before # failing and logging the backtraces of all impalads. 
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py index 880341c67..8ae846235 100644 --- a/tests/webserver/test_web_pages.py +++ b/tests/webserver/test_web_pages.py @@ -18,10 +18,10 @@ from __future__ import absolute_import, division, print_function from tests.common.environ import ImpalaTestClusterFlagsDetector from tests.common.file_utils import grep_dir -from tests.common.skip import SkipIfBuildType, SkipIfDockerizedCluster +from tests.common.skip import SkipIfBuildType from tests.common.impala_cluster import ImpalaCluster -from tests.common.impala_connection import FINISHED, RUNNING -from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.impala_connection import FINISHED, RUNNING, MinimalHS2Connection +from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, ImpalaTestSuite from tests.util.filesystem_utils import supports_storage_ids from tests.util.parse_util import parse_duration_string_ms from tests.common.test_vector import HS2 @@ -1082,16 +1082,20 @@ class TestWebPage(ImpalaTestSuite): def test_query_cancel_created(self): """Tests that if we cancel a query in the CREATED state, it still finishes and we can cancel it.""" + # Use MinimalHS2Connection because it has simpler concurrency than hs2_client. + with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as client: + delay_created_action = "impalad_load_tables_delay:SLEEP@1000" + client.set_configuration(dict(debug_action=delay_created_action)) + self._run_test_query_cancel_created(client) + + def _run_test_query_cancel_created(self, client): query = "select count(*) from functional_parquet.alltypes" - delay_created_action = "impalad_load_tables_delay:SLEEP@1000" response_json = self.try_until("test baseline", self.get_queries, lambda resp: resp['num_in_flight_queries'] == 0) - # Start the query completely async. The server doesn't return a response until # the query has exited the CREATED state, so we need to get the query ID another way. 
- self.client.set_configuration(dict(debug_action=delay_created_action)) - proc = Process(target=lambda cli, q: cli.execute_async(q), args=(self.client, query)) + proc = Process(target=lambda cli, q: cli.execute_async(q), args=(client, query)) proc.start() response_json = self.try_until("query creation", self.get_queries, @@ -1133,9 +1137,15 @@ class TestWebPage(ImpalaTestSuite): def test_query_cancel_exception(self): """Tests that if we cancel a query in the CREATED state and it has an exception, we can cancel it.""" + # Use MinimalHS2Connection because it has simpler concurrency than hs2_client. + with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as client: + delay_created_action = "impalad_load_tables_delay:SLEEP@1000" + client.set_configuration(dict(debug_action=delay_created_action)) + self._test_query_cancel_exception(client) + + def _test_query_cancel_exception(self, client): # Trigger UDF ERROR: Cannot divide decimal by zero query = "select *, 1.0/0 from functional_parquet.alltypes limit 10" - delay_created_action = "impalad_load_tables_delay:SLEEP@1000" response_json = self.try_until("test baseline", self.get_queries, lambda resp: resp['num_in_flight_queries'] == 0) @@ -1145,9 +1155,8 @@ class TestWebPage(ImpalaTestSuite): # Start the query completely async. The server doesn't return a response until # the query has exited the CREATED state, so we need to get the query ID another way. 
- self.client.set_configuration(dict(debug_action=delay_created_action)) queue = Queue() - proc = Process(target=run, args=(queue, self.client, query)) + proc = Process(target=run, args=(queue, client, query)) proc.start() response_json = self.try_until("query creation", self.get_queries, @@ -1168,7 +1177,7 @@ class TestWebPage(ImpalaTestSuite): proc.join() assert query_handle try: - self.client.fetch(query, query_handle) + client.fetch(query, query_handle) except Exception as e: re.match("UDF ERROR: Cannot divide decimal by zero", str(e)) @@ -1194,35 +1203,3 @@ class TestWebPage(ImpalaTestSuite): "fs.defaultFS", ports_to_test=self.TEST_PORTS_WITHOUT_SS) # check if response size is 2 , for both catalog and impalad webUI assert len(responses) == 2 - - -class TestWebPageAndCloseSession(ImpalaTestSuite): - ROOT_URL = "http://localhost:{0}/" - - @SkipIfDockerizedCluster.daemon_logs_not_exposed - def test_display_src_socket_in_query_cause(self): - # Execute a long running query then cancel it from the WebUI. - # Check the runtime profile and the INFO logs for the cause message. - query = "select sleep(10000)" - handle = self.execute_query_async(query) - query_id = self.client.handle_id(handle) - cancel_query_url = "{0}cancel_query?query_id={1}".format(self.ROOT_URL.format - ("25000"), query_id) - text_profile_url = "{0}query_profile_plain_text?query_id={1}".format(self.ROOT_URL - .format("25000"), query_id) - requests.get(cancel_query_url) - response = requests.get(text_profile_url) - cancel_status = "Cancelled from Impala's debug web interface by user: " \ - "'anonymous' at" - assert cancel_status in response.text - self.assert_impalad_log_contains("INFO", "Cancelled from Impala\'s debug web " - "interface by user: 'anonymous' at", expected_count=-1) - # Session closing from the WebUI does not produce the cause message in the profile, - # so we will skip checking the runtime profile. 
- results = self.execute_query("select current_session()") - session_id = results.data[0] - close_session_url = "{0}close_session?session_id={1}".format(self.ROOT_URL.format - ("25000"), session_id) - requests.get(close_session_url) - self.assert_impalad_log_contains("INFO", "Session closed from Impala\'s debug " - "web interface by user: 'anonymous' at", expected_count=-1)
