This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 00dc79adf6881677e6b7a694de021e29489b51e1 Author: Riza Suminto <[email protected]> AuthorDate: Fri Mar 7 12:18:04 2025 -0800 IMPALA-13907: Remove reference to create_beeswax_client This patch replaces create_beeswax_client() references with create_hs2_client() or vector-based client creation to prepare towards hs2 test migration. test_session_expiration_with_queued_query is changed to use impala.dbapi directly from Impyla due to a limitation in ImpylaHS2Connection. TestAdmissionControllerRawHS2 is migrated to use hs2 as the default test protocol. Modify test_query_expiration.py to set query options through the client instead of the SET query. test_query_expiration is slightly modified due to a behavior difference in hs2 ImpylaHS2Connection. Remove remaining references to BeeswaxConnection.QueryState. Fixed a bug in ImpylaHS2Connection.wait_for_finished_timeout(). Fix some easy flake8 issues caught through this command: git show HEAD --name-only | grep '^tests.*py' \ | xargs -I {} impala-flake8 {} \ | grep -e U100 -e E111 -e E301 -e E302 -e E303 -e F... Testing: - Pass exhaustive tests. 
Change-Id: I1d84251835d458cc87fb8fedfc20ee15aae18d51 Reviewed-on: http://gerrit.cloudera.org:8080/22700 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/common/impala_cluster.py | 2 +- tests/common/impala_connection.py | 3 +- tests/common/impala_service.py | 16 +-- tests/common/resource_pool_config.py | 2 +- tests/custom_cluster/test_admission_controller.py | 11 +- tests/custom_cluster/test_ai_generate_text.py | 4 +- tests/custom_cluster/test_blacklist.py | 6 +- tests/custom_cluster/test_catalog_hms_failures.py | 14 +-- tests/custom_cluster/test_catalog_wait.py | 11 +- tests/custom_cluster/test_catalogd_ha.py | 4 +- tests/custom_cluster/test_client_ssl.py | 5 +- .../custom_cluster/test_compact_catalog_updates.py | 5 +- tests/custom_cluster/test_concurrent_ddls.py | 7 +- tests/custom_cluster/test_coordinators.py | 21 ++-- tests/custom_cluster/test_exchange_eos.py | 2 +- .../test_frontend_connection_limit.py | 8 +- tests/custom_cluster/test_hdfs_fd_caching.py | 5 +- tests/custom_cluster/test_insert_behaviour.py | 6 +- tests/custom_cluster/test_local_catalog.py | 27 ++--- tests/custom_cluster/test_mem_reservations.py | 5 +- tests/custom_cluster/test_process_failures.py | 16 +-- tests/custom_cluster/test_query_expiration.py | 118 +++++++++++---------- tests/custom_cluster/test_query_retries.py | 13 +-- .../test_refresh_invalid_partition.py | 6 +- tests/custom_cluster/test_restart_services.py | 29 +++-- tests/custom_cluster/test_s3a_access.py | 4 +- tests/custom_cluster/test_scratch_disk.py | 34 +++--- tests/custom_cluster/test_seq_file_filtering.py | 5 +- tests/custom_cluster/test_session_expiration.py | 78 ++++++++------ tests/custom_cluster/test_statestored_ha.py | 8 +- tests/custom_cluster/test_web_pages.py | 6 +- tests/metadata/test_compute_stats.py | 22 ++-- tests/metadata/test_ddl.py | 9 +- tests/query_test/test_rows_availability.py | 7 +- tests/query_test/test_udfs.py | 13 ++- 
tests/stress/query_retries_stress_runner.py | 6 +- tests/util/cancel_util.py | 2 +- 37 files changed, 277 insertions(+), 263 deletions(-) diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index 7979c4754..142cf123e 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -193,7 +193,7 @@ class ImpalaCluster(object): n = 0 for impalad in self.impalads: try: - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() result = client.execute("select 1") assert result.success ++n diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index 5e1d06266..323d2a2b8 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -849,8 +849,7 @@ class ImpylaHS2Connection(ImpalaConnection): return True elif impala_state == ERROR: try: - error_log = self.__do_rpc( - lambda: self.imp_service.get_log(operation_handle.log_context)) + error_log = operation_handle.get_handle().get_log() raise impyla_error.OperationalError(error_log, None) finally: self.close_query(operation_handle) diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index 49c059c5b..d873742c1 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -32,6 +32,7 @@ from datetime import datetime from time import sleep, time from tests.common.impala_connection import create_connection, create_ldap_connection +from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP from thrift.transport.TSocket import TSocket from thrift.transport.TTransport import TBufferedTransport @@ -440,9 +441,10 @@ class ImpaladService(BaseImpalaService): return self.is_port_open(self.webserver_port) def create_beeswax_client(self, use_kerberos=False): - """Creates a new beeswax client connection to the impalad""" + """Creates a new beeswax client connection to the impalad. 
+ DEPRECATED: Use create_hs2_client() instead.""" client = create_connection('%s:%d' % (self.hostname, self.beeswax_port), - use_kerberos, 'beeswax') + use_kerberos, BEESWAX) client.connect() return client @@ -468,7 +470,7 @@ class ImpaladService(BaseImpalaService): def create_hs2_client(self): """Creates a new HS2 client connection to the impalad""" - client = create_connection('%s:%d' % (self.hostname, self.hs2_port), protocol='hs2') + client = create_connection('%s:%d' % (self.hostname, self.hs2_port), protocol=HS2) client.connect() return client @@ -495,11 +497,11 @@ class ImpaladService(BaseImpalaService): def create_client(self, protocol): """Creates a new client connection for given protocol to this impalad""" - port = self.beeswax_port - if protocol == 'hs2': - port = self.hs2_port - elif protocol == 'hs2-http': + port = self.hs2_port + if protocol == HS2_HTTP: port = self.hs2_http_port + if protocol == BEESWAX: + port = self.beeswax_port client = create_connection('%s:%d' % (self.hostname, port), protocol=protocol) client.connect() return client diff --git a/tests/common/resource_pool_config.py b/tests/common/resource_pool_config.py index 1e267b696..162a7b8e0 100644 --- a/tests/common/resource_pool_config.py +++ b/tests/common/resource_pool_config.py @@ -61,7 +61,7 @@ class ResourcePoolConfig(object): if impala as picked up the change to that metric and is now equal to the 'target'val'. Times out after 'timeout' seconds""" metric_str = self.CONFIG_TO_METRIC_STR_MAPPING[config_str] - client = self.impala_service.create_beeswax_client() + client = self.impala_service.create_hs2_client() client.set_configuration_option('request_pool', pool_name) # set mem_limit to something above the proc limit so that the query always gets # rejected. 
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index b5266c675..72c1d6534 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -47,7 +47,7 @@ from tests.common.impala_connection import ( from tests.common.resource_pool_config import ResourcePoolConfig from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfNotHdfsMinicluster from tests.common.test_dimensions import ( - HS2, BEESWAX, + HS2, add_mandatory_exec_option, create_exec_option_dimension, create_single_exec_option_dimension, @@ -204,13 +204,6 @@ class TestAdmissionControllerBase(CustomClusterTestSuite): class TestAdmissionControllerRawHS2(TestAdmissionControllerBase, HS2TestSuite): - @classmethod - def default_test_protocol(cls): - # HS2TestSuite override self.hs2_client with a raw Impala hs2 thrift client. - # This will set self.client = self.beeswax_client. - # Do not change this. Multiple test method has been hardcoded under this assumption. 
- return BEESWAX - def __check_pool_rejected(self, client, pool, expected_error_re): try: client.set_configuration({'request_pool': pool}) @@ -286,7 +279,7 @@ class TestAdmissionControllerRawHS2(TestAdmissionControllerBase, HS2TestSuite): to require a specific pool, and validate that the per-pool configurations were applied.""" impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() # Expected default mem limit for queueA, used in several tests below queueA_mem_limit = "MEM_LIMIT=%s" % (128 * 1024 * 1024) try: diff --git a/tests/custom_cluster/test_ai_generate_text.py b/tests/custom_cluster/test_ai_generate_text.py index 5b8d82504..51c70da13 100644 --- a/tests/custom_cluster/test_ai_generate_text.py +++ b/tests/custom_cluster/test_ai_generate_text.py @@ -49,7 +49,7 @@ class TestAIGenerateText(CustomClusterTestSuite): '--impalad_args=--ai_additional_platforms="bad.site" ' '--ai_endpoint="https://bad.site"']) impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() err = self.execute_query_expect_failure(client, self.ai_generate_text_default_query) assert re.search(re.escape(self.AI_GENERATE_COMMON_ERR_PREFIX), str(err)) assert re.search(re.escape(self.AI_CURL_NETWORK_ERR), str(err)) @@ -61,7 +61,7 @@ class TestAIGenerateText(CustomClusterTestSuite): '--ai_endpoint="https://api.openai.com/v1/chat/completions" ' '--ai_api_key_jceks_secret=""']) impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() err = self.execute_query_expect_failure(client, self.ai_generate_text_default_query) assert re.search(re.escape(self.AI_GENERATE_COMMON_ERR_PREFIX), str(err)) assert re.search(re.escape(self.AI_CURL_NETWORK_ERR), str(err)) diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py index 
9891bcd4e..6a80ab611 100644 --- a/tests/custom_cluster/test_blacklist.py +++ b/tests/custom_cluster/test_blacklist.py @@ -233,13 +233,13 @@ class TestBlacklistFaultyDisk(CustomClusterTestSuite): print("Generated dir " + dir_path) return result - def setup_method(self, method): + def setup_method(self, method): # noqa: U100 # Don't call the superclass method to prevent starting Impala before each test. In # this class, each test is responsible for doing that because we want to generate # the parameter string to start-impala-cluster in each test method. pass - def teardown_method(self, method): + def teardown_method(self, method): # noqa: U100 self.clear_tmp_dirs() @SkipIfBuildType.not_dev_build @@ -264,7 +264,7 @@ class TestBlacklistFaultyDisk(CustomClusterTestSuite): # First set debug_action for query as empty. vector.get_value('exec_option')['debug_action'] = '' coord_impalad = self.cluster.get_first_impalad() - client = coord_impalad.service.create_beeswax_client() + client = coord_impalad.service.create_client_from_vector(vector) # Expect spill to disk to success with debug_action as empty. Verify all nodes are # active. 
diff --git a/tests/custom_cluster/test_catalog_hms_failures.py b/tests/custom_cluster/test_catalog_hms_failures.py index 6feea8d08..e288fd887 100644 --- a/tests/custom_cluster/test_catalog_hms_failures.py +++ b/tests/custom_cluster/test_catalog_hms_failures.py @@ -21,10 +21,10 @@ import time import threading from subprocess import check_call -from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import ( CustomClusterTestSuite, DEFAULT_CLUSTER_SIZE) +from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION from tests.common.skip import SkipIf from tests.util.event_processor_utils import EventProcessorUtils @@ -74,7 +74,7 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite): try: self.client.execute("describe %s" % tbl_name) - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: print(str(e)) assert "Failed to load metadata for table: %s. Running 'invalidate metadata %s' "\ "may resolve this problem." % (tbl_name, tbl_name) in str(e) @@ -104,7 +104,7 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite): for _ in range(2): try: self.client.execute("describe {0}".format(table)) - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert "Failed to load metadata for table: %s. "\ "Running 'invalidate metadata %s' may resolve this problem." 
\ % (table, table) in str(e) @@ -203,7 +203,7 @@ class TestCatalogHMSFailures(CustomClusterTestSuite): again""" # Make sure that catalogd is connected to HMS impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.reload_metadata(client) # Kill HMS @@ -214,7 +214,7 @@ class TestCatalogHMSFailures(CustomClusterTestSuite): start = time.time() try: self.reload_metadata(client) - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert "Connection refused" in str(e) else: assert False, "Metadata load should have failed" @@ -237,7 +237,7 @@ class TestCatalogHMSFailures(CustomClusterTestSuite): HMS is started a little later""" # Make sure that catalogd is connected to HMS impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.reload_metadata(client) # Kill HMS @@ -279,7 +279,7 @@ class TestCatalogHMSFailures(CustomClusterTestSuite): catalogd fails""" # Make sure that catalogd is connected to HMS impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.reload_metadata(client) # Kill HMS diff --git a/tests/custom_cluster/test_catalog_wait.py b/tests/custom_cluster/test_catalog_wait.py index eb1cc542e..1625e78fe 100644 --- a/tests/custom_cluster/test_catalog_wait.py +++ b/tests/custom_cluster/test_catalog_wait.py @@ -18,10 +18,10 @@ from __future__ import absolute_import, division, print_function import pytest -from time import sleep from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.skip import SkipIfBuildType + @SkipIfBuildType.not_dev_build class TestCatalogWait(CustomClusterTestSuite): """Impalad coordinators must wait for their local replica of the catalog to be @@ -29,14 +29,9 @@ class TestCatalogWait(CustomClusterTestSuite): This test 
simulates a failed or slow catalog on impalad startup.""" def expect_connection(self, impalad): - impalad.service.create_beeswax_client() impalad.service.create_hs2_client() def expect_no_connection(self, impalad): - with pytest.raises(Exception) as e: - impalad.service.create_beeswax_client() - assert 'Could not connect to' in str(e.value) - with pytest.raises(Exception) as e: impalad.service.create_hs2_client() assert 'Could not connect to' in str(e.value) @@ -71,8 +66,8 @@ class TestCatalogWait(CustomClusterTestSuite): # and does not prematurely register itself as an executor. The former is # verified via query fragment metrics and the latter would fail if registered # but unable to process fragments. - client0 = self.cluster.impalads[0].service.create_beeswax_client() - client1 = self.cluster.impalads[1].service.create_beeswax_client() + client0 = self.cluster.impalads[0].service.create_hs2_client() + client1 = self.cluster.impalads[1].service.create_hs2_client() self.execute_query_expect_success(client0, "select * from functional.alltypes"); self.execute_query_expect_success(client1, "select * from functional.alltypes"); diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py index 689e12c47..0bcb7dc29 100644 --- a/tests/custom_cluster/test_catalogd_ha.py +++ b/tests/custom_cluster/test_catalogd_ha.py @@ -462,10 +462,10 @@ class TestCatalogdHA(CustomClusterTestSuite): assert(not catalogd_service_2.get_metric_value("catalog-server.active-status")) # Run DDL with SYNC_DDL enabled. 
- client = self.cluster.impalads[0].service.create_beeswax_client() + client = self.cluster.impalads[0].service.create_hs2_client() assert client is not None try: - self.execute_query_expect_success(client, "set SYNC_DDL=1") + client.set_configuration_option('sync_ddl', 1) ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)" handle = client.execute_async(ddl_query.format(database=unique_database)) diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py index dd2b05e93..832b547ae 100644 --- a/tests/custom_cluster/test_client_ssl.py +++ b/tests/custom_cluster/test_client_ssl.py @@ -33,6 +33,7 @@ from tests.common.environ import IS_REDHAT_DERIVATIVE from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_service import ImpaladService from tests.common.test_dimensions import create_client_protocol_dimension +from tests.common.test_vector import BEESWAX from tests.shell.util import run_impala_shell_cmd, run_impala_shell_cmd_no_expect, \ ImpalaShell, create_impala_shell_executable_dimension @@ -53,6 +54,7 @@ else: SKIP_SSL_MSG = None CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME'] + class TestClientSsl(CustomClusterTestSuite): """Tests for a client using SSL (particularly, the Impala Shell) """ @@ -93,7 +95,6 @@ class TestClientSsl(CustomClusterTestSuite): pytest.skip("Python version does not support tls 1.2") super(TestClientSsl, cls).setup_class() - @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args=SSL_ARGS, statestored_args=SSL_ARGS, catalogd_args=SSL_ARGS) @@ -156,7 +157,7 @@ class TestClientSsl(CustomClusterTestSuite): cls.ImpalaTestMatrix.add_dimension( create_impala_shell_executable_dimension(dev_only=True)) cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('protocol') != 'beeswax') + v.get_value('protocol') != BEESWAX) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args=WEBSERVER_SSL_ARGS, diff 
--git a/tests/custom_cluster/test_compact_catalog_updates.py b/tests/custom_cluster/test_compact_catalog_updates.py index de3a6c50d..52b481ca6 100644 --- a/tests/custom_cluster/test_compact_catalog_updates.py +++ b/tests/custom_cluster/test_compact_catalog_updates.py @@ -22,6 +22,7 @@ import pytest from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + class TestCompactCatalogUpdates(CustomClusterTestSuite): @classmethod def get_workload(cls): @@ -47,8 +48,8 @@ class TestCompactCatalogUpdates(CustomClusterTestSuite): impalad2 = self.cluster.impalads[1] assert impalad2.service.get_metric_value("catalog.curr-version") > 0 - client1 = impalad1.service.create_beeswax_client() - client2 = impalad2.service.create_beeswax_client() + client1 = impalad1.service.create_hs2_client() + client2 = impalad2.service.create_hs2_client() query_options = {"sync_ddl" : 1} self.execute_query_expect_success(client1, "refresh functional.alltypes", query_options) diff --git a/tests/custom_cluster/test_concurrent_ddls.py b/tests/custom_cluster/test_concurrent_ddls.py index 3630db369..167c47d4e 100644 --- a/tests/custom_cluster/test_concurrent_ddls.py +++ b/tests/custom_cluster/test_concurrent_ddls.py @@ -25,8 +25,10 @@ from multiprocessing import TimeoutError from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_connection import ERROR, FINISHED from tests.util.shell_util import dump_server_stacktraces + class TestConcurrentDdls(CustomClusterTestSuite): """Test concurrent DDLs with invalidate metadata""" @@ -216,7 +218,6 @@ class TestConcurrentDdls(CustomClusterTestSuite): for i in range(10): self.execute_query("invalidate metadata " + tbl) # Always keep a concurrent REFRESH statement running - refresh_state = self.client.get_state(refresh_handle) - if refresh_state == self.client.QUERY_STATES['FINISHED']\ - or refresh_state == 
self.client.QUERY_STATES['EXCEPTION']: + refresh_state = self.client.get_impala_exec_state(refresh_handle) + if refresh_state == FINISHED or ERROR: refresh_handle = self.client.execute_async(refresh_stmt) diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py index 1f3c4b053..e95cc0c0e 100644 --- a/tests/custom_cluster/test_coordinators.py +++ b/tests/custom_cluster/test_coordinators.py @@ -31,6 +31,7 @@ from tests.common.test_result_verifier import error_msg_startswith LOG = logging.getLogger('test_coordinators') LOG.setLevel(level=logging.DEBUG) + class TestCoordinators(CustomClusterTestSuite): @pytest.mark.execute_serially def test_multiple_coordinators(self): @@ -49,7 +50,7 @@ class TestCoordinators(CustomClusterTestSuite): # Verify that Beeswax and HS2 client connections can't be established at a worker node beeswax_client = None try: - beeswax_client = worker.service.create_beeswax_client() + beeswax_client = worker.service.create_hs2_client() except Exception as e: LOG.info("Caught exception {0}".format(e)) finally: @@ -65,8 +66,8 @@ class TestCoordinators(CustomClusterTestSuite): # Verify that queries can successfully run on coordinator nodes try: - client1 = coordinator1.service.create_beeswax_client() - client2 = coordinator2.service.create_beeswax_client() + client1 = coordinator1.service.create_hs2_client() + client2 = coordinator2.service.create_hs2_client() # select queries self.execute_query_expect_success(client1, "select 1") @@ -107,7 +108,7 @@ class TestCoordinators(CustomClusterTestSuite): coordinator = self.cluster.impalads[0] client = None try: - client = coordinator.service.create_beeswax_client() + client = coordinator.service.create_hs2_client() assert client is not None query = "select count(*) from functional.alltypesagg" result = client.execute(query, fetch_exec_summary=True) @@ -157,7 +158,7 @@ class TestCoordinators(CustomClusterTestSuite): coordinator = self.cluster.impalads[0] try: - client 
= coordinator.service.create_beeswax_client() + client = coordinator.service.create_hs2_client() # create the database self.execute_query_expect_success(client, @@ -270,7 +271,7 @@ class TestCoordinators(CustomClusterTestSuite): client = None try: - client = coordinator.service.create_beeswax_client() + client = coordinator.service.create_hs2_client() assert client is not None client.execute("SET EXPLAIN_LEVEL=2") @@ -332,12 +333,12 @@ class TestCoordinators(CustomClusterTestSuite): impalad_args="-num_expected_executors=10") def test_num_expected_executors_flag(self): """Verifies that the '-num_expected_executors' flag is effective.""" - client = self.cluster.impalads[0].service.create_beeswax_client() - client.execute("set explain_level=2") + client = self.cluster.impalads[0].service.create_hs2_client() + client.set_configuration_option("explain_level", "2") ret = client.execute("explain select * from functional.alltypes a inner join " "functional.alltypes b on a.id = b.id;") num_hosts = "hosts=10 instances=10" - assert num_hosts in str(ret) + assert num_hosts in str(ret.tuples()) @SkipIfFS.hbase @SkipIf.skip_hbase @@ -346,7 +347,7 @@ class TestCoordinators(CustomClusterTestSuite): """Verifies HBase tables can be scanned by executor only impalads.""" self._start_impala_cluster([], cluster_size=3, num_coordinators=1, use_exclusive_coordinators=True) - client = self.cluster.impalads[0].service.create_beeswax_client() + client = self.cluster.impalads[0].service.create_hs2_client() query = "select count(*) from functional_hbase.alltypes" result = self.execute_query_expect_success(client, query) assert result.data == ['7300'] diff --git a/tests/custom_cluster/test_exchange_eos.py b/tests/custom_cluster/test_exchange_eos.py index c6e2999dd..439d9b50f 100644 --- a/tests/custom_cluster/test_exchange_eos.py +++ b/tests/custom_cluster/test_exchange_eos.py @@ -49,7 +49,7 @@ class TestExchangeEos(CustomClusterTestSuite): cluster = ImpalaCluster.get_e2e_test_cluster() 
coordinator = cluster.get_first_impalad() - client = coordinator.service.create_beeswax_client() + client = coordinator.service.create_hs2_client() vector.get_value('exec_option')['spool_query_results'] = 'true' for query in ["select * from tpch.lineitem order by l_orderkey limit 10000", diff --git a/tests/custom_cluster/test_frontend_connection_limit.py b/tests/custom_cluster/test_frontend_connection_limit.py index 409b2b9a1..86e873a26 100644 --- a/tests/custom_cluster/test_frontend_connection_limit.py +++ b/tests/custom_cluster/test_frontend_connection_limit.py @@ -20,7 +20,6 @@ import pytest from threading import Thread from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.beeswax.impala_beeswax import ImpalaBeeswaxException # This custom cluster test exercises the behavior of the front end thrift # server on how a new client connection request is handled, after the maximum @@ -42,13 +41,8 @@ class TestFrontendConnectionLimit(CustomClusterTestSuite): super(TestFrontendConnectionLimit, cls).add_test_dimensions() def _connect_and_query(self, query, impalad): - client = impalad.service.create_beeswax_client() - try: + with impalad.service.create_hs2_client() as client: client.execute(query) - except Exception as e: - client.close() - raise ImpalaBeeswaxException(str(e)) - client.close() @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py b/tests/custom_cluster/test_hdfs_fd_caching.py index 9cb6936a2..8574183e4 100644 --- a/tests/custom_cluster/test_hdfs_fd_caching.py +++ b/tests/custom_cluster/test_hdfs_fd_caching.py @@ -52,8 +52,7 @@ class TestHdfsFdCaching(CustomClusterTestSuite): def setup_method(self, method): super(TestHdfsFdCaching, self).setup_method(method) - impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = self.hs2_client self.client = client client.execute("drop database if exists cachefd cascade") @@ -63,8 
+62,8 @@ class TestHdfsFdCaching(CustomClusterTestSuite): self.create_n_files(1) def teardown_method(self, method): - super(TestHdfsFdCaching, self).teardown_method(method) self.client.execute("drop database if exists cachefd cascade") + super(TestHdfsFdCaching, self).teardown_method(method) def run_fd_caching_test(self, vector, caching_expected, cache_capacity, eviction_timeout_secs): diff --git a/tests/custom_cluster/test_insert_behaviour.py b/tests/custom_cluster/test_insert_behaviour.py index f74ba80d2..77937450c 100644 --- a/tests/custom_cluster/test_insert_behaviour.py +++ b/tests/custom_cluster/test_insert_behaviour.py @@ -50,7 +50,7 @@ class TestInsertBehaviourCustomCluster(CustomClusterTestSuite): def _get_impala_client(self): impalad = self.cluster.get_any_impalad() - return impalad.service.create_beeswax_client() + return impalad.service.create_hs2_client() def _create_test_tbl(self): client = self._get_impala_client() @@ -116,7 +116,7 @@ class TestInsertBehaviourCustomCluster(CustomClusterTestSuite): def test_insert_inherit_permission_disabled(self): """Check that turning off insert permission inheritance works correctly.""" impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() try: ls = self.hdfs_client.get_file_dir_status("test-warehouse/%s/p1=1/" % TEST_TBL) default_perms = ls['FileStatus']['permission'] @@ -129,7 +129,7 @@ class TestInsertBehaviourCustomCluster(CustomClusterTestSuite): self._check_partition_perms("p1=1/p2=3/", default_perms) self._check_partition_perms("p1=1/p2=3/p3=4/", default_perms) finally: - client.close() + client.close() @SkipIfFS.hive diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py index d83a17f53..f9b8858da 100644 --- a/tests/custom_cluster/test_local_catalog.py +++ b/tests/custom_cluster/test_local_catalog.py @@ -82,8 +82,8 @@ class TestLocalCatalogCompactUpdates(CustomClusterTestSuite): 
try: impalad1 = self.cluster.impalads[0] impalad2 = self.cluster.impalads[1] - client1 = impalad1.service.create_beeswax_client() - client2 = impalad2.service.create_beeswax_client() + client1 = impalad1.service.create_hs2_client() + client2 = impalad2.service.create_hs2_client() view = "%s.my_view" % unique_database @@ -126,7 +126,7 @@ class TestLocalCatalogCompactUpdates(CustomClusterTestSuite): """ try: impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() view = "%s.my_view" % unique_database self.execute_query_expect_success(client, "create view %s as select 1" % view) @@ -223,8 +223,8 @@ class TestLocalCatalogCompactUpdates(CustomClusterTestSuite): try: impalad1 = self.cluster.impalads[0] impalad2 = self.cluster.impalads[1] - client1 = impalad1.service.create_beeswax_client() - client2 = impalad2.service.create_beeswax_client() + client1 = impalad1.service.create_hs2_client() + client2 = impalad2.service.create_hs2_client() # Create something to make the cache not empty. self.execute_query_expect_success( @@ -265,8 +265,8 @@ class TestLocalCatalogRetries(CustomClusterTestSuite): # Tracks query failures for all other reasons. 
failed_queries = queue.Queue() try: - client1 = self.cluster.impalads[0].service.create_beeswax_client() - client2 = self.cluster.impalads[1].service.create_beeswax_client() + client1 = self.cluster.impalads[0].service.create_hs2_client() + client2 = self.cluster.impalads[1].service.create_hs2_client() def stress_thread(client): # Loops, picks a random query in each iteration, runs it, @@ -277,7 +277,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite): attempt += 1 try: print('Attempt', attempt, 'client', str(client)) - ret = self.execute_query_unchecked(client, q) + self.execute_query_unchecked(client, q) except Exception as e: if 'InconsistentMetadataFetchException' in str(e): with inconsistent_seen_lock: @@ -354,8 +354,8 @@ class TestLocalCatalogRetries(CustomClusterTestSuite): try: impalad1 = self.cluster.impalads[0] impalad2 = self.cluster.impalads[1] - client1 = impalad1.service.create_beeswax_client() - client2 = impalad2.service.create_beeswax_client() + client1 = impalad1.service.create_hs2_client() + client2 = impalad2.service.create_hs2_client() # Create a view in client 1, cache the table list including that view in # client 2, and then drop it in client 1. 
While we've still cached the @@ -472,7 +472,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite): self.execute_query( "insert into {0}.tbl partition(p) values (0,0)".format(unique_database)) - def read_part(i): + def read_part(i): # noqa: U100 self.execute_query_expect_success( tls.c, "select * from {0}.tbl where p=0".format(unique_database)) @@ -486,6 +486,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite): # Refresh to invalidate the partition in local catalog cache self.execute_query("refresh {0}.tbl partition(p=0)".format(unique_database)) + class TestLocalCatalogObservability(CustomClusterTestSuite): def get_catalog_cache_metrics(self, impalad): """ Returns catalog cache metrics as a dict by scraping the json metrics page on the @@ -518,7 +519,7 @@ class TestLocalCatalogObservability(CustomClusterTestSuite): # Make sure /catalog_object endpoint is disabled on web UI. assert 'No URI handler for '/catalog_object'' \ in impalad.service.read_debug_webpage('/catalog_object') - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() cache_hit_rate_metric_key = "catalog.cache.hit-rate" cache_miss_rate_metric_key = "catalog.cache.miss-rate" cache_hit_count_metric_key = "catalog.cache.hit-count" @@ -594,6 +595,7 @@ class TestLocalCatalogObservability(CustomClusterTestSuite): % test_tbl self.assert_impalad_log_contains('INFO', log_regex) + class TestFullAcid(CustomClusterTestSuite): @classmethod def get_workload(self): @@ -617,6 +619,7 @@ class TestFullAcid(CustomClusterTestSuite): def test_full_acid_scans(self, vector, unique_database): self.run_test_case('QueryTest/full-acid-scans', vector, use_db=unique_database) + class TestReusePartitionMetadata(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( diff --git a/tests/custom_cluster/test_mem_reservations.py b/tests/custom_cluster/test_mem_reservations.py index 36996f910..14674532e 100644 --- 
a/tests/custom_cluster/test_mem_reservations.py +++ b/tests/custom_cluster/test_mem_reservations.py @@ -24,6 +24,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_test_suite import LOG from tests.verifiers.metric_verifier import MetricVerifier + class TestMemReservations(CustomClusterTestSuite): """Tests for memory reservations that require custom cluster arguments.""" @@ -34,7 +35,7 @@ class TestMemReservations(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--buffer_pool_limit=2g --memory_maintenance_sleep_time_ms=100") - def test_per_backend_min_reservation(self, vector): + def test_per_backend_min_reservation(self): """Tests that the per-backend minimum reservations are used (IMPALA-4833). The test sets the buffer_pool_limit very low (2gb), and then runs a query against two different coordinators. The query was created to have different minimum @@ -82,7 +83,7 @@ class TestMemReservations(CustomClusterTestSuite): self.error = None def run(self): - client = self.coordinator.service.create_beeswax_client() + client = self.coordinator.service.create_hs2_client() try: client.set_configuration(CONFIG_MAP) for i in range(20): diff --git a/tests/custom_cluster/test_process_failures.py b/tests/custom_cluster/test_process_failures.py index 01668d139..4092d48d4 100644 --- a/tests/custom_cluster/test_process_failures.py +++ b/tests/custom_cluster/test_process_failures.py @@ -47,7 +47,7 @@ class TestProcessFailures(CustomClusterTestSuite): def test_restart_coordinator(self): """Restarts the coordinator between queries.""" impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.execute_query_expect_success(client, QUERY) @@ -56,7 +56,7 @@ class TestProcessFailures(CustomClusterTestSuite): statestored.service.wait_for_live_subscribers(DEFAULT_NUM_SUBSCRIBERS, timeout=60) # 
Reconnect - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() impalad.service.wait_for_metric_value('catalog.ready', 1, timeout=60) self.execute_query_expect_success(client, QUERY) @@ -67,7 +67,7 @@ class TestProcessFailures(CustomClusterTestSuite): """"Tests that when a coordinator running multiple queries is killed, all running fragments on executors are cancelled.""" impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() assert client is not None # A query which is cancelable and takes long time to execute query = "select * from tpch.lineitem t1, tpch.lineitem t2, tpch.lineitem t3 " \ @@ -100,7 +100,7 @@ class TestProcessFailures(CustomClusterTestSuite): def test_restart_statestore(self): """Tests the cluster still functions when the statestore dies.""" impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() statestored = self.cluster.statestored statestored.kill() impalad.service.wait_for_metric_value( @@ -128,7 +128,7 @@ class TestProcessFailures(CustomClusterTestSuite): def test_kill_restart_worker(self): """Verifies a worker is able to be killed.""" impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.execute_query_expect_success(client, QUERY) # select a different impalad and restart it @@ -182,7 +182,7 @@ class TestProcessFailures(CustomClusterTestSuite): def test_restart_catalogd(self): # Choose a random impalad verify a query can run against it. impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.execute_query_expect_success(client, QUERY) # Kill the catalogd. 
@@ -208,7 +208,7 @@ class TestProcessFailures(CustomClusterTestSuite): def test_restart_all_impalad(self): """Restarts all the impalads and runs a query""" impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.execute_query_expect_success(client, QUERY) # Kill each impalad and wait for the statestore to register the failures. @@ -229,7 +229,7 @@ class TestProcessFailures(CustomClusterTestSuite): for impalad in self.cluster.impalads: impalad.service.wait_for_num_known_live_backends(DEFAULT_CLUSTER_SIZE, timeout=60) impalad.service.wait_for_metric_value('catalog.ready', True, timeout=60) - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.execute_query_expect_success(client, QUERY) # Make sure the catalog service is actually back up by executing an operation # against it. diff --git a/tests/custom_cluster/test_query_expiration.py b/tests/custom_cluster/test_query_expiration.py index ae0eaf02d..3f088713f 100644 --- a/tests/custom_cluster/test_query_expiration.py +++ b/tests/custom_cluster/test_query_expiration.py @@ -25,6 +25,8 @@ import threading from time import sleep, time from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_connection import ERROR, FINISHED + class TestQueryExpiration(CustomClusterTestSuite): """Tests query expiration logic""" @@ -33,26 +35,32 @@ class TestQueryExpiration(CustomClusterTestSuite): in_flight_queries = impalad.service.get_in_flight_queries() # Guard against too few in-flight queries. 
assert expected <= len(in_flight_queries) + executing_ids = list() + waiting_ids = list() actual = waiting = 0 for query in in_flight_queries: if query["executing"]: actual += 1 + executing_ids.append(query["query_id"]) else: assert query["waiting"] waiting += 1 - assert actual == expected, '%s out of %s queries executing (expected %s)' \ - % (actual, len(in_flight_queries), expected) - assert waiting == expect_waiting, '%s out of %s queries waiting (expected %s)' \ - % (waiting, len(in_flight_queries), expect_waiting) + waiting_ids.append(query["query_id"]) + assert actual == expected, ( + '{0} out of {1} queries executing (expected {2}). query_id={3}').format( + actual, len(in_flight_queries), expected, str(executing_ids)) + assert waiting == expect_waiting, ( + '{0} out of {1} queries waiting (expected {2}). query_id={3}').format( + waiting, len(in_flight_queries), expect_waiting, str(waiting_ids)) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--idle_query_timeout=8", disable_log_buffering=True) - def test_query_expiration(self, vector): + def test_query_expiration(self): """Confirm that single queries expire if not fetched""" impalad = self.cluster.get_first_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired') handles = [] @@ -62,29 +70,29 @@ class TestQueryExpiration(CustomClusterTestSuite): handles.append(default_timeout_expire_handle) # This query will hit a lower time limit. - client.execute("SET EXEC_TIME_LIMIT_S=3") + client.set_configuration_option("EXEC_TIME_LIMIT_S", "3") time_limit_expire_handle = client.execute_async(query1) handles.append(time_limit_expire_handle) # This query will hit a lower idle timeout instead of the default timeout or time # limit. 
- client.execute("SET EXEC_TIME_LIMIT_S=5") - client.execute("SET QUERY_TIMEOUT_S=3") + client.set_configuration_option("EXEC_TIME_LIMIT_S", "5") + client.set_configuration_option("QUERY_TIMEOUT_S", "3") short_timeout_expire_handle = client.execute_async("SELECT SLEEP(2000000)") handles.append(short_timeout_expire_handle) - client.execute("SET EXEC_TIME_LIMIT_S=0") + client.set_configuration_option("EXEC_TIME_LIMIT_S", "0") # Set a huge timeout, to check that the server bounds it by --idle_query_timeout - client.execute("SET QUERY_TIMEOUT_S=1000") + client.set_configuration_option("QUERY_TIMEOUT_S", "1000") default_timeout_expire_handle2 = client.execute_async("SELECT SLEEP(3000000)") handles.append(default_timeout_expire_handle2) self._check_num_executing(impalad, len(handles)) # Run a query that fails, and will timeout due to client inactivity. - client.execute("SET QUERY_TIMEOUT_S=1") - client.execute('SET MEM_LIMIT=1') + client.set_configuration_option("QUERY_TIMEOUT_S", "1") + client.set_configuration_option('MEM_LIMIT', '1') exception_handle = client.execute_async("select count(*) from functional.alltypes") - client.execute('SET MEM_LIMIT=1g') + client.set_configuration_option('MEM_LIMIT', '1g') handles.append(exception_handle) before = time() @@ -94,16 +102,13 @@ class TestQueryExpiration(CustomClusterTestSuite): # still be running. assert num_expired + 3 == impalad.service.get_metric_value( 'impala-server.num-queries-expired') - assert (client.get_state(short_timeout_expire_handle) == - client.QUERY_STATES['EXCEPTION']) - assert (client.get_state(time_limit_expire_handle) == - client.QUERY_STATES['EXCEPTION']) - assert (client.get_state(exception_handle) == client.QUERY_STATES['EXCEPTION']) - assert (client.get_state(default_timeout_expire_handle) == - client.QUERY_STATES['FINISHED']) - assert (client.get_state(default_timeout_expire_handle2) == - client.QUERY_STATES['FINISHED']) - # The query cancelled by exec_time_limit_s should be waiting to be closed. 
+ assert (client.get_impala_exec_state(short_timeout_expire_handle) == ERROR) + assert (client.get_impala_exec_state(time_limit_expire_handle) == ERROR) + assert (client.get_impala_exec_state(exception_handle) == ERROR) + assert (client.get_impala_exec_state(default_timeout_expire_handle) == FINISHED) + assert (client.get_impala_exec_state(default_timeout_expire_handle2) == FINISHED) + # The query cancelled by exec_time_limit_s (time_limit_expire_handle) should be + # waiting to be closed. self._check_num_executing(impalad, 2, 1) self.__expect_expired(client, query1, short_timeout_expire_handle, r"Query [0-9a-f]+:[0-9a-f]+ expired due to " @@ -113,6 +118,9 @@ class TestQueryExpiration(CustomClusterTestSuite): self.__expect_expired(client, query1, exception_handle, r"minimum memory reservation is greater than memory available.*\nQuery " + r"[0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 1s000ms\)") + # hs2 client does not automatically close time_limit_expire_handle. + # manually close it. + client.close_query(time_limit_expire_handle) self._check_num_executing(impalad, 2) # Both queries with query_timeout_s < 4 should generate this message. self.assert_impalad_log_contains('INFO', "Expiring query due to client inactivity: "
- self.__expect_client_state(client, default_timeout_expire_handle, - client.QUERY_STATES['EXCEPTION']) - self.__expect_client_state(client, default_timeout_expire_handle2, - client.QUERY_STATES['EXCEPTION']) + self.__expect_client_state(client, default_timeout_expire_handle, ERROR) + self.__expect_client_state(client, default_timeout_expire_handle2, ERROR) # Check that we didn't wait too long to be expired (double the timeout is sufficiently # large to avoid most noise in measurement) assert time() - before < 16 - client.execute("SET QUERY_TIMEOUT_S=0") + client.set_configuration_option("QUERY_TIMEOUT_S", "0") # Synchronous execution; calls fetch() and query should not time out. # Note: could be flakey if execute() takes too long to call fetch() etc after the # query completes. @@ -148,6 +154,9 @@ class TestQueryExpiration(CustomClusterTestSuite): == num_expired + len(handles) self._check_num_executing(impalad, 0) for handle in handles: + if handle == time_limit_expire_handle: + # This is manually closed already. 
+ continue try: client.close_query(handle) assert False, "Close should always throw an exception" @@ -165,37 +174,37 @@ class TestQueryExpiration(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--idle_query_timeout=0") - def test_query_expiration_no_default(self, vector): + def test_query_expiration_no_default(self): """Confirm that single queries expire if no default is set, but a per-query expiration or time limit is set""" impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired') query = "SELECT SLEEP(1000000)" - client.execute("SET QUERY_TIMEOUT_S=1") + client.set_configuration_option("QUERY_TIMEOUT_S", "1") timeout_handle = client.execute_async(query) - client.execute("SET QUERY_TIMEOUT_S=0") + client.set_configuration_option("QUERY_TIMEOUT_S", "0") - client.execute("SET EXEC_TIME_LIMIT_S=1") + client.set_configuration_option("EXEC_TIME_LIMIT_S", "1") time_limit_handle = client.execute_async(query) - client.execute("SET EXEC_TIME_LIMIT_S=0") + client.set_configuration_option("EXEC_TIME_LIMIT_S", "0") # Set a huge timeout, server should not expire the query while this test is running - client.execute("SET QUERY_TIMEOUT_S=1000") + client.set_configuration_option("QUERY_TIMEOUT_S", "1000") no_timeout_handle = client.execute_async(query) - before = time() sleep(4) # Query with timeout of 1 should have expired, other query should still be running. 
assert num_expired + 2 == impalad.service.get_metric_value( 'impala-server.num-queries-expired') - assert client.get_state(timeout_handle) == client.QUERY_STATES['EXCEPTION'] - assert client.get_state(time_limit_handle) == client.QUERY_STATES['EXCEPTION'] - assert client.get_state(no_timeout_handle) == client.QUERY_STATES['FINISHED'] + assert client.get_impala_exec_state(timeout_handle) == ERROR + assert client.get_impala_exec_state(time_limit_handle) == ERROR + assert client.get_impala_exec_state(no_timeout_handle) == FINISHED self.__expect_expired(client, query, timeout_handle, - "Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 1s000ms\)") + r"Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity " + r"\(timeout is 1s000ms\)") self.__expect_expired(client, query, time_limit_handle, "Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 1s000ms") @@ -211,14 +220,14 @@ class TestQueryExpiration(CustomClusterTestSuite): """Try to fetch 'expected_state' from 'client' within 'timeout' seconds. Fail if unable.""" start_time = time() - actual_state = client.get_state(handle) + actual_state = client.get_impala_exec_state(handle) while (actual_state != expected_state and time() - start_time < timeout): actual_state = client.get_state(handle) assert expected_state == actual_state @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--idle_query_timeout=1") - def test_concurrent_query_expiration(self, vector): + def test_concurrent_query_expiration(self): """Confirm that multiple concurrent queries are correctly expired if not fetched""" class ExpiringQueryThread(threading.Thread): """Thread that runs a query and does not fetch so it will time out.""" @@ -250,9 +259,9 @@ class TestQueryExpiration(CustomClusterTestSuite): def run(self): # Query will not be idle but will hit time limit. 
- self.client.execute("SET EXEC_TIME_LIMIT_S=1") + self.client.set_configuration_option("EXEC_TIME_LIMIT_S", "1") try: - result = self.client.execute("SELECT SLEEP(2500)") + self.client.execute("SELECT SLEEP(2500)") assert "Expected to hit time limit" except Exception as e: self.exception = e @@ -267,23 +276,23 @@ class TestQueryExpiration(CustomClusterTestSuite): def run(self): # Query will complete before time limit. - self.client.execute("SET EXEC_TIME_LIMIT_S=10") + self.client.set_configuration_option("EXEC_TIME_LIMIT_S", "10") result = self.client.execute("SELECT count(*) FROM functional.alltypes") self.success = result.success self.data = result.data impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired') non_expiring_threads = \ - [NonExpiringQueryThread(impalad.service.create_beeswax_client()) + [NonExpiringQueryThread(impalad.service.create_hs2_client()) for _ in range(5)] - expiring_threads = [ExpiringQueryThread(impalad.service.create_beeswax_client()) + expiring_threads = [ExpiringQueryThread(impalad.service.create_hs2_client()) for _ in range(5)] - time_limit_threads = [TimeLimitThread(impalad.service.create_beeswax_client()) + time_limit_threads = [TimeLimitThread(impalad.service.create_hs2_client()) for _ in range(5)] non_expiring_time_limit_threads = [ - NonExpiringTimeLimitThread(impalad.service.create_beeswax_client()) + NonExpiringTimeLimitThread(impalad.service.create_hs2_client()) for _ in range(5)] all_threads = non_expiring_threads + expiring_threads + time_limit_threads +\ non_expiring_time_limit_threads @@ -296,14 +305,14 @@ class TestQueryExpiration(CustomClusterTestSuite): for t in non_expiring_threads: assert t.success for t in expiring_threads: - self.__expect_client_state(client, t.handle, client.QUERY_STATES['EXCEPTION']) + self.__expect_client_state(client, t.handle, 
ERROR) for t in time_limit_threads: assert re.search( "Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 1s000ms", str(t.exception)) for t in non_expiring_time_limit_threads: assert t.success - assert t.data[0] == '7300' # Number of rows in alltypes. + assert t.data[0] == '7300' # Number of rows in alltypes. @pytest.mark.execute_serially @CustomClusterTestSuite.with_args() @@ -330,5 +339,4 @@ class TestQueryExpiration(CustomClusterTestSuite): assert time() - before < 10 - self.__expect_client_state(self.client, handle, - self.client.QUERY_STATES['EXCEPTION']) + self.__expect_client_state(self.client, handle, ERROR) diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py index 4cfc278e5..3599cc715 100644 --- a/tests/custom_cluster/test_query_retries.py +++ b/tests/custom_cluster/test_query_retries.py @@ -932,7 +932,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() query = self._count_query - client = self.cluster.get_first_impalad().service.create_beeswax_client() + client = self.cluster.get_first_impalad().service.create_hs2_client() client.set_configuration({'retry_failed_queries': 'true'}) handle = client.execute_async(query) client.wait_for_impala_state(handle, FINISHED, 60) @@ -950,10 +950,11 @@ class TestQueryRetries(CustomClusterTestSuite): try: client.fetch(query, handle) except Exception as e: - assert "Client session expired" in str(e) + assert "Invalid or unknown query handle: {}".format(query_id) in str(e) # Assert that the impalad metrics show one expired session. - assert impalad_service.get_metric_value('impala-server.num-sessions-expired') == 1 + # hs2_client opens new session on each execute_async(), so there should be 2. 
+ assert impalad_service.get_metric_value('impala-server.num-sessions-expired') == 2 @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -1218,13 +1219,13 @@ class TestQueryRetriesFaultyDisk(CustomClusterTestSuite): order by o_orderdate """ - def setup_method(self, method): + def setup_method(self, method): # noqa: U100 # Don't call the superclass method to prevent starting Impala before each test. In # this class, each test is responsible for doing that because we want to generate # the parameter string to start-impala-cluster in each test method. pass - def teardown_method(self, method): + def teardown_method(self, method): # noqa: U100 self.clear_tmp_dirs() def __generate_scratch_dir(self, num): @@ -1267,7 +1268,7 @@ class TestQueryRetriesFaultyDisk(CustomClusterTestSuite): expected_count=1) coord_impalad = self.cluster.get_first_impalad() - client = coord_impalad.service.create_beeswax_client() + client = coord_impalad.service.create_hs2_client() disk_failure_impalad = self.cluster.impalads[1] assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT diff --git a/tests/custom_cluster/test_refresh_invalid_partition.py b/tests/custom_cluster/test_refresh_invalid_partition.py index 57b26fded..131a6f181 100644 --- a/tests/custom_cluster/test_refresh_invalid_partition.py +++ b/tests/custom_cluster/test_refresh_invalid_partition.py @@ -29,7 +29,7 @@ class TestRefreshInvalidPartition(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( catalogd_args="--topic_update_log_gc_frequency=10") - def test_refresh_invalid_partition_with_sync_ddl(self, vector, unique_database): + def test_refresh_invalid_partition_with_sync_ddl(self, unique_database): """ Regression test for IMPALA-12448. Avoid getting stuck when refreshing a non-existent partition with sync_ddl. 
@@ -67,8 +67,8 @@ class TestRefreshInvalidPartition(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( statestored_args="--statestore_update_frequency_ms=5000") def test_refresh_missing_partition(self, unique_database): - client1 = self.cluster.impalads[1].service.create_beeswax_client() - client2 = self.cluster.impalads[2].service.create_beeswax_client() + client1 = self.cluster.impalads[1].service.create_hs2_client() + client2 = self.cluster.impalads[2].service.create_hs2_client() self.client.execute('create table {}.tbl (i int) partitioned by (p int)' .format(unique_database)) self.execute_query( diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py index 8f962d3c5..5ae9232e4 100644 --- a/tests/custom_cluster/test_restart_services.py +++ b/tests/custom_cluster/test_restart_services.py @@ -34,9 +34,9 @@ from time import sleep from impala.error import HiveServer2Error from TCLIService import TCLIService -from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.impala_connection import ERROR, RUNNING +from tests.common.impala_connection import ( + ERROR, FINISHED, IMPALA_CONNECTION_EXCEPTION, RUNNING) from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfFS from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session @@ -80,7 +80,7 @@ class TestRestart(CustomClusterTestSuite): self._start_impala_cluster([], num_coordinators=1, cluster_size=3) assert len(self.cluster.impalads) == 3 - client = self.cluster.impalads[0].service.create_beeswax_client() + client = self.cluster.impalads[0].service.create_hs2_client() assert client is not None for i in range(5): @@ -107,7 +107,7 @@ class TestRestart(CustomClusterTestSuite): pytest.skip() assert len(self.cluster.impalads) == 3 - client = self.cluster.impalads[0].service.create_beeswax_client() + client = 
self.cluster.impalads[0].service.create_hs2_client() assert client is not None handle = client.execute_async( @@ -436,11 +436,11 @@ class TestRestart(CustomClusterTestSuite): slow_query = \ "select distinct * from tpch_parquet.lineitem where l_orderkey > sleep(1000)" impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() try: handle = client.execute_async(slow_query) # Make sure query starts running. - self.client.wait_for_impala_state(handle, RUNNING, 1000) + client.wait_for_impala_state(handle, RUNNING, 1000) profile = client.get_runtime_profile(handle) assert "NumBackends: 3" in profile, profile # Restart Statestore and wait till the grace period ends + some buffer. @@ -457,7 +457,7 @@ class TestRestart(CustomClusterTestSuite): try: client.wait_for_finished_timeout(handle, 100) assert False, "Query expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert "Failed due to unreachable impalad" in str(e), str(e) assert time.time() - start_time > self.CANCELLATION_GRACE_PERIOD_S + \ self.SUBSCRIBER_TIMEOUT_S, \ @@ -469,7 +469,7 @@ class TestRestart(CustomClusterTestSuite): catalogd_version = self.cluster.catalogd.service.get_catalog_version() impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version) handle = client.execute_async(slow_query) - self.client.wait_for_impala_state(handle, RUNNING, 1000) + client.wait_for_impala_state(handle, RUNNING, 1000) profile = client.get_runtime_profile(handle) assert "NumBackends: 2" in profile, profile start_time = time.time() @@ -480,7 +480,7 @@ class TestRestart(CustomClusterTestSuite): try: client.wait_for_finished_timeout(handle, 100) assert False, "Query expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert "Failed due to unreachable impalad" in str(e), str(e) assert time.time() - start_time > self.CANCELLATION_GRACE_PERIOD_S + \ 
self.SUBSCRIBER_TIMEOUT_S, \ @@ -885,7 +885,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite): def expect_beeswax_shutdown_error(fn): try: fn() - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert SHUTDOWN_ERROR_PREFIX in str(e) expect_beeswax_shutdown_error(lambda: self.client.execute("select 1")) expect_beeswax_shutdown_error(lambda: self.client.execute_async("select 1")) @@ -925,8 +925,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite): # Make sure that the beeswax query is still executing, then close it to allow the # coordinator to shut down. - self.impalad_test_service.wait_for_query_state(self.client, before_shutdown_handle, - self.client.QUERY_STATES['FINISHED'], timeout=20) + self.client.wait_for_impala_state(before_shutdown_handle, FINISHED, 20) self.client.close_query(before_shutdown_handle) self.cluster.impalads[0].wait_for_exit() @@ -1009,15 +1008,13 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite): 'timeout' controls how long we will wait""" # Fix number of scanner threads to make runtime more deterministic. 
handle = self.execute_query_async(query, {'num_scanner_threads': 1}) - self.impalad_test_service.wait_for_query_state(self.client, handle, - self.client.QUERY_STATES['RUNNING'], timeout=timeout) + self.client.wait_for_impala_state(handle, RUNNING, timeout) return handle def __fetch_and_get_num_backends(self, query, handle, delay_s=0, timeout_s=20): """Fetch the results of 'query' from the beeswax handle 'handle', close the query and return the number of backends obtained from the profile.""" - self.impalad_test_service.wait_for_query_state(self.client, handle, - self.client.QUERY_STATES['FINISHED'], timeout=timeout_s) + self.client.wait_for_impala_state(handle, FINISHED, timeout_s) if delay_s > 0: LOG.info("sleeping for {0}s".format(delay_s)) time.sleep(delay_s) diff --git a/tests/custom_cluster/test_s3a_access.py b/tests/custom_cluster/test_s3a_access.py index 75491a9d8..8364ef856 100644 --- a/tests/custom_cluster/test_s3a_access.py +++ b/tests/custom_cluster/test_s3a_access.py @@ -28,10 +28,12 @@ from tests.util.filesystem_utils import WAREHOUSE tmp = tempfile.NamedTemporaryFile(delete=False) BAD_KEY_FILE = tmp.name + @SkipIf.not_s3 class TestS3AAccess(CustomClusterTestSuite): cmd_filename = "" + @classmethod def setup_class(cls): super(TestS3AAccess, cls).setup_class() @@ -49,7 +51,7 @@ class TestS3AAccess(CustomClusterTestSuite): def _get_impala_client(self): impalad = self.cluster.get_any_impalad() - return impalad.service.create_beeswax_client() + return impalad.service.create_hs2_client() @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py index 05bf1e410..964a082fe 100644 --- a/tests/custom_cluster/test_scratch_disk.py +++ b/tests/custom_cluster/test_scratch_disk.py @@ -91,13 +91,13 @@ class TestScratchDir(CustomClusterTestSuite): os.chmod(dir_path, stat.S_IREAD) return result - def setup_method(self, method): + def setup_method(self, method): # 
noqa: U100 # Don't call the superclass method to prevent starting Impala before each test. In # this file, each test is responsible for doing that because we want to generate # the parameter string to start-impala-cluster in each test method. pass - def teardown_method(self, method): + def teardown_method(self, method): # noqa: U100 self.clear_tmp_dirs() self.check_deleted_file_fd() @@ -117,7 +117,7 @@ class TestScratchDir(CustomClusterTestSuite): exec_option = vector.get_value('exec_option') exec_option['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.execute_query_expect_success(client, self.spill_query, exec_option) assert self.count_nonempty_dirs(normal_dirs) == 1 @@ -130,7 +130,7 @@ class TestScratchDir(CustomClusterTestSuite): exec_option = vector.get_value('exec_option') exec_option['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() # Expect spill to disk to fail self.execute_query_expect_failure(client, self.spill_query, exec_option) # Should be able to execute in-memory query @@ -159,7 +159,7 @@ class TestScratchDir(CustomClusterTestSuite): # disk. exec_option['spool_query_results'] = '0' impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() # Expect spill to disk to fail self.execute_query_expect_failure(client, self.spill_query, exec_option) # Should be able to execute in-memory query @@ -185,7 +185,7 @@ class TestScratchDir(CustomClusterTestSuite): # disk. 
exec_option['spool_query_results'] = '0' impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() # Expect spill to disk to fail self.execute_query_expect_failure(client, self.spill_query, exec_option) # Should be able to execute in-memory query @@ -215,7 +215,7 @@ class TestScratchDir(CustomClusterTestSuite): # Should still be able to spill to the third directory. impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.execute_query_expect_success(client, self.spill_query, exec_option) # Restore second directory mod for cleanup later. for dirpath, dirnames, filenames in os.walk(dirs[1]): @@ -236,7 +236,7 @@ class TestScratchDir(CustomClusterTestSuite): expected_count=len(normal_dirs)) vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) @@ -266,7 +266,7 @@ class TestScratchDir(CustomClusterTestSuite): expected_count=len(normal_dirs)) vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) @@ -335,7 +335,7 @@ class TestScratchDir(CustomClusterTestSuite): expected_count=len(normal_dirs) - 1) vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.impalads[0] - client = 
impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) @@ -366,7 +366,7 @@ class TestScratchDir(CustomClusterTestSuite): expected_count=len(normal_dirs) - 1) vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) @@ -400,7 +400,7 @@ class TestScratchDir(CustomClusterTestSuite): expected_count=len(normal_dirs) - 1) vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) @@ -433,7 +433,7 @@ class TestScratchDir(CustomClusterTestSuite): expected_count=len(normal_dirs) - 1) vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) @@ -472,7 +472,7 @@ class TestScratchDir(CustomClusterTestSuite): handle_name = 'handle' for i in range(num): impalad = self.cluster.impalads[i - 1] - locals()[client_name + str(i)] = 
impalad.service.create_beeswax_client() + locals()[client_name + str(i)] = impalad.service.create_hs2_client() for i in range(num): client = locals()[client_name + str(i)] @@ -517,7 +517,7 @@ class TestScratchDir(CustomClusterTestSuite): expected_count=len(normal_dirs) - 1) vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() handle = self.execute_query_async_using_client(client, self.spill_query, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) @@ -546,7 +546,7 @@ class TestScratchDir(CustomClusterTestSuite): expected_count=len(normal_dirs) - 1) vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.execute_query_async_using_client(client, self.spill_query_big_table, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) @@ -587,7 +587,7 @@ class TestScratchDir(CustomClusterTestSuite): expected_count=len(normal_dirs) - 1) vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() self.execute_query_async_using_client(client, self.spill_query_big_table, vector) verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) diff --git a/tests/custom_cluster/test_seq_file_filtering.py b/tests/custom_cluster/test_seq_file_filtering.py index fa731b7c5..3eaa43159 100644 --- a/tests/custom_cluster/test_seq_file_filtering.py +++ b/tests/custom_cluster/test_seq_file_filtering.py @@ -21,6 +21,7 @@ import pytest from 
tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.skip import SkipIfBuildType + class TestImpala3798(CustomClusterTestSuite): """Regression test for IMPALA-3798, which is a hang that occurs when an Avro file is not filtered by a runtime filter, but its header split is (this only occurs when the filter @@ -36,9 +37,9 @@ class TestImpala3798(CustomClusterTestSuite): @SkipIfBuildType.not_dev_build @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--skip_file_runtime_filtering=true") - def test_sequence_file_filtering_race(self, vector): + def test_sequence_file_filtering_race(self): impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() client.execute("SET RUNTIME_FILTER_MODE=GLOBAL") client.execute("SET RUNTIME_FILTER_WAIT_TIME_MS=10000") diff --git a/tests/custom_cluster/test_session_expiration.py b/tests/custom_cluster/test_session_expiration.py index e76f3457d..cf468891e 100644 --- a/tests/custom_cluster/test_session_expiration.py +++ b/tests/custom_cluster/test_session_expiration.py @@ -24,26 +24,31 @@ import socket import re from time import sleep +from impala.dbapi import connect from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_cluster import DEFAULT_HS2_PORT +from tests.util.thrift_util import op_handle_to_query_id class TestSessionExpiration(CustomClusterTestSuite): """Tests query expiration logic""" + PROFILE_PAGE = "http://localhost:{0}/query_profile?query_id={1}&json" @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--idle_session_timeout=6 " "--idle_client_poll_period_s=0") - def test_session_expiration(self, vector): + def test_session_expiration(self): impalad = self.cluster.get_any_impalad() self.close_impala_clients() num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired") num_connections = impalad.service.get_metric_value( 
"impala.thrift-server.beeswax-frontend.connections-in-use") - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() + client.execute('select 1') # Sleep for half the expiration time to confirm that the session is not expired early # (see IMPALA-838) sleep(3) + assert client is not None assert num_expired == impalad.service.get_metric_value( "impala-server.num-sessions-expired") # Wait for session expiration. Impala will poll the session expiry queue every second @@ -51,25 +56,25 @@ class TestSessionExpiration(CustomClusterTestSuite): "impala-server.num-sessions-expired", num_expired + 1, 20) # Verify that the idle connection is not closed. assert 1 + num_connections == impalad.service.get_metric_value( - "impala.thrift-server.beeswax-frontend.connections-in-use") + "impala.thrift-server.hiveserver2-frontend.connections-in-use") @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--idle_session_timeout=3 " "--idle_client_poll_period_s=0") - def test_session_expiration_with_set(self, vector): + def test_session_expiration_with_set(self): impalad = self.cluster.get_any_impalad() self.close_impala_clients() num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired") # Test if we can set a shorter timeout than the process-wide option - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() client.execute("SET IDLE_SESSION_TIMEOUT=1") sleep(2.5) assert num_expired + 1 == impalad.service.get_metric_value( "impala-server.num-sessions-expired") # Test if we can set a longer timeout than the process-wide option - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() client.execute("SET IDLE_SESSION_TIMEOUT=10") sleep(5) assert num_expired + 1 == impalad.service.get_metric_value( @@ -78,13 +83,13 @@ class TestSessionExpiration(CustomClusterTestSuite): @pytest.mark.execute_serially 
@CustomClusterTestSuite.with_args("--idle_session_timeout=5 " "--idle_client_poll_period_s=0") - def test_unsetting_session_expiration(self, vector): + def test_unsetting_session_expiration(self): impalad = self.cluster.get_any_impalad() self.close_impala_clients() num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired") # Test unsetting IDLE_SESSION_TIMEOUT - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() client.execute("SET IDLE_SESSION_TIMEOUT=1") # Unset to 5 sec @@ -98,34 +103,47 @@ class TestSessionExpiration(CustomClusterTestSuite): assert num_expired + 1 == impalad.service.get_metric_value( "impala-server.num-sessions-expired") + def _get_fast_timeout_cursor_from_hs2_client(self, connection, idle_session_timeout=3): + """Get a HiveServer2Cursor with a short idle session timeout from a HiveServer2Connection.""" + cursor = connection.cursor() + # Disable trivial query admission, otherwise "select 1" would be admitted as a + # trivial query. + cursor.execute('set enable_trivial_query_for_admission=false') + cursor.execute('set idle_session_timeout={}'.format(idle_session_timeout)) + return cursor + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--default_pool_max_requests=1 " "--idle_client_poll_period_s=0") - def test_session_expiration_with_queued_query(self, vector): + def test_session_expiration_with_queued_query(self): """Ensure that a query waiting in queue gets cancelled if the session expires.""" + # It is currently not possible to run two successive execute_async calls within a + # single session using ImpylaHS2Connection. Therefore, we obtain two HiveServer2Cursors + # from a HiveServer2Connection instead. impalad = self.cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() - client.execute("SET IDLE_SESSION_TIMEOUT=3") - # Set disable the trivial query otherwise "select 1" would be admitted as a - # trivial query.
- client.execute("set enable_trivial_query_for_admission=false") - client.execute_async("select sleep(10000)") - queued_handle = client.execute_async("select 1") - impalad.service.wait_for_metric_value( - "admission-controller.local-num-queued.default-pool", 1) - sleep(3) - impalad.service.wait_for_metric_value( - "admission-controller.local-num-queued.default-pool", 0) - impalad.service.wait_for_metric_value( - "admission-controller.agg-num-running.default-pool", 0) - queued_query_profile = impalad.service.create_beeswax_client().get_runtime_profile( - queued_handle) - assert "Admission result: Cancelled (queued)" in queued_query_profile + with connect(port=impalad.service.hs2_port) as conn: + timeout = 3 + debug_cursor = self._get_fast_timeout_cursor_from_hs2_client(conn, timeout) + queued_cursor = self._get_fast_timeout_cursor_from_hs2_client(conn, timeout) + debug_cursor.execute_async("select sleep(10000)") + queued_cursor.execute_async("select 1") + impalad.service.wait_for_metric_value( + "admission-controller.local-num-queued.default-pool", 1) + sleep(timeout) + impalad.service.wait_for_metric_value( + "admission-controller.local-num-queued.default-pool", 0) + impalad.service.wait_for_metric_value( + "admission-controller.agg-num-running.default-pool", 0) + queued_query_id = op_handle_to_query_id(queued_cursor._last_operation.handle) + assert queued_query_id is not None + json_summary = self.get_debug_page( + self.PROFILE_PAGE.format(impalad.service.webserver_port, queued_query_id)) + assert "Admission result: Cancelled (queued)" in json_summary['profile'] @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args="--idle_session_timeout=10 " "--idle_client_poll_period_s=1", cluster_size=1) - def test_closing_idle_connection(self, vector): + def test_closing_idle_connection(self): """ IMPALA-7802: verifies that connections of idle sessions are closed after the sessions have expired.""" impalad = self.cluster.get_any_impalad() @@ -139,10 
+157,8 @@ class TestSessionExpiration(CustomClusterTestSuite): # Connect to Impala using either beeswax or HS2 client and verify the number of # opened connections. - if protocol == 'beeswax': - client = impalad.service.create_beeswax_client() - else: - client = impalad.service.create_hs2_client() + client = impalad.service.create_client( + protocol=('hs2' if protocol == 'hiveserver2' else protocol)) client.execute("select 1") impalad.service.wait_for_metric_value(num_connections_metrics_name, num_connections + 1, 20) diff --git a/tests/custom_cluster/test_statestored_ha.py b/tests/custom_cluster/test_statestored_ha.py index 004ba6768..d4b9c499b 100644 --- a/tests/custom_cluster/test_statestored_ha.py +++ b/tests/custom_cluster/test_statestored_ha.py @@ -20,12 +20,12 @@ import logging import pytest import time -from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties from tests.common.impala_cluster import ( DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT) -from tests.common.impala_connection import ERROR, RUNNING +from tests.common.impala_connection import ( + ERROR, IMPALA_CONNECTION_EXCEPTION, RUNNING) from tests.common.skip import SkipIfBuildType, SkipIfNotHdfsMinicluster from time import sleep @@ -744,7 +744,7 @@ class TestStatestoredHA(CustomClusterTestSuite): slow_query = \ "select distinct * from tpch_parquet.lineitem where l_orderkey > sleep(1000)" impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() try: # Run a slow query handle = client.execute_async(slow_query) @@ -769,7 +769,7 @@ class TestStatestoredHA(CustomClusterTestSuite): try: client.wait_for_finished_timeout(handle, 100) assert False, "Query expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: 
assert "Failed due to unreachable impalad" in str(e), str(e) # Restart original active statestored. Verify that the statestored does not resume diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py index adc8a74c4..681f1929a 100644 --- a/tests/custom_cluster/test_web_pages.py +++ b/tests/custom_cluster/test_web_pages.py @@ -24,10 +24,10 @@ import psutil import pytest import time -from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import ( DEFAULT_CLUSTER_SIZE, CustomClusterTestSuite) +from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION from tests.common.skip import SkipIfFS from tests.shell.util import run_impala_shell_cmd @@ -212,7 +212,7 @@ class TestWebPage(CustomClusterTestSuite): statestored_args="--logtostderr=true --redirect_stdout_stderr=false", catalogd_args="--logtostderr=true --redirect_stdout_stderr=false" ) - def test_webserver_hide_logs_link(self, vector): + def test_webserver_hide_logs_link(self): """Validate that there is no /logs link when we use --logtostderr=true """ ports = ["25000", "25010", "25020"] for port in ports: @@ -375,7 +375,7 @@ class TestWebPage(CustomClusterTestSuite): self.execute_query("refresh functional.alltypes", { "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@100" }) - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert "RPC recv timed out" in str(e) # In impalad side, the query fails by the above error. However, in catalogd side, # the RPCs are still running. Check the in-flight operations. 
diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py index 93b815f2d..5c094184c 100644 --- a/tests/metadata/test_compute_stats.py +++ b/tests/metadata/test_compute_stats.py @@ -17,6 +17,7 @@ from __future__ import absolute_import, division, print_function from builtins import range +import os import pytest from hive_metastore.ttypes import ( ColumnStatistics, ColumnStatisticsDesc, ColumnStatisticsData, @@ -32,8 +33,6 @@ from tests.common.test_dimensions import ( create_uncompressed_text_dimension) from CatalogObjects.ttypes import THdfsCompression -import os - IMPALA_TEST_CLUSTER_PROPERTIES = ImpalaTestClusterProperties.get_instance() @@ -109,7 +108,7 @@ class TestComputeStats(ImpalaTestSuite): finally: self.cleanup_db("parquet", sync_ddl=0) - def test_compute_stats_compression_codec(self, vector, unique_database): + def test_compute_stats_compression_codec(self, unique_database): """IMPALA-8254: Tests that running compute stats with compression_codec set should not throw an error.""" table = "{0}.codec_tbl".format(unique_database) @@ -122,7 +121,7 @@ class TestComputeStats(ImpalaTestSuite): self.execute_query_expect_success(self.client, "drop stats {0}".format(table)) @SkipIfFS.hive - def test_compute_stats_impala_2201(self, vector, unique_database): + def test_compute_stats_impala_2201(self, unique_database): """IMPALA-2201: Tests that the results of compute incremental stats are properly persisted when the data was loaded from Hive with hive.stats.autogather=true. """ @@ -193,11 +192,11 @@ class TestComputeStats(ImpalaTestSuite): # not zero, for all scans. 
for i in range(len(explain_result.data)): if ("SCAN HDFS" in explain_result.data[i]): - assert(hdfs_physical_properties_template in explain_result.data[i + 1]) - assert("cardinality=0" not in explain_result.data[i + 2]) + assert hdfs_physical_properties_template in explain_result.data[i + 1] + assert "cardinality=0" not in explain_result.data[i + 2] @SkipIfFS.hive - def test_corrupted_stats_in_partitioned_hive_tables(self, vector, unique_database): + def test_corrupted_stats_in_partitioned_hive_tables(self, unique_database): """IMPALA-9744: Tests that the partition stats corruption in Hive tables (row count=0, partition size>0, persisted when the data was loaded with hive.stats.autogather=true) is handled at the table scan level. @@ -240,7 +239,7 @@ class TestComputeStats(ImpalaTestSuite): table_name, 2, 2) @SkipIfFS.hive - def test_corrupted_stats_in_unpartitioned_hive_tables(self, vector, unique_database): + def test_corrupted_stats_in_unpartitioned_hive_tables(self, unique_database): """IMPALA-9744: Tests that the stats corruption in unpartitioned Hive tables (row count=0, partition size>0, persisted when the data was loaded with hive.stats.autogather=true) is handled at the table scan level. @@ -280,13 +279,13 @@ class TestComputeStats(ImpalaTestSuite): table_name, 1, 1) @SkipIfCatalogV2.stats_pulling_disabled() - def test_pull_stats_profile(self, vector, unique_database): + def test_pull_stats_profile(self, unique_database): """Checks that the frontend profile includes metrics when computing incremental statistics. 
""" try: impalad = ImpalaCluster.get_e2e_test_cluster().impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() create = "create table test like functional.alltypes" load = "insert into test partition(year, month) select * from functional.alltypes" insert = """insert into test partition(year=2009, month=1) values @@ -329,6 +328,7 @@ class TestComputeStats(ImpalaTestSuite): finally: client.close() + # Tests compute stats on HBase tables. This test is separate from TestComputeStats, # because we want to use the existing machanism to disable running tests on hbase/none # based on the filesystem type (S3, Isilon, etc.). @@ -391,7 +391,7 @@ class TestIncompatibleColStats(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension( create_uncompressed_text_dimension(cls.get_workload())) - def test_incompatible_col_stats(self, vector, unique_database): + def test_incompatible_col_stats(self, unique_database): """Tests Impala is able to use tables when the column stats data is not compatible with the column type. 
Regression test for IMPALA-588.""" diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index 068e41922..551312a29 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -580,7 +580,7 @@ class TestDdlStatements(TestDdlBase): else: num_attempts = 60 for impalad in ImpalaCluster.get_e2e_test_cluster().impalads: - client = impalad.service.create_beeswax_client() + client = impalad.service.create_client_from_vector(vector) try: for attempt in itertools.count(1): assert attempt <= num_attempts, "ran out of attempts" @@ -603,21 +603,20 @@ class TestDdlStatements(TestDdlBase): impala_cluster = ImpalaCluster.get_e2e_test_cluster() impalads = impala_cluster.impalads view_name = "%s.test_describe_view" % unique_database - query_opts = vector.get_value('exec_option') - first_client = impalads[0].service.create_beeswax_client() + first_client = impalads[0].service.create_client_from_vector(vector) try: # Create a view and verify it's visible. self.execute_query_expect_success(first_client, "create view {0} as " "select * from functional.alltypes" - .format(view_name), query_opts) + .format(view_name)) self._verify_describe_view(vector, view_name, "select * from functional.alltypes") # Alter the view and verify the alter is visible. 
self.execute_query_expect_success(first_client, "alter view {0} as " "select * from functional.alltypesagg" - .format(view_name), query_opts) + .format(view_name)) self._verify_describe_view(vector, view_name, "select * from functional.alltypesagg") finally: diff --git a/tests/query_test/test_rows_availability.py b/tests/query_test/test_rows_availability.py index 357929894..e68f95618 100644 --- a/tests/query_test/test_rows_availability.py +++ b/tests/query_test/test_rows_availability.py @@ -20,8 +20,10 @@ import pytest import re from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_vector import ImpalaTestDimension +from tests.common.impala_connection import FINISHED from tests.util.parse_util import parse_duration_string_ms + class TestRowsAvailability(ImpalaTestSuite): """Tests that the 'Rows available' timeline event is marked only after rows are truly available. We mark the 'Rows available' event once we advance the query @@ -59,7 +61,7 @@ class TestRowsAvailability(ImpalaTestSuite): return vector.get_value('table_format').file_format == 'text' and\ vector.get_value('table_format').compression_codec == 'none' and\ vector.get_value('exec_option')['batch_size'] == 0 and\ - vector.get_value('exec_option')['disable_codegen'] == False and\ + vector.get_value('exec_option')['disable_codegen'] is False and\ vector.get_value('exec_option')['num_nodes'] == 0 @pytest.mark.execute_serially @@ -70,8 +72,7 @@ class TestRowsAvailability(ImpalaTestSuite): query = vector.get_value('query') # Execute async to get a handle. Wait until the query has completed. 
handle = self.execute_query_async(query, vector.get_value('exec_option')) - self.impalad_test_service.wait_for_query_state(self.client, handle, - self.client.QUERY_STATES['FINISHED'], timeout=20) + self.client.wait_for_impala_state(handle, FINISHED, 20) profile = self.client.get_runtime_profile(handle) start_time_ms = None diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py index 31bcb5873..e3e3ea548 100644 --- a/tests/query_test/test_udfs.py +++ b/tests/query_test/test_udfs.py @@ -49,7 +49,7 @@ class TestUdfBase(ImpalaTestSuite): def _run_query_all_impalads(self, exec_options, query, expected): impala_cluster = ImpalaCluster.get_e2e_test_cluster() for impalad in impala_cluster.impalads: - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() result = self.execute_query_expect_success(client, query, exec_options) assert result.data == expected, impalad @@ -508,19 +508,18 @@ class TestUdfTargeted(TestUdfBase): cluster = ImpalaCluster.get_e2e_test_cluster() impalad = cluster.get_any_impalad() - client = impalad.service.create_beeswax_client() + client = impalad.service.create_client_from_vector(vector) # Create and drop functions with sync_ddl to make sure they are reflected # in every impalad. 
- exec_option = copy(vector.get_value('exec_option')) - exec_option['sync_ddl'] = 1 + client.set_configuration_option('sync_ddl', 1) - self.execute_query_expect_success(client, drop_fn_stmt, exec_option) - self.execute_query_expect_success(client, create_fn_stmt, exec_option) + self.execute_query_expect_success(client, drop_fn_stmt) + self.execute_query_expect_success(client, create_fn_stmt) # Delete the udf jar check_call(["hadoop", "fs", "-rm", jar_path]) different_impalad = cluster.get_different_impalad(impalad) - client = different_impalad.service.create_beeswax_client() + client = different_impalad.service.create_client_from_vector(vector) # Run a query using the udf from an impalad other than the one # we used to create the function. This is to bypass loading from # the cache diff --git a/tests/stress/query_retries_stress_runner.py b/tests/stress/query_retries_stress_runner.py index 840bf2e92..cd89e0cc1 100755 --- a/tests/stress/query_retries_stress_runner.py +++ b/tests/stress/query_retries_stress_runner.py @@ -125,7 +125,7 @@ def run_concurrent_workloads(concurrency, coordinator, database, queries): client = None try: # Create and setup the client. - client = coordinator.service.create_beeswax_client() + client = coordinator.service.create_hs2_client() LOG.info("Running workload: database={0} and coordinator=localhost:{1}, pid={2}" .format(database, coordinator.get_webserver_port(), coordinator.get_pid())) client.execute("use {0}".format(database)) @@ -347,9 +347,9 @@ When specifying a non-default scale, the job will look for a database of the for sys.exit(1) # Set the correct database. 
- if table_format is 'parquet': + if table_format == 'parquet': database = workload + scale + '_parquet' - elif workload is 'text': + elif workload == 'text': database = workload + scale else: parser.print_usage() diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py index 4f98299ca..dbcba7627 100644 --- a/tests/util/cancel_util.py +++ b/tests/util/cancel_util.py @@ -120,7 +120,7 @@ def __run_cancel_query_and_validate_state(client, query, exec_option, thread.start() sleep(cancel_delay) - if client.get_state(handle) == client.QUERY_STATES['EXCEPTION']: + if client.is_error(handle): # If some error occurred before trying to cancel the query then we put an error # message together and fail the test. thread.join()
