This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 71feb617e4139e2e3430f99cdabb7b63877a2825 Author: Riza Suminto <[email protected]> AuthorDate: Wed Mar 5 12:03:30 2025 -0800 IMPALA-13835: Remove reference to protocol-specific states With IMPALA-13682 merged, checking for query state can be done via wait_for_impala_state(), wait_for_any_impala_state() and other helper methods of ImpalaConnection. This patch removes all references to protocol-specific states such as BeeswaxService.QueryState. Also fix flake8 errors and unused variable in modified test files. Testing: - Run and pass all affected tests. Change-Id: Id6b56024fbfcea1ff005c34cd146d16e67cb6fa1 Reviewed-on: http://gerrit.cloudera.org:8080/22586 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/custom_cluster/test_blacklist.py | 10 ++--- tests/custom_cluster/test_catalogd_ha.py | 5 +-- tests/custom_cluster/test_events_custom_configs.py | 13 +++--- tests/custom_cluster/test_executor_groups.py | 6 +-- tests/custom_cluster/test_kudu.py | 5 ++- tests/custom_cluster/test_process_failures.py | 4 +- tests/custom_cluster/test_query_live.py | 11 +++--- tests/custom_cluster/test_query_log.py | 9 +++-- tests/custom_cluster/test_query_retries.py | 37 ++++++++--------- tests/custom_cluster/test_restart_services.py | 12 +++--- tests/custom_cluster/test_result_spooling.py | 3 +- tests/custom_cluster/test_statestored_ha.py | 14 +++---- tests/metadata/test_ddl.py | 46 +++++++--------------- tests/metadata/test_load.py | 20 +++++----- tests/query_test/test_cancellation.py | 4 +- tests/query_test/test_fetch.py | 13 +++--- tests/query_test/test_result_spooling.py | 39 +++++++++--------- tests/webserver/test_web_pages.py | 33 +++++++++------- 18 files changed, 136 insertions(+), 148 deletions(-) diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py index e237d6e8c..9891bcd4e 100644 --- a/tests/custom_cluster/test_blacklist.py +++ 
b/tests/custom_cluster/test_blacklist.py @@ -22,7 +22,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite import pytest import re -from beeswaxd.BeeswaxService import QueryState +from tests.common.impala_connection import FINISHED, RUNNING from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfBuildType from tests.common.test_dimensions import add_mandatory_exec_option from time import sleep @@ -53,7 +53,7 @@ class TestBlacklist(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( statestored_args="-statestore_heartbeat_frequency_ms=1000") - def test_kill_impalad(self, cursor): + def test_kill_impalad(self): """Test that verifies that when an impalad is killed, it is properly blacklisted.""" # Run a query and verify that no impalads are blacklisted yet. result = self.execute_query("select count(*) from tpch.lineitem") @@ -101,7 +101,7 @@ class TestBlacklist(CustomClusterTestSuite): assert re.search("NumBackends: 2", result.runtime_profile), result.runtime_profile @pytest.mark.execute_serially - def test_restart_impalad(self, cursor): + def test_restart_impalad(self): """Test that verifies the behavior when an impalad is killed, blacklisted, and then restarted.""" # Run a query and verify that no impalads are blacklisted yet. 
@@ -146,7 +146,7 @@ class TestBlacklist(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, statestored_args="-statestore_heartbeat_frequency_ms=1000") - def test_kill_impalad_with_running_queries(self, cursor): + def test_kill_impalad_with_running_queries(self): """Verifies that when an Impala executor is killed while running a query, that the Coordinator blacklists the killed executor.""" @@ -158,7 +158,7 @@ class TestBlacklist(CustomClusterTestSuite): 'debug_action': '0:GETNEXT:DELAY|1:GETNEXT:DELAY'}) # Wait for the query to start running - self.wait_for_any_state(handle, [QueryState.RUNNING, QueryState.FINISHED], 10) + self.client.wait_for_any_impala_state(handle, [RUNNING, FINISHED], 10) # Kill one of the Impala executors killed_impalad = self.cluster.impalads[2] diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py index 8a0463f14..689e12c47 100644 --- a/tests/custom_cluster/test_catalogd_ha.py +++ b/tests/custom_cluster/test_catalogd_ha.py @@ -22,10 +22,10 @@ import re import requests import time -from beeswaxd.BeeswaxService import QueryState from builtins import round from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout +from tests.common.impala_connection import ERROR from tests.util.filesystem_utils import IS_S3, get_fs_path from time import sleep @@ -485,8 +485,7 @@ class TestCatalogdHA(CustomClusterTestSuite): LOG.info("Catalogd failover took %s seconds to complete" % round(elapsed_s, 1)) # Verify that the query is failed due to the Catalogd HA fail-over. 
- self.wait_for_state( - handle, QueryState.EXCEPTION, SYNC_DDL_DELAY_S * 2 + 10, client=client) + client.wait_for_impala_state(handle, ERROR, SYNC_DDL_DELAY_S * 2 + 10) finally: client.close() diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index c99e6384b..fe728a747 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -23,11 +23,11 @@ from os import getenv from time import sleep -from beeswaxd.BeeswaxService import QueryState from hive_metastore.ttypes import FireEventRequest from hive_metastore.ttypes import FireEventRequestData from hive_metastore.ttypes import InsertEventRequestData from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_connection import ERROR, FINISHED from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIf, SkipIfFS from tests.common.test_dimensions import add_exec_option_dimension @@ -655,8 +655,7 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): add_part_stmt = "alter table {} add if not exists partition(p=0)".format(tbl) drop_part_stmt = "alter table {} drop if exists partition(p=0)".format(tbl) refresh_stmt = "refresh {} partition(p=0)".format(tbl) - end_states = [self.client.QUERY_STATES['FINISHED'], - self.client.QUERY_STATES['EXCEPTION']] + end_states = [FINISHED, ERROR] self.execute_query(create_stmt) self.execute_query(add_part_stmt) @@ -665,11 +664,11 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): # Before IMPALA-12855, REFRESH usually fails in 2-3 rounds. 
for i in range(100): self.execute_query(drop_part_stmt) - refresh_state = self.wait_for_any_state(refresh_handle, end_states, 10) - assert refresh_state == self.client.QUERY_STATES['FINISHED'],\ + refresh_state = self.client.wait_for_any_impala_state( + refresh_handle, end_states, 10) + assert refresh_state == FINISHED, \ "REFRESH state: {}. Error log: {}".format( - QueryState._VALUES_TO_NAMES[refresh_state], - self.client.get_log(refresh_handle)) + refresh_state, self.client.get_log(refresh_handle)) self.execute_query(add_part_stmt) refresh_handle = self.client.execute_async(refresh_stmt) diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index 3b4fe2768..965aab1ae 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -20,10 +20,10 @@ from __future__ import absolute_import, division, print_function from builtins import range from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_connection import FINISHED, RUNNING from tests.common.parametrize import UniqueDatabase from tests.common.test_result_verifier import error_msg_startswith from tests.util.concurrent_workload import ConcurrentWorkload - import json import logging import os @@ -1606,7 +1606,7 @@ class TestExecutorGroups(CustomClusterTestSuite): Assert existence of expected_group_str in query profile.""" client.set_configuration(config_options) query_handle = client.execute_async(query) - self.wait_for_state(query_handle, client.QUERY_STATES['RUNNING'], 30, client=client) + client.wait_for_impala_state(query_handle, RUNNING, 30) assert expected_group_str in client.get_runtime_profile(query_handle) @pytest.mark.execute_serially @@ -1699,7 +1699,7 @@ class TestExecutorGroups(CustomClusterTestSuite): self._add_executors("group", healthy_threshold, num_executors=1, resource_pool="root.large", extra_args="-mem_limit=2g", expected_num_impalads=6) - 
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) + self.client.wait_for_impala_state(handle, FINISHED, 60) profile = self.client.get_runtime_profile(handle) assert "F00:PLAN FRAGMENT [RANDOM] hosts=4 instances=4" in profile, profile assert ("Scheduler Warning: Cluster membership might changed between planning and " diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py index 6825c3303..7a7920cca 100644 --- a/tests/custom_cluster/test_kudu.py +++ b/tests/custom_cluster/test_kudu.py @@ -24,6 +24,7 @@ from kudu.schema import INT32 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_connection import FINISHED from tests.common.kudu_test_suite import KuduTestSuite from tests.common.skip import SkipIfKudu, SkipIfBuildType, SkipIf from tests.common.test_dimensions import BEESWAX, add_mandatory_exec_option @@ -853,8 +854,8 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite): slow_handle = self.execute_query_async(slow_query, slow_sleep) try: # Wait for both queries to finish. 
- self.wait_for_state(fast_handle, self.client.QUERY_STATES['FINISHED'], timeout) - self.wait_for_state(slow_handle, self.client.QUERY_STATES['FINISHED'], timeout) + self.client.wait_for_impala_state(fast_handle, FINISHED, timeout) + self.client.wait_for_impala_state(slow_handle, FINISHED, timeout) self._check_errors(self.client.get_runtime_profile(slow_handle), expect_error_on_slow_query, error_message, num_row_erros) finally: diff --git a/tests/custom_cluster/test_process_failures.py b/tests/custom_cluster/test_process_failures.py index 39aa34ad3..4221425c3 100644 --- a/tests/custom_cluster/test_process_failures.py +++ b/tests/custom_cluster/test_process_failures.py @@ -19,10 +19,10 @@ from __future__ import absolute_import, division, print_function from builtins import range import pytest -from beeswaxd.BeeswaxService import QueryState from tests.common.custom_cluster_test_suite import ( DEFAULT_CLUSTER_SIZE, CustomClusterTestSuite) +from tests.common.impala_connection import RUNNING from tests.common.test_result_verifier import error_msg_startswith # The exact query doesn't matter much for these tests, just want a query that touches @@ -83,7 +83,7 @@ class TestProcessFailures(CustomClusterTestSuite): # Wait for the queries to start running for handle in handles: - self.wait_for_state(handle, QueryState.RUNNING, 1000, client=client) + client.wait_for_impala_state(handle, RUNNING, 1000) # Kill the coordinator impalad.kill() diff --git a/tests/custom_cluster/test_query_live.py b/tests/custom_cluster/test_query_live.py index e12254a27..9492d187b 100644 --- a/tests/custom_cluster/test_query_live.py +++ b/tests/custom_cluster/test_query_live.py @@ -19,12 +19,11 @@ from __future__ import absolute_import, division, print_function import re -from beeswaxd.BeeswaxService import QueryState from getpass import getuser from signal import SIGRTMIN from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_cluster import 
DEFAULT_KRPC_PORT -from tests.util.retry import retry +from tests.common.impala_connection import FINISHED, PENDING from tests.util.workload_management import assert_query, redaction_rules_file from time import sleep @@ -378,7 +377,7 @@ class TestQueryLive(CustomClusterTestSuite): 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@3000'}) # Wait for query to compile and assign ranges, then kill impalad during debug delay. - self.wait_for_any_state(handle, [QueryState.COMPILED], 3) + self.client.wait_for_impala_state(handle, PENDING, 3) self.cluster.impalads[1].kill() result = self.client.fetch(query, handle) @@ -401,7 +400,7 @@ class TestQueryLive(CustomClusterTestSuite): 'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'}) # Wait for query to compile. - self.wait_for_any_state(handle, [QueryState.COMPILED], 3) + self.client.wait_for_impala_state(handle, PENDING, 3) # Ensure enough time for scheduling to assign ranges. sleep(1) # Kill impalad during debug delay. @@ -426,10 +425,10 @@ class TestQueryLive(CustomClusterTestSuite): 'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'}) # Wait for query to compile and assign ranges, then gracefully shutdown impalad. - self.wait_for_any_state(handle, [QueryState.COMPILED], 3) + self.client.wait_for_impala_state(handle, PENDING, 3) self.cluster.impalads[1].kill(SIGRTMIN) - self.wait_for_any_state(handle, [QueryState.FINISHED], 10) + self.client.wait_for_impala_state(handle, FINISHED, 10) # Allow time for statestore update to propagate. Shutdown grace period is 120s. sleep(1) # Coordinator in graceful shutdown should not be scheduled in new queries. 
diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py index 1d780e7ec..f7fcf67de 100644 --- a/tests/custom_cluster/test_query_log.py +++ b/tests/custom_cluster/test_query_log.py @@ -31,6 +31,7 @@ from thrift.transport.TTransport import TBufferedTransport from thrift.protocol import TBinaryProtocol from tests.common.cluster_config import impalad_admission_ctrl_config_args from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_connection import FINISHED from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT from tests.common.test_vector import ImpalaTestDimension from tests.util.retry import retry @@ -518,7 +519,7 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): def close_op(client, resp): close_operation_req = TCLIService.TCloseOperationReq() close_operation_req.operationHandle = resp.operationHandle - assert_resp(hs2_client.CloseOperation(close_operation_req)) + assert_resp(client.CloseOperation(close_operation_req)) try: # Open a new HS2 session. @@ -897,7 +898,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): unix_now = time() try: client.execute("{0}".format(unix_now)) - except Exception as _: + except Exception: pass # Get the query id from the completed queries table since the call to execute errors @@ -1178,7 +1179,7 @@ class TestQueryLogQueuedQueries(CustomClusterTestSuite): queries. """ resp = self.hs2_client.execute_async("SELECT * FROM functional.alltypes LIMIT 5") - self.wait_for_state(resp, 'FINISHED_STATE', 60, client=self.hs2_client) + self.hs2_client.wait_for_impala_state(resp, FINISHED, 60) self.hs2_client.close_query(resp) # Start a query that will consume the only available slot since its rows are not @@ -1212,7 +1213,7 @@ class TestQueryLogQueuedQueries(CustomClusterTestSuite): "list of in-flight queries".format(self.insert_query_id) # Retrieve all rows of the original blocking query to cause it to complete. 
- self.wait_for_state(long_query_resp, 'FINISHED_STATE', 60, client=self.hs2_client) + self.hs2_client.wait_for_impala_state(long_query_resp, FINISHED, 60) self.hs2_client.close_query(long_query_resp) # Helper function that checks if a query matches the workload management insert DML diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py index 300532cfa..eb47bf86a 100644 --- a/tests/custom_cluster/test_query_retries.py +++ b/tests/custom_cluster/test_query_retries.py @@ -31,6 +31,7 @@ from random import randint from RuntimeProfile.ttypes import TRuntimeProfileFormat from tests.beeswax.impala_beeswax import ImpalaBeeswaxException +from tests.common.impala_connection import ERROR, FINISHED, RUNNING from tests.common.impala_test_suite import ImpalaTestSuite, LOG from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.errors import Timeout @@ -88,7 +89,7 @@ class TestQueryRetries(CustomClusterTestSuite): return 'functional-query' @pytest.mark.execute_serially - def test_retries_from_cancellation_pool(self, cursor): + def test_retries_from_cancellation_pool(self): """Tests that queries are retried instead of cancelled if one of the nodes leaves the cluster. The retries are triggered by the cancellation pool in the ImpalaServer. The cancellation pool listens for updates from the statestore and kills all queries that @@ -102,7 +103,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Launch the query, wait for it to start running, and then kill an impalad. handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) - self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 60) + self.client.wait_for_impala_state(handle, RUNNING, 60) # Kill a random impalad (but not the one executing the actual query). self.__kill_random_impalad() @@ -145,7 +146,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Launch a query, it should be retried. 
handle = self.execute_query_async(self._shuffle_heavy_query, query_options={'retry_failed_queries': 'true'}) - self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 60) + self.client.wait_for_impala_state(handle, RUNNING, 60) # Kill a random impalad. killed_impalad = self.__kill_random_impalad() @@ -211,8 +212,8 @@ class TestQueryRetries(CustomClusterTestSuite): handles.append(handle) # Wait for each query to start running. - running_state = self.client.QUERY_STATES['RUNNING'] - list(map(lambda handle: self.wait_for_state(handle, running_state, 60), handles)) + list(map(lambda handle: self.client.wait_for_impala_state(handle, RUNNING, 60), + handles)) # Kill a random impalad. killed_impalad = self.__kill_random_impalad() @@ -259,7 +260,7 @@ class TestQueryRetries(CustomClusterTestSuite): query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) + self.client.wait_for_impala_state(handle, FINISHED, 60) # Validate that the query was retried. self.__validate_runtime_profiles_from_service(impalad_service, handle) @@ -320,7 +321,7 @@ class TestQueryRetries(CustomClusterTestSuite): handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true', 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'}) - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 80) + self.client.wait_for_impala_state(handle, FINISHED, 80) # Validate that the query was retried. self.__validate_runtime_profiles_from_service(impalad_service, handle) @@ -384,7 +385,7 @@ class TestQueryRetries(CustomClusterTestSuite): query_options={'retry_failed_queries': 'true', 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'}) # Wait until the query fails. - self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 140) + self.client.wait_for_impala_state(handle, ERROR, 140) # Validate the live exec summary. 
retried_query_id = self.__get_retried_query_id_from_summary(handle) @@ -439,7 +440,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Launch a query, it should be retried. handle = self.execute_query_async(self._shuffle_heavy_query, query_options={'retry_failed_queries': 'true'}) - self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 60) + self.client.wait_for_impala_state(handle, RUNNING, 60) # Kill one impalad so that a retry is triggered. killed_impalad = self.cluster.impalads[1] @@ -452,7 +453,7 @@ class TestQueryRetries(CustomClusterTestSuite): self.cluster.impalads[2].kill() # Wait until the query fails. - self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60) + self.client.wait_for_impala_state(handle, ERROR, 60) # Validate the live exec summary. retried_query_id = self.__get_retried_query_id_from_summary(handle) @@ -497,7 +498,7 @@ class TestQueryRetries(CustomClusterTestSuite): query_options = {'retry_failed_queries': 'true', 'batch_size': '1', 'spool_query_results': 'false'} handle = self.execute_query_async(query, query_options) - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) + self.client.wait_for_impala_state(handle, FINISHED, 60) self.client.fetch(query, handle, max_rows=1) @@ -648,11 +649,11 @@ class TestQueryRetries(CustomClusterTestSuite): def test_original_query_cancel(self): """Test canceling a retryable query with spool_all_results_for_retries=true. Make sure Coordinator::Wait() won't block in cancellation.""" - for state in ['RUNNING', 'FINISHED']: + for state in [RUNNING, FINISHED]: handle = self.execute_query_async(self._union_query, query_options={ 'retry_failed_queries': 'true', 'spool_query_results': 'true', 'spool_all_results_for_retries': 'true'}) - self.wait_for_state(handle, self.client.QUERY_STATES[state], 60) + self.client.wait_for_impala_state(handle, state, 60) # Cancel the query. 
self.client.cancel(handle) @@ -677,7 +678,7 @@ class TestQueryRetries(CustomClusterTestSuite): query_options = {'retry_failed_queries': 'true', 'batch_size': '1', 'spool_query_results': 'false'} handle = self.execute_query_async(query, query_options) - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) + self.client.wait_for_impala_state(handle, FINISHED, 60) self.__kill_random_impalad() time.sleep(5) @@ -706,7 +707,7 @@ class TestQueryRetries(CustomClusterTestSuite): query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) + self.client.wait_for_impala_state(handle, FINISHED, 60) # Validate the live exec summary. retried_query_id = self.__get_retried_query_id_from_summary(handle) @@ -887,7 +888,7 @@ class TestQueryRetries(CustomClusterTestSuite): query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true', 'exec_time_limit_s': '1'}) - self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60) + self.client.wait_for_impala_state(handle, ERROR, 60) # Validate the live exec summary. retried_query_id = self.__get_retried_query_id_from_summary(handle) @@ -927,7 +928,7 @@ class TestQueryRetries(CustomClusterTestSuite): client = self.cluster.get_first_impalad().service.create_beeswax_client() client.set_configuration({'retry_failed_queries': 'true'}) handle = client.execute_async(query) - self.wait_for_state(handle, client.QUERY_STATES['FINISHED'], 60, client=client) + client.wait_for_impala_state(handle, FINISHED, 60) # Wait for the idle session timeout to expire the session. 
time.sleep(5) @@ -957,7 +958,7 @@ class TestQueryRetries(CustomClusterTestSuite): self.hs2_client.set_configuration({'retry_failed_queries': 'true'}) self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024') handle = self.hs2_client.execute_async(query) - self.wait_for_state(handle, 'FINISHED_STATE', 60, client=self.hs2_client) + self.hs2_client.wait_for_impala_state(handle, FINISHED, 60) results = self.hs2_client.fetch(query, handle) assert results.success diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py index 41f3c8643..8f962d3c5 100644 --- a/tests/custom_cluster/test_restart_services.py +++ b/tests/custom_cluster/test_restart_services.py @@ -34,9 +34,9 @@ from time import sleep from impala.error import HiveServer2Error from TCLIService import TCLIService -from beeswaxd.BeeswaxService import QueryState from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_connection import ERROR, RUNNING from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfFS from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session @@ -115,7 +115,7 @@ class TestRestart(CustomClusterTestSuite): node_to_restart = self.cluster.impalads[2] node_to_restart.restart() # Verify that the query is cancelled due to the failed impalad quickly. - self.wait_for_state(handle, QueryState.EXCEPTION, 20, client=client) + client.wait_for_impala_state(handle, ERROR, 20) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -440,14 +440,14 @@ class TestRestart(CustomClusterTestSuite): try: handle = client.execute_async(slow_query) # Make sure query starts running. 
- self.wait_for_state(handle, QueryState.RUNNING, 1000) + self.client.wait_for_impala_state(handle, RUNNING, 1000) profile = client.get_runtime_profile(handle) assert "NumBackends: 3" in profile, profile # Restart Statestore and wait till the grace period ends + some buffer. self.cluster.statestored.restart() self.cluster.statestored.service.wait_for_live_subscribers(4) sleep(self.CANCELLATION_GRACE_PERIOD_S + 1) - assert client.get_state(handle) == QueryState.RUNNING + assert client.is_running(handle) # Now restart statestore and kill a backend while it is down, and make sure the # query fails when it comes back up. start_time = time.time() @@ -469,7 +469,7 @@ class TestRestart(CustomClusterTestSuite): catalogd_version = self.cluster.catalogd.service.get_catalog_version() impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version) handle = client.execute_async(slow_query) - self.wait_for_state(handle, QueryState.RUNNING, 1000) + self.client.wait_for_impala_state(handle, RUNNING, 1000) profile = client.get_runtime_profile(handle) assert "NumBackends: 2" in profile, profile start_time = time.time() @@ -1010,7 +1010,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite): # Fix number of scanner threads to make runtime more deterministic. 
handle = self.execute_query_async(query, {'num_scanner_threads': 1}) self.impalad_test_service.wait_for_query_state(self.client, handle, - self.client.QUERY_STATES['RUNNING'], timeout=20) + self.client.QUERY_STATES['RUNNING'], timeout=timeout) return handle def __fetch_and_get_num_backends(self, query, handle, delay_s=0, timeout_s=20): diff --git a/tests/custom_cluster/test_result_spooling.py b/tests/custom_cluster/test_result_spooling.py index 795c38c5a..4c5a945e4 100644 --- a/tests/custom_cluster/test_result_spooling.py +++ b/tests/custom_cluster/test_result_spooling.py @@ -20,6 +20,7 @@ import pytest import re from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_connection import FINISHED from tests.common.test_dimensions import ( create_single_exec_option_dimension, create_uncompressed_text_dimension) @@ -63,7 +64,7 @@ class TestDedicatedCoordinator(CustomClusterTestSuite): try: # Wait for the query to finish (all rows are spooled). Assert that the executor # has been shutdown and its memory has been released. 
- self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], timeout) + self.client.wait_for_impala_state(handle, FINISHED, timeout) self.assert_eventually(timeout, 0.5, lambda: re.search("RowsSent:.*({0})".format(num_rows), self.client.get_runtime_profile(handle))) diff --git a/tests/custom_cluster/test_statestored_ha.py b/tests/custom_cluster/test_statestored_ha.py index 6cea78644..004ba6768 100644 --- a/tests/custom_cluster/test_statestored_ha.py +++ b/tests/custom_cluster/test_statestored_ha.py @@ -20,12 +20,12 @@ import logging import pytest import time -from beeswaxd.BeeswaxService import QueryState from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties from tests.common.impala_cluster import ( DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT) +from tests.common.impala_connection import ERROR, RUNNING from tests.common.skip import SkipIfBuildType, SkipIfNotHdfsMinicluster from time import sleep @@ -749,7 +749,7 @@ class TestStatestoredHA(CustomClusterTestSuite): # Run a slow query handle = client.execute_async(slow_query) # Make sure query starts running. - self.wait_for_state(handle, QueryState.RUNNING, 120, client) + client.wait_for_impala_state(handle, RUNNING, 120) profile = client.get_runtime_profile(handle) assert "NumBackends: 3" in profile, profile # Kill active statestored @@ -763,8 +763,7 @@ class TestStatestoredHA(CustomClusterTestSuite): # Wait till the grace period ends + some buffer to verify the slow query is still # running. sleep(self.RECOVERY_GRACE_PERIOD_S + 1) - assert client.get_state(handle) == QueryState.RUNNING, \ - "Query expected to be in running state" + assert client.is_running(handle), "Query expected to be in running state" # Now kill a backend, and make sure the query fails. 
self.cluster.impalads[2].kill() try: @@ -785,15 +784,14 @@ class TestStatestoredHA(CustomClusterTestSuite): # Run a slow query handle = client.execute_async(slow_query) # Make sure query starts running. - self.wait_for_state(handle, QueryState.RUNNING, 120, client) + client.wait_for_impala_state(handle, RUNNING, 120) profile = client.get_runtime_profile(handle) assert "NumBackends: 2" in profile, profile # Kill current active statestored start_time = time.time() statestoreds[1].kill() # Wait till the standby statestored becomes active. - query_state = client.get_state(handle) - assert query_state == QueryState.RUNNING + assert client.is_running(handle) statestore_service_0.wait_for_metric_value( "statestore.active-status", expected_value=True, timeout=120) assert (statestore_service_0.get_metric_value("statestore.active-status")), \ @@ -804,7 +802,7 @@ class TestStatestoredHA(CustomClusterTestSuite): # query to fail. Combine failover time (SS_PEER_TIMEOUT_S) and recovery grace # period (RECOVERY_GRACE_PERIOD_S) to avoid flaky test. 
timeout_s = self.SS_PEER_TIMEOUT_S + self.RECOVERY_GRACE_PERIOD_S * 2 - self.wait_for_state(handle, QueryState.EXCEPTION, timeout_s, client) + client.wait_for_impala_state(handle, ERROR, timeout_s) client.close_query(handle) elapsed_s = time.time() - start_time assert elapsed_s >= self.SS_PEER_TIMEOUT_S + self.RECOVERY_GRACE_PERIOD_S, \ diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index c6e64ab22..068e41922 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -23,12 +23,12 @@ import pytest import re import time -from beeswaxd.BeeswaxService import QueryState from copy import deepcopy from tests.metadata.test_ddl_base import TestDdlBase from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.environ import (HIVE_MAJOR_VERSION) from tests.common.file_utils import create_table_from_orc +from tests.common.impala_connection import FINISHED, INITIALIZED, PENDING, RUNNING from tests.common.impala_test_suite import LOG from tests.common.parametrize import UniqueDatabase from tests.common.skip import ( @@ -961,8 +961,7 @@ class TestAsyncDDL(TestDdlBase): "'Inserted 7300 row(s)'") @classmethod - def test_get_operation_status_for_client(self, client, unique_database, - init_state, pending_state, running_state): + def test_get_operation_status_for_client(self, client, unique_database): # Setup client.execute("drop table if exists {0}.alltypes_clone".format(unique_database)) client.execute("select count(*) from functional_parquet.alltypes") @@ -986,15 +985,15 @@ class TestAsyncDDL(TestDdlBase): num_times_in_running_state = 0 while not client.state_is_finished(handle): - state = client.get_state(handle) + state = client.get_impala_exec_state(handle) - if (state == init_state): + if (state == INITIALIZED): num_times_in_initialized_state += 1 - if (state == pending_state): + if (state == PENDING): num_times_in_pending_state += 1 - if (state == running_state): + if (state == RUNNING): 
num_times_in_running_state += 1 # The query must reach INITIALIZED_STATE 0 time and PENDING_STATE at least @@ -1007,18 +1006,8 @@ class TestAsyncDDL(TestDdlBase): def test_get_operation_status_for_async_ddl(self, vector, unique_database): """Tests that for an asynchronously executed DDL with delay, GetOperationStatus must be issued repeatedly. Test client hs2-http, hs2 and beeswax""" - - if vector.get_value('protocol') == 'hs2-http': - self.test_get_operation_status_for_client(self.hs2_http_client, unique_database, - "INITIALIZED_STATE", "PENDING_STATE", "RUNNING_STATE") - - if vector.get_value('protocol') == 'hs2': - self.test_get_operation_status_for_client(self.hs2_client, unique_database, - "INITIALIZED_STATE", "PENDING_STATE", "RUNNING_STATE") - - if vector.get_value('protocol') == 'beeswax': - self.test_get_operation_status_for_client(self.client, unique_database, - QueryState.INITIALIZED, QueryState.COMPILED, QueryState.RUNNING) + client = self.default_impala_client(vector.get_value('protocol')) + self.test_get_operation_status_for_client(client, unique_database) class TestAsyncDDLTiming(TestDdlBase): @@ -1038,10 +1027,6 @@ class TestAsyncDDLTiming(TestDdlBase): def test_alter_table_recover(self, vector, unique_database): enable_async_ddl = vector.get_value('enable_async_ddl_execution') client = self.create_impala_client(protocol=vector.get_value('protocol')) - is_hs2 = vector.get_value('protocol') in ['hs2', 'hs2-http'] - pending_state = "PENDING_STATE" if is_hs2 else QueryState.COMPILED - running_state = "RUNNING_STATE" if is_hs2 else QueryState.RUNNING - finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED try: # Setup for the alter table case (create table that points to an existing @@ -1066,15 +1051,15 @@ class TestAsyncDDLTiming(TestDdlBase): handle = self.execute_query_async_using_client(client, alter_stmt, new_vector) exec_end = time.time() exec_time = exec_end - exec_start - state = client.get_state(handle) + state = 
client.get_impala_exec_state(handle) if enable_async_ddl: - assert state == pending_state or state == running_state + assert state in [PENDING, RUNNING] else: - assert state == running_state or state == finished_state + assert state in [RUNNING, FINISHED] # Wait for the statement to finish with a timeout of 20 seconds wait_start = time.time() - self.wait_for_state(handle, finished_state, 20, client=client) + client.wait_for_impala_state(handle, FINISHED, 20) wait_end = time.time() wait_time = wait_end - wait_start self.close_query_using_client(client, handle) @@ -1096,9 +1081,6 @@ class TestAsyncDDLTiming(TestDdlBase): def test_ctas(self, vector, unique_database): enable_async_ddl = vector.get_value('enable_async_ddl_execution') client = self.create_impala_client(protocol=vector.get_value('protocol')) - is_hs2 = vector.get_value('protocol') in ['hs2', 'hs2-http'] - pending_state = "PENDING_STATE" if is_hs2 else QueryState.COMPILED - finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED try: # The CTAS is going to need the metadata of the source table in the @@ -1125,7 +1107,7 @@ class TestAsyncDDLTiming(TestDdlBase): # The CRS_BEFORE_COORD_STARTS delay postpones the transition from PENDING # to RUNNING, so the sync case should be in PENDING state at the end of # the execute call. This means that the sync and async cases are the same. - assert client.get_state(handle) == pending_state + assert client.is_pending(handle) # Wait for the statement to finish with a timeout of 40 seconds # (60 seconds without shortcircuit reads). There are other tests running @@ -1134,7 +1116,7 @@ class TestAsyncDDLTiming(TestDdlBase): # on the statement finishing in a particular amount of time. 
wait_time = 40 if IS_HDFS else 60 wait_start = time.time() - self.wait_for_state(handle, finished_state, wait_time, client=client) + client.wait_for_impala_state(handle, FINISHED, wait_time) wait_end = time.time() wait_time = wait_end - wait_start self.close_query_using_client(client, handle) diff --git a/tests/metadata/test_load.py b/tests/metadata/test_load.py index 272f8bd0a..d78dce08b 100644 --- a/tests/metadata/test_load.py +++ b/tests/metadata/test_load.py @@ -20,8 +20,8 @@ from __future__ import absolute_import, division, print_function from builtins import range import time -from beeswaxd.BeeswaxService import QueryState from copy import deepcopy +from tests.common.impala_connection import FINISHED, RUNNING from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import ( create_client_protocol_dimension, @@ -43,6 +43,7 @@ HIDDEN_FILES = ["{0}/3/.100101.txt".format(STAGING_PATH), # A path outside WAREHOUSE, which will be a different bucket for Ozone/ofs. 
TMP_STAGING_PATH = get_fs_path('/tmp/test_load_staging') + @SkipIfLocal.hdfs_client class TestLoadData(ImpalaTestSuite): @@ -143,7 +144,7 @@ class TestLoadDataExternal(ImpalaTestSuite): self.client.execute("create table functional.{0} like functional.alltypesnopart" " location '{1}/{0}'".format(TEST_TBL_NOPART_EXT, WAREHOUSE)) - def test_load(self, vector): + def test_load(self): self.execute_query_expect_success(self.client, "load data inpath '{0}/100101.txt'" " into table functional.{1}".format(TMP_STAGING_PATH, TEST_TBL_NOPART_EXT)) result = self.execute_scalar( @@ -179,9 +180,6 @@ class TestAsyncLoadData(ImpalaTestSuite): enable_async_load_data = vector.get_value('enable_async_load_data_execution') protocol = vector.get_value('protocol') client = self.create_impala_client(protocol=protocol) - is_hs2 = protocol in ['hs2', 'hs2-http'] - running_state = "RUNNING_STATE" if is_hs2 else QueryState.RUNNING - finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED # Form a fully qualified table name with '-' in protocol 'hs2-http' dropped as # '-' is not allowed in Impala table name even delimited with ``. @@ -221,13 +219,13 @@ class TestAsyncLoadData(ImpalaTestSuite): handle = self.execute_query_async_using_client(client, load_stmt, new_vector) exec_end = time.time() exec_time = exec_end - exec_start - exec_end_state = client.get_state(handle) + exec_end_state = client.get_impala_exec_state(handle) # Wait for the statement to finish with a timeout of 20 seconds # (30 seconds without shortcircuit reads) wait_time = 20 if IS_HDFS else 30 wait_start = time.time() - self.wait_for_state(handle, finished_state, wait_time, client=client) + client.wait_for_impala_state(handle, FINISHED, wait_time) wait_end = time.time() wait_time = wait_end - wait_start self.close_query_using_client(client, handle) @@ -236,8 +234,8 @@ class TestAsyncLoadData(ImpalaTestSuite): # The compilation of LOAD is processed in the exec step without delay. 
And the # processing of the LOAD plan is in wait step with delay. The wait time should # definitely take more time than 3 seconds. - assert(exec_end_state == running_state) - assert(wait_time >= 3) + assert (exec_end_state == RUNNING) + assert (wait_time >= 3) else: # In sync mode: # The entire LOAD is processed in the exec step with delay. exec_time should be @@ -245,8 +243,8 @@ class TestAsyncLoadData(ImpalaTestSuite): # that the exec state returned is still in RUNNING state due to the the wait-for # thread executing ClientRequestState::Wait() does not have time to set the # exec state from RUNNING to FINISH. - assert(exec_end_state == running_state or exec_end_state == finished_state) - assert(exec_time >= 3) + assert (exec_end_state == RUNNING or exec_end_state == FINISHED) + assert (exec_time >= 3) finally: client.close() diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py index 49efa053f..6c6570d97 100644 --- a/tests/query_test/test_cancellation.py +++ b/tests/query_test/test_cancellation.py @@ -200,14 +200,14 @@ class TestCancellation(ImpalaTestSuite): # Wait for the query to start (with a long timeout to account for admission control # queuing). 
WAIT_SECONDS = 60 * 30 - assert any(client.get_state(handle) == 'RUNNING_STATE' or sleep(0.1) + assert any(client.is_running(handle) or sleep(0.1) for _ in range(10 * WAIT_SECONDS)), 'Query failed to start' client.cancel(handle) # Wait up to 5 seconds for the query to get cancelled # TODO(IMPALA-1262): This should be CANCELED_STATE # TODO(IMPALA-8411): Remove and assert that the query is cancelled immediately - assert any(client.get_state(handle) == 'ERROR_STATE' or sleep(1) + assert any(client.is_error(handle) or sleep(1) for _ in range(5)), 'Query failed to cancel' # Get profile and check for formatting errors profile = client.get_runtime_profile(handle, TRuntimeProfileFormat.THRIFT) diff --git a/tests/query_test/test_fetch.py b/tests/query_test/test_fetch.py index c70e5158f..49a4ed2c7 100644 --- a/tests/query_test/test_fetch.py +++ b/tests/query_test/test_fetch.py @@ -19,6 +19,7 @@ from __future__ import absolute_import, division, print_function import re from time import sleep +from tests.common.impala_connection import FINISHED from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import extend_exec_option_dimension from tests.util.parse_util import parse_duration_string_ms, \ @@ -49,7 +50,7 @@ class TestFetch(ImpalaTestSuite): handle = self.execute_query_async(query, vector.get_value('exec_option')) try: # Wait until the query is 'FINISHED' and results are available for fetching. - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30) + self.client.wait_for_impala_state(handle, FINISHED, 30) # Sleep for 2.5 seconds so that the ClientFetchWaitTimer is >= 1s. sleep(2.5) # Fetch the results so that the fetch related counters are updated. @@ -93,7 +94,7 @@ class TestFetch(ImpalaTestSuite): handle = self.execute_query_async(query, vector.get_value('exec_option')) try: # Wait until the query is 'FINISHED' and results are available for fetching. 
- self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30) + self.client.wait_for_impala_state(handle, FINISHED, 30) # This loop will do 6 fetches that contain data and a final fetch with # no data. The last fetch is after eos has been set, so it does not count. @@ -128,7 +129,7 @@ class TestFetch(ImpalaTestSuite): handle = self.execute_query_async(query, vector.get_value('exec_option')) try: # Wait until the query is 'FINISHED' and results are available for fetching. - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30) + self.client.wait_for_impala_state(handle, FINISHED, 30) # This loop will do 5 fetches for a total of 25 rows. This is incomplete. for i in range(5): @@ -175,15 +176,15 @@ class TestFetchAndSpooling(ImpalaTestSuite): """Validate that RowsSent and RowsSentRate are set to valid values in the PLAN_ROOT_SINK section of the runtime profile.""" num_rows = 10 - if ('spool_query_results' in vector.get_value('exec_option') and - vector.get_value('exec_option')['spool_query_results'] == 'false'): + if ('spool_query_results' in vector.get_value('exec_option') + and vector.get_value('exec_option')['spool_query_results'] == 'false'): vector.get_value('exec_option')['debug_action'] = "BPRS_BEFORE_ADD_ROWS:SLEEP@1000" else: vector.get_value('exec_option')['debug_action'] = "BPRS_BEFORE_ADD_BATCH:SLEEP@1000" result = self.execute_query("select id from functional.alltypes limit {0}" .format(num_rows), vector.get_value('exec_option')) assert "RowsSent: {0} ({0})".format(num_rows) in result.runtime_profile - rows_sent_rate = re.search("RowsSentRate: (\d*\.?\d*)", result.runtime_profile) + rows_sent_rate = re.search(r"RowsSentRate: (\d*\.?\d*)", result.runtime_profile) assert rows_sent_rate assert float(rows_sent_rate.group(1)) > 0 diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py index 9883cd94f..834d2f600 100644 --- a/tests/query_test/test_result_spooling.py +++ 
b/tests/query_test/test_result_spooling.py @@ -23,6 +23,7 @@ import threading from time import sleep from tests.common.errors import Timeout +from tests.common.impala_connection import FINISHED from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import create_exec_option_dimension from tests.common.test_vector import ImpalaTestDimension @@ -82,11 +83,12 @@ class TestResultSpooling(ImpalaTestSuite): # Regexes to look for in the runtime profiles. # PeakUnpinnedBytes can show up in exec nodes as well, so we only look for the # PeakUnpinnedBytes metrics in the PLAN_ROOT_SINK section of the profile. - unpinned_bytes_regex = "PLAN_ROOT_SINK[\s\S]*?PeakUnpinnedBytes.*\([1-9][0-9]*\)" + unpinned_bytes_regex = r"PLAN_ROOT_SINK[\s\S]*?PeakUnpinnedBytes.*\([1-9][0-9]*\)" # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string. spilled_exec_option_regex = "ExecOption:.*Spilled" # PLAN_ROOT_SINK's reservation limit should be set at MAX_RESULT_SPOOLING_MEM = 32 KB. - plan_root_sink_reservation_limit = "PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 32.00 KB" + plan_root_sink_reservation_limit = (r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: " + r"32.00 KB") # Fetch the runtime profile every 0.5 seconds until either the timeout is hit, or # PeakUnpinnedBytes shows up in the profile. @@ -145,7 +147,7 @@ class TestResultSpooling(ImpalaTestSuite): timeout = 10 # Regexes to look for in the runtime profile. send_wait_time_regex = "RowBatchSendWaitTime: [1-9]" - queue_spilled_regex = "PLAN_ROOT_SINK[\s\S]*?ExecOption: Spilled" + queue_spilled_regex = r"PLAN_ROOT_SINK[\s\S]*?ExecOption: Spilled" # Execute the query asynchronously, wait for the queue to fill up, start fetching # results, and then validate that RowBatchSendWaitTime shows a non-zero value in the @@ -154,7 +156,7 @@ class TestResultSpooling(ImpalaTestSuite): # is guaranteed to be full. 
handle = self.execute_query_async(query, exec_options) try: - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], timeout) + self.client.wait_for_impala_state(handle, FINISHED, timeout) self.assert_eventually(30, 1, lambda: re.search(queue_spilled_regex, self.client.get_runtime_profile(handle))) # A fetch request is necessary to unblock the producer thread and trigger an update @@ -186,7 +188,7 @@ class TestResultSpooling(ImpalaTestSuite): thread = threading.Thread(target=lambda: self.create_impala_client().fetch(query, handle)) thread.start() - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 10) + self.client.wait_for_impala_state(handle, FINISHED, 10) thread.join() assert re.search(get_wait_time_regex, self.client.get_runtime_profile(handle)) finally: @@ -264,8 +266,8 @@ class TestResultSpoolingFetchSize(ImpalaTestSuite): # Result spooling should be independent of file format, so only testing for # table_format=parquet/none in order to avoid a test dimension explosion. cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('table_format').file_format == 'parquet' and - v.get_value('table_format').compression_codec == 'none') + v.get_value('table_format').file_format == 'parquet' + and v.get_value('table_format').compression_codec == 'none') @classmethod def setup_class(cls): @@ -295,7 +297,7 @@ class TestResultSpoolingFetchSize(ImpalaTestSuite): # If 'wait_for_finished' is True, wait for the query to reach the FINISHED state. # When it reaches this state all results should be successfully spooled. if vector.get_value('wait_for_finished'): - self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], timeout) + self.client.wait_for_impala_state(handle, FINISHED, timeout) rows_fetched = 0 # Call 'fetch' on the query handle enough times to read all rows. 
@@ -307,7 +309,7 @@ class TestResultSpoolingFetchSize(ImpalaTestSuite): rows_fetched += len(result_data) results.extend(result_data) finally: - self.client.close_query(handle) + self.client.close_query(handle) # Assert that the fetched results match the '_base_data'. assert self._num_rows == rows_fetched @@ -346,8 +348,8 @@ class TestResultSpoolingCancellation(ImpalaTestSuite): # Result spooling should be independent of file format, so only testing for # table_format=parquet/none in order to avoid a test dimension explosion. cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('table_format').file_format == 'parquet' and - v.get_value('table_format').compression_codec == 'none') + v.get_value('table_format').file_format == 'parquet' + and v.get_value('table_format').compression_codec == 'none') def test_cancellation(self, vector): vector.get_value('exec_option')['spool_query_results'] = 'true' @@ -406,8 +408,8 @@ class TestResultSpoolingFailpoints(ImpalaTestSuite): # Result spooling should be independent of file format, so only testing for # table_format=parquet/none in order to avoid a test dimension explosion. cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('table_format').file_format == 'parquet' and - v.get_value('table_format').compression_codec == 'none') + v.get_value('table_format').file_format == 'parquet' + and v.get_value('table_format').compression_codec == 'none') def test_failpoints(self, vector): vector.get_value('exec_option')['batch_size'] = 10 @@ -439,8 +441,8 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite): # Result spooling should be independent of file format, so only testing for # table_format=parquet/none in order to avoid a test dimension explosion. 
cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('table_format').file_format == 'parquet' and - v.get_value('table_format').compression_codec == 'none') + v.get_value('table_format').file_format == 'parquet' + and v.get_value('table_format').compression_codec == 'none') def test_high_max_row_size(self, vector): """Test that when MAX_ROW_SIZE is set, PLAN_ROOT_SINK can adjust its max_reservation @@ -464,7 +466,8 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite): assert re.search(spilled_exec_option_regex, result.runtime_profile) # PLAN_ROOT_SINK's reservation limit should be set at 2 * MAX_ROW_SIZE. - plan_root_sink_reservation_limit = "PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 32.00 MB" + plan_root_sink_reservation_limit = (r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: " + r"32.00 MB") assert re.search(plan_root_sink_reservation_limit, result.runtime_profile) def test_high_default_spillable_buffer(self, vector): @@ -509,7 +512,7 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite): # Check that PLAN_ROOT_SINK's reservation limit match the default # MAX_RESULT_SPOOLING_MEM. - plan_root_sink_reservation_limit = "PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \ + plan_root_sink_reservation_limit = r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \ .format('100.00 MB') assert re.search(plan_root_sink_reservation_limit, result.runtime_profile) @@ -526,6 +529,6 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite): assert re.search(spilled_exec_option_regex, result.runtime_profile) # Check that PLAN_ROOT_SINK's reservation limit match. 
- plan_root_sink_reservation_limit = "PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \ + plan_root_sink_reservation_limit = r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \ .format(expected_limit) assert re.search(plan_root_sink_reservation_limit, result.runtime_profile) diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py index cac4f407a..be4dcf49c 100644 --- a/tests/webserver/test_web_pages.py +++ b/tests/webserver/test_web_pages.py @@ -20,6 +20,7 @@ from tests.common.environ import ImpalaTestClusterFlagsDetector from tests.common.file_utils import grep_dir from tests.common.skip import SkipIfBuildType, SkipIfDockerizedCluster from tests.common.impala_cluster import ImpalaCluster +from tests.common.impala_connection import FINISHED, RUNNING from tests.common.impala_test_suite import ImpalaTestSuite from tests.util.filesystem_utils import supports_storage_ids from tests.util.parse_util import parse_duration_string_ms @@ -146,7 +147,7 @@ class TestWebPage(ImpalaTestSuite): query = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)" query_handle = self.client.execute_async(query) try: - self.wait_for_state(query_handle, self.client.QUERY_STATES['RUNNING'], 1000) + self.client.wait_for_impala_state(query_handle, RUNNING, 1000) memz_breakdown = self.get_debug_page(self.MEMZ_URL)['detailed'] finstance_re = re.compile("Fragment [0-9a-f]{16}:[0-9a-f]{16}") assert finstance_re.search(memz_breakdown), memz_breakdown @@ -171,8 +172,8 @@ class TestWebPage(ImpalaTestSuite): assert "application/json" == response.headers['Content-Type'] jmx_json = "" try: - jmx_json = json.loads(response.text) - assert "beans" in jmx_json.keys(), "Ill formatted JSON returned: %s" % jmx_json + jmx_json = json.loads(response.text) + assert "beans" in jmx_json.keys(), "Ill formatted JSON returned: %s" % jmx_json except ValueError: assert False, "Invalid JSON returned from /jmx endpoint: %s" % jmx_json @@ -454,8 +455,8 @@ class 
TestWebPage(ImpalaTestSuite): def __test_table_metrics(self, db_name, tbl_name, metric): self.client.execute("refresh %s.%s" % (db_name, tbl_name)) - self.get_and_check_status(self.TABLE_METRICS_URL + - "?name=%s.%s" % (db_name, tbl_name), metric, ports_to_test=self.CATALOG_TEST_PORT) + self.get_and_check_status(self.TABLE_METRICS_URL + + "?name=%s.%s" % (db_name, tbl_name), metric, ports_to_test=self.CATALOG_TEST_PORT) def __test_catalog_tables_loading_time(self, db_name, tbl_name): """Test the list of tables with the longest loading time in the catalog page. @@ -491,7 +492,7 @@ class TestWebPage(ImpalaTestSuite): # Find the entry for the db table and verify its file count. if re.search(target_metric, trow) is not None: # Get the number following <td> in the entry - nfiles = re.search('(?<=\<td\>)\d+', trow) + nfiles = re.search(r'(?<=\<td\>)\d+', trow) assert nfiles.group(0) == numfiles response = self.get_and_check_status(self.CATALOG_URL + "?json", "high_file_count_tables", ports_to_test=self.CATALOG_TEST_PORT) @@ -515,7 +516,7 @@ class TestWebPage(ImpalaTestSuite): expected_result = "select \"{0}...".format("x " * 121) check_if_contains = False response_json = self.__run_query_and_get_debug_page( - query, self.QUERIES_URL, expected_state=self.client.QUERY_STATES["FINISHED"]) + query, self.QUERIES_URL, expected_state=FINISHED) # Search the json for the expected value. # The query can be in in_flight_queries even though it is in FINISHED state. 
for json_part in itertools.chain( @@ -539,7 +540,7 @@ class TestWebPage(ImpalaTestSuite): response_json = "" try: if expected_state: - self.wait_for_state(query_handle, expected_state, 100) + self.client.wait_for_impala_state(query_handle, expected_state, 100) responses = self.get_and_check_status( page_url + "?query_id=%s&json" % query_handle.get_handle().id, ports_to_test=[25000]) @@ -556,7 +557,7 @@ class TestWebPage(ImpalaTestSuite): queries; nothing for DDL statements""" sleep_query = "select sleep(10000) from functional.alltypes limit 1" ctas_sleep_query = "create table {0}.foo as {1}".format(unique_database, sleep_query) - running_state = self.client.QUERY_STATES['RUNNING'] + running_state = RUNNING backend_state_properties = ['cpu_user_s', 'rpc_latency', 'num_remaining_instances', 'num_instances', 'peak_per_host_mem_consumption', 'time_since_last_heard_from', 'status', 'host', @@ -587,7 +588,7 @@ class TestWebPage(ImpalaTestSuite): nothing for DDL statements""" sleep_query = "select sleep(10000) from functional.alltypes limit 1" ctas_sleep_query = "create table {0}.foo as {1}".format(unique_database, sleep_query) - running_state = self.client.QUERY_STATES['RUNNING'] + running_state = RUNNING instance_stats_properties = ['fragment_name', 'time_since_last_heard_from', 'current_state', 'first_status_update_received', 'instance_id', 'done'] @@ -595,6 +596,7 @@ class TestWebPage(ImpalaTestSuite): for query in [sleep_query, ctas_sleep_query]: response_json = self.__run_query_and_get_debug_page(query, self.QUERY_FINSTANCES_URL, + query_options=query_options, expected_state=running_state) assert 'backend_instances' in response_json @@ -613,7 +615,8 @@ class TestWebPage(ImpalaTestSuite): assert not instance_stats['done'] response_json = self.__run_query_and_get_debug_page("describe functional.alltypes", - self.QUERY_BACKENDS_URL) + self.QUERY_BACKENDS_URL, + query_options=query_options) assert 'backend_instances' not in response_json @pytest.mark.xfail(run=False, 
reason="IMPALA-8059") @@ -900,7 +903,7 @@ class TestWebPage(ImpalaTestSuite): # Matches all 'form' tags that are not followed by including the hidden inputs. form_regex = "<form [^{]*?>(?!{{>www/form-hidden-inputs.tmpl}})" # Matches XMLHttpRequest.open() in javascript that are not followed with make_url(). - javascript_regex = "open\(['\"]GET['\"], (?!make_url)" + javascript_regex = r"open\(['\"]GET['\"], (?!make_url)" # Matches urls in json parameters passed to DataTables. datatables_regex = "url: ['\"](?!make_url)" # Matches all references of paths that contain '/www/' but are not fully qualified. @@ -911,7 +914,8 @@ class TestWebPage(ImpalaTestSuite): regex = "(%s)|(%s)|(%s)|(%s)|(%s)|(%s)|(%s)" % \ (href_regex, script_regex, form_regex, javascript_regex, datatables_regex, path_regex, link_regex) - results = grep_dir(os.path.join(os.environ['IMPALA_HOME'], "www"), regex, ".*\.tmpl") + results = grep_dir(os.path.join(os.environ['IMPALA_HOME'], "www"), regex, + r".*\.tmpl") assert len(results) == 0, \ "All links on the webui must include the webserver host: %s" % results @@ -1037,7 +1041,7 @@ class TestWebPage(ImpalaTestSuite): """Tests that /queries page shows query progress.""" query = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)" response_json = self.__run_query_and_get_debug_page( - query, self.QUERIES_URL, expected_state=self.client.QUERY_STATES["RUNNING"]) + query, self.QUERIES_URL, expected_state=RUNNING) for json_part in response_json['in_flight_queries']: if query in json_part['stmt']: assert json_part["query_progress"] == "0 / 4 ( 0%)" @@ -1175,6 +1179,7 @@ class TestWebPage(ImpalaTestSuite): # check if response size is 2 , for both catalog and impalad webUI assert len(responses) == 2 + class TestWebPageAndCloseSession(ImpalaTestSuite): ROOT_URL = "http://localhost:{0}/"
