This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 71feb617e4139e2e3430f99cdabb7b63877a2825
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Mar 5 12:03:30 2025 -0800

    IMPALA-13835: Remove reference to protocol-specific states
    
    With IMPALA-13682 merged, checking for query state can be done via
    wait_for_impala_state(), wait_for_any_impala_state() and other helper
    methods of ImpalaConnection. This patch removes all references to
    protocol-specific states such as BeeswaxService.QueryState.
    
    Also fix flake8 errors and an unused variable in modified test files.
    
    Testing:
    - Run and pass all affected tests.
    
    Change-Id: Id6b56024fbfcea1ff005c34cd146d16e67cb6fa1
    Reviewed-on: http://gerrit.cloudera.org:8080/22586
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/custom_cluster/test_blacklist.py             | 10 ++---
 tests/custom_cluster/test_catalogd_ha.py           |  5 +--
 tests/custom_cluster/test_events_custom_configs.py | 13 +++---
 tests/custom_cluster/test_executor_groups.py       |  6 +--
 tests/custom_cluster/test_kudu.py                  |  5 ++-
 tests/custom_cluster/test_process_failures.py      |  4 +-
 tests/custom_cluster/test_query_live.py            | 11 +++---
 tests/custom_cluster/test_query_log.py             |  9 +++--
 tests/custom_cluster/test_query_retries.py         | 37 ++++++++---------
 tests/custom_cluster/test_restart_services.py      | 12 +++---
 tests/custom_cluster/test_result_spooling.py       |  3 +-
 tests/custom_cluster/test_statestored_ha.py        | 14 +++----
 tests/metadata/test_ddl.py                         | 46 +++++++---------------
 tests/metadata/test_load.py                        | 20 +++++-----
 tests/query_test/test_cancellation.py              |  4 +-
 tests/query_test/test_fetch.py                     | 13 +++---
 tests/query_test/test_result_spooling.py           | 39 +++++++++---------
 tests/webserver/test_web_pages.py                  | 33 +++++++++-------
 18 files changed, 136 insertions(+), 148 deletions(-)

diff --git a/tests/custom_cluster/test_blacklist.py 
b/tests/custom_cluster/test_blacklist.py
index e237d6e8c..9891bcd4e 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -22,7 +22,7 @@ from tests.common.custom_cluster_test_suite import 
CustomClusterTestSuite
 import pytest
 import re
 
-from beeswaxd.BeeswaxService import QueryState
+from tests.common.impala_connection import FINISHED, RUNNING
 from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfBuildType
 from tests.common.test_dimensions import add_mandatory_exec_option
 from time import sleep
@@ -53,7 +53,7 @@ class TestBlacklist(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       statestored_args="-statestore_heartbeat_frequency_ms=1000")
-  def test_kill_impalad(self, cursor):
+  def test_kill_impalad(self):
     """Test that verifies that when an impalad is killed, it is properly 
blacklisted."""
     # Run a query and verify that no impalads are blacklisted yet.
     result = self.execute_query("select count(*) from tpch.lineitem")
@@ -101,7 +101,7 @@ class TestBlacklist(CustomClusterTestSuite):
     assert re.search("NumBackends: 2", result.runtime_profile), 
result.runtime_profile
 
   @pytest.mark.execute_serially
-  def test_restart_impalad(self, cursor):
+  def test_restart_impalad(self):
     """Test that verifies the behavior when an impalad is killed, blacklisted, 
and then
     restarted."""
     # Run a query and verify that no impalads are blacklisted yet.
@@ -146,7 +146,7 @@ class TestBlacklist(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1,
       statestored_args="-statestore_heartbeat_frequency_ms=1000")
-  def test_kill_impalad_with_running_queries(self, cursor):
+  def test_kill_impalad_with_running_queries(self):
     """Verifies that when an Impala executor is killed while running a query, 
that the
     Coordinator blacklists the killed executor."""
 
@@ -158,7 +158,7 @@ class TestBlacklist(CustomClusterTestSuite):
         'debug_action': '0:GETNEXT:DELAY|1:GETNEXT:DELAY'})
 
     # Wait for the query to start running
-    self.wait_for_any_state(handle, [QueryState.RUNNING, QueryState.FINISHED], 
10)
+    self.client.wait_for_any_impala_state(handle, [RUNNING, FINISHED], 10)
 
     # Kill one of the Impala executors
     killed_impalad = self.cluster.impalads[2]
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index 8a0463f14..689e12c47 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -22,10 +22,10 @@ import re
 import requests
 import time
 
-from beeswaxd.BeeswaxService import QueryState
 from builtins import round
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout
+from tests.common.impala_connection import ERROR
 from tests.util.filesystem_utils import IS_S3, get_fs_path
 from time import sleep
 
@@ -485,8 +485,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
       LOG.info("Catalogd failover took %s seconds to complete" % 
round(elapsed_s, 1))
 
       # Verify that the query is failed due to the Catalogd HA fail-over.
-      self.wait_for_state(
-          handle, QueryState.EXCEPTION, SYNC_DDL_DELAY_S * 2 + 10, 
client=client)
+      client.wait_for_impala_state(handle, ERROR, SYNC_DDL_DELAY_S * 2 + 10)
     finally:
       client.close()
 
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index c99e6384b..fe728a747 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -23,11 +23,11 @@ from os import getenv
 from time import sleep
 
 
-from beeswaxd.BeeswaxService import QueryState
 from hive_metastore.ttypes import FireEventRequest
 from hive_metastore.ttypes import FireEventRequestData
 from hive_metastore.ttypes import InsertEventRequestData
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_connection import ERROR, FINISHED
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIf, SkipIfFS
 from tests.common.test_dimensions import add_exec_option_dimension
@@ -655,8 +655,7 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     add_part_stmt = "alter table {} add if not exists 
partition(p=0)".format(tbl)
     drop_part_stmt = "alter table {} drop if exists partition(p=0)".format(tbl)
     refresh_stmt = "refresh {} partition(p=0)".format(tbl)
-    end_states = [self.client.QUERY_STATES['FINISHED'],
-                  self.client.QUERY_STATES['EXCEPTION']]
+    end_states = [FINISHED, ERROR]
 
     self.execute_query(create_stmt)
     self.execute_query(add_part_stmt)
@@ -665,11 +664,11 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     # Before IMPALA-12855, REFRESH usually fails in 2-3 rounds.
     for i in range(100):
       self.execute_query(drop_part_stmt)
-      refresh_state = self.wait_for_any_state(refresh_handle, end_states, 10)
-      assert refresh_state == self.client.QUERY_STATES['FINISHED'],\
+      refresh_state = self.client.wait_for_any_impala_state(
+        refresh_handle, end_states, 10)
+      assert refresh_state == FINISHED, \
           "REFRESH state: {}. Error log: {}".format(
-              QueryState._VALUES_TO_NAMES[refresh_state],
-              self.client.get_log(refresh_handle))
+            refresh_state, self.client.get_log(refresh_handle))
       self.execute_query(add_part_stmt)
       refresh_handle = self.client.execute_async(refresh_stmt)
 
diff --git a/tests/custom_cluster/test_executor_groups.py 
b/tests/custom_cluster/test_executor_groups.py
index 3b4fe2768..965aab1ae 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -20,10 +20,10 @@
 from __future__ import absolute_import, division, print_function
 from builtins import range
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_connection import FINISHED, RUNNING
 from tests.common.parametrize import UniqueDatabase
 from tests.common.test_result_verifier import error_msg_startswith
 from tests.util.concurrent_workload import ConcurrentWorkload
-
 import json
 import logging
 import os
@@ -1606,7 +1606,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     Assert existence of expected_group_str in query profile."""
     client.set_configuration(config_options)
     query_handle = client.execute_async(query)
-    self.wait_for_state(query_handle, client.QUERY_STATES['RUNNING'], 30, 
client=client)
+    client.wait_for_impala_state(query_handle, RUNNING, 30)
     assert expected_group_str in client.get_runtime_profile(query_handle)
 
   @pytest.mark.execute_serially
@@ -1699,7 +1699,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._add_executors("group", healthy_threshold, num_executors=1,
         resource_pool="root.large", extra_args="-mem_limit=2g", 
expected_num_impalads=6)
 
-    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
+    self.client.wait_for_impala_state(handle, FINISHED, 60)
     profile = self.client.get_runtime_profile(handle)
     assert "F00:PLAN FRAGMENT [RANDOM] hosts=4 instances=4" in profile, profile
     assert ("Scheduler Warning: Cluster membership might changed between 
planning and "
diff --git a/tests/custom_cluster/test_kudu.py 
b/tests/custom_cluster/test_kudu.py
index 6825c3303..7a7920cca 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -24,6 +24,7 @@ from kudu.schema import INT32
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_connection import FINISHED
 from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.skip import SkipIfKudu, SkipIfBuildType, SkipIf
 from tests.common.test_dimensions import BEESWAX, add_mandatory_exec_option
@@ -853,8 +854,8 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite):
     slow_handle = self.execute_query_async(slow_query, slow_sleep)
     try:
       # Wait for both queries to finish.
-      self.wait_for_state(fast_handle, self.client.QUERY_STATES['FINISHED'], 
timeout)
-      self.wait_for_state(slow_handle, self.client.QUERY_STATES['FINISHED'], 
timeout)
+      self.client.wait_for_impala_state(fast_handle, FINISHED, timeout)
+      self.client.wait_for_impala_state(slow_handle, FINISHED, timeout)
       self._check_errors(self.client.get_runtime_profile(slow_handle),
           expect_error_on_slow_query, error_message, num_row_erros)
     finally:
diff --git a/tests/custom_cluster/test_process_failures.py 
b/tests/custom_cluster/test_process_failures.py
index 39aa34ad3..4221425c3 100644
--- a/tests/custom_cluster/test_process_failures.py
+++ b/tests/custom_cluster/test_process_failures.py
@@ -19,10 +19,10 @@ from __future__ import absolute_import, division, 
print_function
 from builtins import range
 import pytest
 
-from beeswaxd.BeeswaxService import QueryState
 from tests.common.custom_cluster_test_suite import (
     DEFAULT_CLUSTER_SIZE,
     CustomClusterTestSuite)
+from tests.common.impala_connection import RUNNING
 from tests.common.test_result_verifier import error_msg_startswith
 
 # The exact query doesn't matter much for these tests, just want a query that 
touches
@@ -83,7 +83,7 @@ class TestProcessFailures(CustomClusterTestSuite):
 
     # Wait for the queries to start running
     for handle in handles:
-      self.wait_for_state(handle, QueryState.RUNNING, 1000, client=client)
+      client.wait_for_impala_state(handle, RUNNING, 1000)
 
     # Kill the coordinator
     impalad.kill()
diff --git a/tests/custom_cluster/test_query_live.py 
b/tests/custom_cluster/test_query_live.py
index e12254a27..9492d187b 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -19,12 +19,11 @@ from __future__ import absolute_import, division, 
print_function
 
 import re
 
-from beeswaxd.BeeswaxService import QueryState
 from getpass import getuser
 from signal import SIGRTMIN
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_cluster import DEFAULT_KRPC_PORT
-from tests.util.retry import retry
+from tests.common.impala_connection import FINISHED, PENDING
 from tests.util.workload_management import assert_query, redaction_rules_file
 from time import sleep
 
@@ -378,7 +377,7 @@ class TestQueryLive(CustomClusterTestSuite):
         'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@3000'})
 
     # Wait for query to compile and assign ranges, then kill impalad during 
debug delay.
-    self.wait_for_any_state(handle, [QueryState.COMPILED], 3)
+    self.client.wait_for_impala_state(handle, PENDING, 3)
     self.cluster.impalads[1].kill()
 
     result = self.client.fetch(query, handle)
@@ -401,7 +400,7 @@ class TestQueryLive(CustomClusterTestSuite):
         'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'})
 
     # Wait for query to compile.
-    self.wait_for_any_state(handle, [QueryState.COMPILED], 3)
+    self.client.wait_for_impala_state(handle, PENDING, 3)
     # Ensure enough time for scheduling to assign ranges.
     sleep(1)
     # Kill impalad during debug delay.
@@ -426,10 +425,10 @@ class TestQueryLive(CustomClusterTestSuite):
         'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'})
 
     # Wait for query to compile and assign ranges, then gracefully shutdown 
impalad.
-    self.wait_for_any_state(handle, [QueryState.COMPILED], 3)
+    self.client.wait_for_impala_state(handle, PENDING, 3)
     self.cluster.impalads[1].kill(SIGRTMIN)
 
-    self.wait_for_any_state(handle, [QueryState.FINISHED], 10)
+    self.client.wait_for_impala_state(handle, FINISHED, 10)
     # Allow time for statestore update to propagate. Shutdown grace period is 
120s.
     sleep(1)
     # Coordinator in graceful shutdown should not be scheduled in new queries.
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index 1d780e7ec..f7fcf67de 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -31,6 +31,7 @@ from thrift.transport.TTransport import TBufferedTransport
 from thrift.protocol import TBinaryProtocol
 from tests.common.cluster_config import impalad_admission_ctrl_config_args
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_connection import FINISHED
 from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.retry import retry
@@ -518,7 +519,7 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
     def close_op(client, resp):
       close_operation_req = TCLIService.TCloseOperationReq()
       close_operation_req.operationHandle = resp.operationHandle
-      assert_resp(hs2_client.CloseOperation(close_operation_req))
+      assert_resp(client.CloseOperation(close_operation_req))
 
     try:
       # Open a new HS2 session.
@@ -897,7 +898,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
     unix_now = time()
     try:
       client.execute("{0}".format(unix_now))
-    except Exception as _:
+    except Exception:
       pass
 
     # Get the query id from the completed queries table since the call to 
execute errors
@@ -1178,7 +1179,7 @@ class TestQueryLogQueuedQueries(CustomClusterTestSuite):
               queries.
        """
     resp = self.hs2_client.execute_async("SELECT * FROM functional.alltypes 
LIMIT 5")
-    self.wait_for_state(resp, 'FINISHED_STATE', 60, client=self.hs2_client)
+    self.hs2_client.wait_for_impala_state(resp, FINISHED, 60)
     self.hs2_client.close_query(resp)
 
     # Start a query that will consume the only available slot since its rows 
are not
@@ -1212,7 +1213,7 @@ class TestQueryLogQueuedQueries(CustomClusterTestSuite):
         "list of in-flight queries".format(self.insert_query_id)
 
     # Retrieve all rows of the original blocking query to cause it to complete.
-    self.wait_for_state(long_query_resp, 'FINISHED_STATE', 60, 
client=self.hs2_client)
+    self.hs2_client.wait_for_impala_state(long_query_resp, FINISHED, 60)
     self.hs2_client.close_query(long_query_resp)
 
     # Helper function that checks if a query matches the workload management 
insert DML
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index 300532cfa..eb47bf86a 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -31,6 +31,7 @@ from random import randint
 
 from RuntimeProfile.ttypes import TRuntimeProfileFormat
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_connection import ERROR, FINISHED, RUNNING
 from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.errors import Timeout
@@ -88,7 +89,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     return 'functional-query'
 
   @pytest.mark.execute_serially
-  def test_retries_from_cancellation_pool(self, cursor):
+  def test_retries_from_cancellation_pool(self):
     """Tests that queries are retried instead of cancelled if one of the nodes 
leaves the
     cluster. The retries are triggered by the cancellation pool in the 
ImpalaServer. The
     cancellation pool listens for updates from the statestore and kills all 
queries that
@@ -102,7 +103,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Launch the query, wait for it to start running, and then kill an impalad.
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
-    self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 60)
+    self.client.wait_for_impala_state(handle, RUNNING, 60)
 
     # Kill a random impalad (but not the one executing the actual query).
     self.__kill_random_impalad()
@@ -145,7 +146,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Launch a query, it should be retried.
     handle = self.execute_query_async(self._shuffle_heavy_query,
         query_options={'retry_failed_queries': 'true'})
-    self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 60)
+    self.client.wait_for_impala_state(handle, RUNNING, 60)
 
     # Kill a random impalad.
     killed_impalad = self.__kill_random_impalad()
@@ -211,8 +212,8 @@ class TestQueryRetries(CustomClusterTestSuite):
       handles.append(handle)
 
     # Wait for each query to start running.
-    running_state = self.client.QUERY_STATES['RUNNING']
-    list(map(lambda handle: self.wait_for_state(handle, running_state, 60), 
handles))
+    list(map(lambda handle: self.client.wait_for_impala_state(handle, RUNNING, 
60),
+             handles))
 
     # Kill a random impalad.
     killed_impalad = self.__kill_random_impalad()
@@ -259,7 +260,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
-    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
+    self.client.wait_for_impala_state(handle, FINISHED, 60)
 
     # Validate that the query was retried.
     self.__validate_runtime_profiles_from_service(impalad_service, handle)
@@ -320,7 +321,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
-    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 80)
+    self.client.wait_for_impala_state(handle, FINISHED, 80)
 
     # Validate that the query was retried.
     self.__validate_runtime_profiles_from_service(impalad_service, handle)
@@ -384,7 +385,7 @@ class TestQueryRetries(CustomClusterTestSuite):
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
     # Wait until the query fails.
-    self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 140)
+    self.client.wait_for_impala_state(handle, ERROR, 140)
 
     # Validate the live exec summary.
     retried_query_id = self.__get_retried_query_id_from_summary(handle)
@@ -439,7 +440,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Launch a query, it should be retried.
     handle = self.execute_query_async(self._shuffle_heavy_query,
         query_options={'retry_failed_queries': 'true'})
-    self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 60)
+    self.client.wait_for_impala_state(handle, RUNNING, 60)
 
     # Kill one impalad so that a retry is triggered.
     killed_impalad = self.cluster.impalads[1]
@@ -452,7 +453,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.cluster.impalads[2].kill()
 
     # Wait until the query fails.
-    self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
+    self.client.wait_for_impala_state(handle, ERROR, 60)
 
     # Validate the live exec summary.
     retried_query_id = self.__get_retried_query_id_from_summary(handle)
@@ -497,7 +498,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     query_options = {'retry_failed_queries': 'true', 'batch_size': '1',
         'spool_query_results': 'false'}
     handle = self.execute_query_async(query, query_options)
-    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
+    self.client.wait_for_impala_state(handle, FINISHED, 60)
 
     self.client.fetch(query, handle, max_rows=1)
 
@@ -648,11 +649,11 @@ class TestQueryRetries(CustomClusterTestSuite):
   def test_original_query_cancel(self):
     """Test canceling a retryable query with 
spool_all_results_for_retries=true. Make sure
     Coordinator::Wait() won't block in cancellation."""
-    for state in ['RUNNING', 'FINISHED']:
+    for state in [RUNNING, FINISHED]:
       handle = self.execute_query_async(self._union_query, query_options={
         'retry_failed_queries': 'true', 'spool_query_results': 'true',
         'spool_all_results_for_retries': 'true'})
-      self.wait_for_state(handle, self.client.QUERY_STATES[state], 60)
+      self.client.wait_for_impala_state(handle, state, 60)
 
       # Cancel the query.
       self.client.cancel(handle)
@@ -677,7 +678,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     query_options = {'retry_failed_queries': 'true', 'batch_size': '1',
         'spool_query_results': 'false'}
     handle = self.execute_query_async(query, query_options)
-    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
+    self.client.wait_for_impala_state(handle, FINISHED, 60)
 
     self.__kill_random_impalad()
     time.sleep(5)
@@ -706,7 +707,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
-    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
+    self.client.wait_for_impala_state(handle, FINISHED, 60)
 
     # Validate the live exec summary.
     retried_query_id = self.__get_retried_query_id_from_summary(handle)
@@ -887,7 +888,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true', 'exec_time_limit_s': 
'1'})
-    self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
+    self.client.wait_for_impala_state(handle, ERROR, 60)
 
     # Validate the live exec summary.
     retried_query_id = self.__get_retried_query_id_from_summary(handle)
@@ -927,7 +928,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     client = self.cluster.get_first_impalad().service.create_beeswax_client()
     client.set_configuration({'retry_failed_queries': 'true'})
     handle = client.execute_async(query)
-    self.wait_for_state(handle, client.QUERY_STATES['FINISHED'], 60, 
client=client)
+    client.wait_for_impala_state(handle, FINISHED, 60)
 
     # Wait for the idle session timeout to expire the session.
     time.sleep(5)
@@ -957,7 +958,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
     self.hs2_client.set_configuration_option('impala.resultset.cache.size', 
'1024')
     handle = self.hs2_client.execute_async(query)
-    self.wait_for_state(handle, 'FINISHED_STATE', 60, client=self.hs2_client)
+    self.hs2_client.wait_for_impala_state(handle, FINISHED, 60)
 
     results = self.hs2_client.fetch(query, handle)
     assert results.success
diff --git a/tests/custom_cluster/test_restart_services.py 
b/tests/custom_cluster/test_restart_services.py
index 41f3c8643..8f962d3c5 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -34,9 +34,9 @@ from time import sleep
 from impala.error import HiveServer2Error
 from TCLIService import TCLIService
 
-from beeswaxd.BeeswaxService import QueryState
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_connection import ERROR, RUNNING
 from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfFS
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
 
@@ -115,7 +115,7 @@ class TestRestart(CustomClusterTestSuite):
       node_to_restart = self.cluster.impalads[2]
       node_to_restart.restart()
       # Verify that the query is cancelled due to the failed impalad quickly.
-      self.wait_for_state(handle, QueryState.EXCEPTION, 20, client=client)
+      client.wait_for_impala_state(handle, ERROR, 20)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -440,14 +440,14 @@ class TestRestart(CustomClusterTestSuite):
     try:
       handle = client.execute_async(slow_query)
       # Make sure query starts running.
-      self.wait_for_state(handle, QueryState.RUNNING, 1000)
+      self.client.wait_for_impala_state(handle, RUNNING, 1000)
       profile = client.get_runtime_profile(handle)
       assert "NumBackends: 3" in profile, profile
       # Restart Statestore and wait till the grace period ends + some buffer.
       self.cluster.statestored.restart()
       self.cluster.statestored.service.wait_for_live_subscribers(4)
       sleep(self.CANCELLATION_GRACE_PERIOD_S + 1)
-      assert client.get_state(handle) == QueryState.RUNNING
+      assert client.is_running(handle)
       # Now restart statestore and kill a backend while it is down, and make 
sure the
       # query fails when it comes back up.
       start_time = time.time()
@@ -469,7 +469,7 @@ class TestRestart(CustomClusterTestSuite):
       catalogd_version = self.cluster.catalogd.service.get_catalog_version()
       impalad.service.wait_for_metric_value("catalog.curr-version", 
catalogd_version)
       handle = client.execute_async(slow_query)
-      self.wait_for_state(handle, QueryState.RUNNING, 1000)
+      self.client.wait_for_impala_state(handle, RUNNING, 1000)
       profile = client.get_runtime_profile(handle)
       assert "NumBackends: 2" in profile, profile
       start_time = time.time()
@@ -1010,7 +1010,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, 
HS2TestSuite):
     # Fix number of scanner threads to make runtime more deterministic.
     handle = self.execute_query_async(query, {'num_scanner_threads': 1})
     self.impalad_test_service.wait_for_query_state(self.client, handle,
-                self.client.QUERY_STATES['RUNNING'], timeout=20)
+                self.client.QUERY_STATES['RUNNING'], timeout=timeout)
     return handle
 
   def __fetch_and_get_num_backends(self, query, handle, delay_s=0, 
timeout_s=20):
diff --git a/tests/custom_cluster/test_result_spooling.py 
b/tests/custom_cluster/test_result_spooling.py
index 795c38c5a..4c5a945e4 100644
--- a/tests/custom_cluster/test_result_spooling.py
+++ b/tests/custom_cluster/test_result_spooling.py
@@ -20,6 +20,7 @@ import pytest
 import re
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_connection import FINISHED
 from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
@@ -63,7 +64,7 @@ class TestDedicatedCoordinator(CustomClusterTestSuite):
     try:
       # Wait for the query to finish (all rows are spooled). Assert that the 
executor
       # has been shutdown and its memory has been released.
-      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 
timeout)
+      self.client.wait_for_impala_state(handle, FINISHED, timeout)
       self.assert_eventually(timeout, 0.5,
           lambda: re.search("RowsSent:.*({0})".format(num_rows),
           self.client.get_runtime_profile(handle)))
diff --git a/tests/custom_cluster/test_statestored_ha.py 
b/tests/custom_cluster/test_statestored_ha.py
index 6cea78644..004ba6768 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -20,12 +20,12 @@ import logging
 import pytest
 import time
 
-from beeswaxd.BeeswaxService import QueryState
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout, 
ImpalaTestClusterProperties
 from tests.common.impala_cluster import (
     DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT)
+from tests.common.impala_connection import ERROR, RUNNING
 from tests.common.skip import SkipIfBuildType, SkipIfNotHdfsMinicluster
 from time import sleep
 
@@ -749,7 +749,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
       # Run a slow query
       handle = client.execute_async(slow_query)
       # Make sure query starts running.
-      self.wait_for_state(handle, QueryState.RUNNING, 120, client)
+      client.wait_for_impala_state(handle, RUNNING, 120)
       profile = client.get_runtime_profile(handle)
       assert "NumBackends: 3" in profile, profile
       # Kill active statestored
@@ -763,8 +763,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
       # Wait till the grace period ends + some buffer to verify the slow query 
is still
       # running.
       sleep(self.RECOVERY_GRACE_PERIOD_S + 1)
-      assert client.get_state(handle) == QueryState.RUNNING, \
-          "Query expected to be in running state"
+      assert client.is_running(handle), "Query expected to be in running state"
       # Now kill a backend, and make sure the query fails.
       self.cluster.impalads[2].kill()
       try:
@@ -785,15 +784,14 @@ class TestStatestoredHA(CustomClusterTestSuite):
       # Run a slow query
       handle = client.execute_async(slow_query)
       # Make sure query starts running.
-      self.wait_for_state(handle, QueryState.RUNNING, 120, client)
+      client.wait_for_impala_state(handle, RUNNING, 120)
       profile = client.get_runtime_profile(handle)
       assert "NumBackends: 2" in profile, profile
       # Kill current active statestored
       start_time = time.time()
       statestoreds[1].kill()
       # Wait till the standby statestored becomes active.
-      query_state = client.get_state(handle)
-      assert query_state == QueryState.RUNNING
+      assert client.is_running(handle)
       statestore_service_0.wait_for_metric_value(
           "statestore.active-status", expected_value=True, timeout=120)
       assert 
(statestore_service_0.get_metric_value("statestore.active-status")), \
@@ -804,7 +802,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
       # query to fail. Combine failover time (SS_PEER_TIMEOUT_S) and recovery 
grace
       # period (RECOVERY_GRACE_PERIOD_S) to avoid flaky test.
       timeout_s = self.SS_PEER_TIMEOUT_S + self.RECOVERY_GRACE_PERIOD_S * 2
-      self.wait_for_state(handle, QueryState.EXCEPTION, timeout_s, client)
+      client.wait_for_impala_state(handle, ERROR, timeout_s)
       client.close_query(handle)
       elapsed_s = time.time() - start_time
       assert elapsed_s >= self.SS_PEER_TIMEOUT_S + 
self.RECOVERY_GRACE_PERIOD_S, \
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index c6e64ab22..068e41922 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -23,12 +23,12 @@ import pytest
 import re
 import time
 
-from beeswaxd.BeeswaxService import QueryState
 from copy import deepcopy
 from tests.metadata.test_ddl_base import TestDdlBase
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.environ import (HIVE_MAJOR_VERSION)
 from tests.common.file_utils import create_table_from_orc
+from tests.common.impala_connection import FINISHED, INITIALIZED, PENDING, 
RUNNING
 from tests.common.impala_test_suite import LOG
 from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import (
@@ -961,8 +961,7 @@ class TestAsyncDDL(TestDdlBase):
         "'Inserted 7300 row(s)'")
 
   @classmethod
-  def test_get_operation_status_for_client(self, client, unique_database,
-          init_state, pending_state, running_state):
+  def test_get_operation_status_for_client(self, client, unique_database):
     # Setup
     client.execute("drop table if exists 
{0}.alltypes_clone".format(unique_database))
     client.execute("select count(*) from functional_parquet.alltypes")
@@ -986,15 +985,15 @@ class TestAsyncDDL(TestDdlBase):
     num_times_in_running_state = 0
     while not client.state_is_finished(handle):
 
-      state = client.get_state(handle)
+      state = client.get_impala_exec_state(handle)
 
-      if (state == init_state):
+      if (state == INITIALIZED):
         num_times_in_initialized_state += 1
 
-      if (state == pending_state):
+      if (state == PENDING):
         num_times_in_pending_state += 1
 
-      if (state == running_state):
+      if (state == RUNNING):
         num_times_in_running_state += 1
 
     # The query must reach INITIALIZED_STATE 0 times and PENDING_STATE at least
@@ -1007,18 +1006,8 @@ class TestAsyncDDL(TestDdlBase):
   def test_get_operation_status_for_async_ddl(self, vector, unique_database):
     """Tests that for an asynchronously executed DDL with delay, 
GetOperationStatus
     must be issued repeatedly. Test client hs2-http, hs2 and beeswax"""
-
-    if vector.get_value('protocol') == 'hs2-http':
-      self.test_get_operation_status_for_client(self.hs2_http_client, 
unique_database,
-      "INITIALIZED_STATE", "PENDING_STATE", "RUNNING_STATE")
-
-    if vector.get_value('protocol') == 'hs2':
-      self.test_get_operation_status_for_client(self.hs2_client, 
unique_database,
-      "INITIALIZED_STATE", "PENDING_STATE", "RUNNING_STATE")
-
-    if vector.get_value('protocol') == 'beeswax':
-      self.test_get_operation_status_for_client(self.client, unique_database,
-      QueryState.INITIALIZED, QueryState.COMPILED, QueryState.RUNNING)
+    client = self.default_impala_client(vector.get_value('protocol'))
+    self.test_get_operation_status_for_client(client, unique_database)
 
 
 class TestAsyncDDLTiming(TestDdlBase):
@@ -1038,10 +1027,6 @@ class TestAsyncDDLTiming(TestDdlBase):
   def test_alter_table_recover(self, vector, unique_database):
     enable_async_ddl = vector.get_value('enable_async_ddl_execution')
     client = self.create_impala_client(protocol=vector.get_value('protocol'))
-    is_hs2 = vector.get_value('protocol') in ['hs2', 'hs2-http']
-    pending_state = "PENDING_STATE" if is_hs2 else QueryState.COMPILED
-    running_state = "RUNNING_STATE" if is_hs2 else QueryState.RUNNING
-    finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED
 
     try:
       # Setup for the alter table case (create table that points to an existing
@@ -1066,15 +1051,15 @@ class TestAsyncDDLTiming(TestDdlBase):
       handle = self.execute_query_async_using_client(client, alter_stmt, 
new_vector)
       exec_end = time.time()
       exec_time = exec_end - exec_start
-      state = client.get_state(handle)
+      state = client.get_impala_exec_state(handle)
       if enable_async_ddl:
-        assert state == pending_state or state == running_state
+        assert state in [PENDING, RUNNING]
       else:
-        assert state == running_state or state == finished_state
+        assert state in [RUNNING, FINISHED]
 
       # Wait for the statement to finish with a timeout of 20 seconds
       wait_start = time.time()
-      self.wait_for_state(handle, finished_state, 20, client=client)
+      client.wait_for_impala_state(handle, FINISHED, 20)
       wait_end = time.time()
       wait_time = wait_end - wait_start
       self.close_query_using_client(client, handle)
@@ -1096,9 +1081,6 @@ class TestAsyncDDLTiming(TestDdlBase):
   def test_ctas(self, vector, unique_database):
     enable_async_ddl = vector.get_value('enable_async_ddl_execution')
     client = self.create_impala_client(protocol=vector.get_value('protocol'))
-    is_hs2 = vector.get_value('protocol') in ['hs2', 'hs2-http']
-    pending_state = "PENDING_STATE" if is_hs2 else QueryState.COMPILED
-    finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED
 
     try:
       # The CTAS is going to need the metadata of the source table in the
@@ -1125,7 +1107,7 @@ class TestAsyncDDLTiming(TestDdlBase):
       # The CRS_BEFORE_COORD_STARTS delay postpones the transition from PENDING
       # to RUNNING, so the sync case should be in PENDING state at the end of
       # the execute call. This means that the sync and async cases are the 
same.
-      assert client.get_state(handle) == pending_state
+      assert client.is_pending(handle)
 
       # Wait for the statement to finish with a timeout of 40 seconds
       # (60 seconds without shortcircuit reads). There are other tests running
@@ -1134,7 +1116,7 @@ class TestAsyncDDLTiming(TestDdlBase):
       # on the statement finishing in a particular amount of time.
       wait_time = 40 if IS_HDFS else 60
       wait_start = time.time()
-      self.wait_for_state(handle, finished_state, wait_time, client=client)
+      client.wait_for_impala_state(handle, FINISHED, wait_time)
       wait_end = time.time()
       wait_time = wait_end - wait_start
       self.close_query_using_client(client, handle)
diff --git a/tests/metadata/test_load.py b/tests/metadata/test_load.py
index 272f8bd0a..d78dce08b 100644
--- a/tests/metadata/test_load.py
+++ b/tests/metadata/test_load.py
@@ -20,8 +20,8 @@
 from __future__ import absolute_import, division, print_function
 from builtins import range
 import time
-from beeswaxd.BeeswaxService import QueryState
 from copy import deepcopy
+from tests.common.impala_connection import FINISHED, RUNNING
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (
     create_client_protocol_dimension,
@@ -43,6 +43,7 @@ HIDDEN_FILES = ["{0}/3/.100101.txt".format(STAGING_PATH),
 # A path outside WAREHOUSE, which will be a different bucket for Ozone/ofs.
 TMP_STAGING_PATH = get_fs_path('/tmp/test_load_staging')
 
+
 @SkipIfLocal.hdfs_client
 class TestLoadData(ImpalaTestSuite):
 
@@ -143,7 +144,7 @@ class TestLoadDataExternal(ImpalaTestSuite):
     self.client.execute("create table functional.{0} like 
functional.alltypesnopart"
         " location '{1}/{0}'".format(TEST_TBL_NOPART_EXT, WAREHOUSE))
 
-  def test_load(self, vector):
+  def test_load(self):
     self.execute_query_expect_success(self.client, "load data inpath 
'{0}/100101.txt'"
         " into table functional.{1}".format(TMP_STAGING_PATH, 
TEST_TBL_NOPART_EXT))
     result = self.execute_scalar(
@@ -179,9 +180,6 @@ class TestAsyncLoadData(ImpalaTestSuite):
     enable_async_load_data = 
vector.get_value('enable_async_load_data_execution')
     protocol = vector.get_value('protocol')
     client = self.create_impala_client(protocol=protocol)
-    is_hs2 = protocol in ['hs2', 'hs2-http']
-    running_state = "RUNNING_STATE" if is_hs2 else QueryState.RUNNING
-    finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED
 
     # Form a fully qualified table name with '-' in protocol 'hs2-http' 
dropped as
     # '-' is not allowed in Impala table name even delimited with ``.
@@ -221,13 +219,13 @@ class TestAsyncLoadData(ImpalaTestSuite):
       handle = self.execute_query_async_using_client(client, load_stmt, 
new_vector)
       exec_end = time.time()
       exec_time = exec_end - exec_start
-      exec_end_state = client.get_state(handle)
+      exec_end_state = client.get_impala_exec_state(handle)
 
       # Wait for the statement to finish with a timeout of 20 seconds
       # (30 seconds without shortcircuit reads)
       wait_time = 20 if IS_HDFS else 30
       wait_start = time.time()
-      self.wait_for_state(handle, finished_state, wait_time, client=client)
+      client.wait_for_impala_state(handle, FINISHED, wait_time)
       wait_end = time.time()
       wait_time = wait_end - wait_start
       self.close_query_using_client(client, handle)
@@ -236,8 +234,8 @@ class TestAsyncLoadData(ImpalaTestSuite):
         #  The compilation of LOAD is processed in the exec step without 
delay. And the
         #  processing of the LOAD plan is in wait step with delay. The wait 
time should
         #  definitely take more time than 3 seconds.
-        assert(exec_end_state == running_state)
-        assert(wait_time >= 3)
+        assert (exec_end_state == RUNNING)
+        assert (wait_time >= 3)
       else:
         # In sync mode:
         #  The entire LOAD is processed in the exec step with delay. exec_time 
should be
@@ -245,8 +243,8 @@ class TestAsyncLoadData(ImpalaTestSuite):
        #  that the exec state returned is still in RUNNING state due to the 
wait-for
         #  thread executing ClientRequestState::Wait() does not have time to 
set the
         #  exec state from RUNNING to FINISH.
-        assert(exec_end_state == running_state or exec_end_state == 
finished_state)
-        assert(exec_time >= 3)
+        assert (exec_end_state == RUNNING or exec_end_state == FINISHED)
+        assert (exec_time >= 3)
     finally:
       client.close()
 
diff --git a/tests/query_test/test_cancellation.py 
b/tests/query_test/test_cancellation.py
index 49efa053f..6c6570d97 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -200,14 +200,14 @@ class TestCancellation(ImpalaTestSuite):
     # Wait for the query to start (with a long timeout to account for 
admission control
     # queuing).
     WAIT_SECONDS = 60 * 30
-    assert any(client.get_state(handle) == 'RUNNING_STATE' or sleep(0.1)
+    assert any(client.is_running(handle) or sleep(0.1)
                for _ in range(10 * WAIT_SECONDS)), 'Query failed to start'
 
     client.cancel(handle)
     # Wait up to 5 seconds for the query to get cancelled
     # TODO(IMPALA-1262): This should be CANCELED_STATE
     # TODO(IMPALA-8411): Remove and assert that the query is cancelled 
immediately
-    assert any(client.get_state(handle) == 'ERROR_STATE' or sleep(1)
+    assert any(client.is_error(handle) or sleep(1)
                for _ in range(5)), 'Query failed to cancel'
     # Get profile and check for formatting errors
     profile = client.get_runtime_profile(handle, TRuntimeProfileFormat.THRIFT)
diff --git a/tests/query_test/test_fetch.py b/tests/query_test/test_fetch.py
index c70e5158f..49a4ed2c7 100644
--- a/tests/query_test/test_fetch.py
+++ b/tests/query_test/test_fetch.py
@@ -19,6 +19,7 @@ from __future__ import absolute_import, division, 
print_function
 import re
 
 from time import sleep
+from tests.common.impala_connection import FINISHED
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import extend_exec_option_dimension
 from tests.util.parse_util import parse_duration_string_ms, \
@@ -49,7 +50,7 @@ class TestFetch(ImpalaTestSuite):
     handle = self.execute_query_async(query, vector.get_value('exec_option'))
     try:
       # Wait until the query is 'FINISHED' and results are available for 
fetching.
-      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30)
+      self.client.wait_for_impala_state(handle, FINISHED, 30)
       # Sleep for 2.5 seconds so that the ClientFetchWaitTimer is >= 1s.
       sleep(2.5)
       # Fetch the results so that the fetch related counters are updated.
@@ -93,7 +94,7 @@ class TestFetch(ImpalaTestSuite):
     handle = self.execute_query_async(query, vector.get_value('exec_option'))
     try:
       # Wait until the query is 'FINISHED' and results are available for 
fetching.
-      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30)
+      self.client.wait_for_impala_state(handle, FINISHED, 30)
 
       # This loop will do 6 fetches that contain data and a final fetch with
       # no data. The last fetch is after eos has been set, so it does not 
count.
@@ -128,7 +129,7 @@ class TestFetch(ImpalaTestSuite):
     handle = self.execute_query_async(query, vector.get_value('exec_option'))
     try:
       # Wait until the query is 'FINISHED' and results are available for 
fetching.
-      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30)
+      self.client.wait_for_impala_state(handle, FINISHED, 30)
 
       # This loop will do 5 fetches for a total of 25 rows. This is incomplete.
       for i in range(5):
@@ -175,15 +176,15 @@ class TestFetchAndSpooling(ImpalaTestSuite):
     """Validate that RowsSent and RowsSentRate are set to valid values in
     the PLAN_ROOT_SINK section of the runtime profile."""
     num_rows = 10
-    if ('spool_query_results' in vector.get_value('exec_option') and
-          vector.get_value('exec_option')['spool_query_results'] == 'false'):
+    if ('spool_query_results' in vector.get_value('exec_option')
+        and vector.get_value('exec_option')['spool_query_results'] == 'false'):
       vector.get_value('exec_option')['debug_action'] = 
"BPRS_BEFORE_ADD_ROWS:SLEEP@1000"
     else:
       vector.get_value('exec_option')['debug_action'] = 
"BPRS_BEFORE_ADD_BATCH:SLEEP@1000"
     result = self.execute_query("select id from functional.alltypes limit {0}"
         .format(num_rows), vector.get_value('exec_option'))
     assert "RowsSent: {0} ({0})".format(num_rows) in result.runtime_profile
-    rows_sent_rate = re.search("RowsSentRate: (\d*\.?\d*)", 
result.runtime_profile)
+    rows_sent_rate = re.search(r"RowsSentRate: (\d*\.?\d*)", 
result.runtime_profile)
     assert rows_sent_rate
     assert float(rows_sent_rate.group(1)) > 0
 
diff --git a/tests/query_test/test_result_spooling.py 
b/tests/query_test/test_result_spooling.py
index 9883cd94f..834d2f600 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -23,6 +23,7 @@ import threading
 
 from time import sleep
 from tests.common.errors import Timeout
+from tests.common.impala_connection import FINISHED
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
@@ -82,11 +83,12 @@ class TestResultSpooling(ImpalaTestSuite):
     # Regexes to look for in the runtime profiles.
     # PeakUnpinnedBytes can show up in exec nodes as well, so we only look for 
the
     # PeakUnpinnedBytes metrics in the PLAN_ROOT_SINK section of the profile.
-    unpinned_bytes_regex = 
"PLAN_ROOT_SINK[\s\S]*?PeakUnpinnedBytes.*\([1-9][0-9]*\)"
+    unpinned_bytes_regex = 
r"PLAN_ROOT_SINK[\s\S]*?PeakUnpinnedBytes.*\([1-9][0-9]*\)"
     # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
     spilled_exec_option_regex = "ExecOption:.*Spilled"
     # PLAN_ROOT_SINK's reservation limit should be set at 
MAX_RESULT_SPOOLING_MEM = 32 KB.
-    plan_root_sink_reservation_limit = 
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 32.00 KB"
+    plan_root_sink_reservation_limit = 
(r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: "
+                                        r"32.00 KB")
 
     # Fetch the runtime profile every 0.5 seconds until either the timeout is 
hit, or
     # PeakUnpinnedBytes shows up in the profile.
@@ -145,7 +147,7 @@ class TestResultSpooling(ImpalaTestSuite):
     timeout = 10
     # Regexes to look for in the runtime profile.
     send_wait_time_regex = "RowBatchSendWaitTime: [1-9]"
-    queue_spilled_regex = "PLAN_ROOT_SINK[\s\S]*?ExecOption: Spilled"
+    queue_spilled_regex = r"PLAN_ROOT_SINK[\s\S]*?ExecOption: Spilled"
 
     # Execute the query asynchronously, wait for the queue to fill up, start 
fetching
     # results, and then validate that RowBatchSendWaitTime shows a non-zero 
value in the
@@ -154,7 +156,7 @@ class TestResultSpooling(ImpalaTestSuite):
     # is guaranteed to be full.
     handle = self.execute_query_async(query, exec_options)
     try:
-      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 
timeout)
+      self.client.wait_for_impala_state(handle, FINISHED, timeout)
       self.assert_eventually(30, 1, lambda: re.search(queue_spilled_regex,
           self.client.get_runtime_profile(handle)))
       # A fetch request is necessary to unblock the producer thread and 
trigger an update
@@ -186,7 +188,7 @@ class TestResultSpooling(ImpalaTestSuite):
       thread = threading.Thread(target=lambda:
           self.create_impala_client().fetch(query, handle))
       thread.start()
-      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 10)
+      self.client.wait_for_impala_state(handle, FINISHED, 10)
       thread.join()
       assert re.search(get_wait_time_regex, 
self.client.get_runtime_profile(handle))
     finally:
@@ -264,8 +266,8 @@ class TestResultSpoolingFetchSize(ImpalaTestSuite):
     # Result spooling should be independent of file format, so only testing for
     # table_format=parquet/none in order to avoid a test dimension explosion.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'parquet' and
-        v.get_value('table_format').compression_codec == 'none')
+        v.get_value('table_format').file_format == 'parquet'
+        and v.get_value('table_format').compression_codec == 'none')
 
   @classmethod
   def setup_class(cls):
@@ -295,7 +297,7 @@ class TestResultSpoolingFetchSize(ImpalaTestSuite):
       # If 'wait_for_finished' is True, wait for the query to reach the 
FINISHED state.
       # When it reaches this state all results should be successfully spooled.
       if vector.get_value('wait_for_finished'):
-          self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 
timeout)
+        self.client.wait_for_impala_state(handle, FINISHED, timeout)
       rows_fetched = 0
 
       # Call 'fetch' on the query handle enough times to read all rows.
@@ -307,7 +309,7 @@ class TestResultSpoolingFetchSize(ImpalaTestSuite):
         rows_fetched += len(result_data)
         results.extend(result_data)
     finally:
-       self.client.close_query(handle)
+      self.client.close_query(handle)
 
     # Assert that the fetched results match the '_base_data'.
     assert self._num_rows == rows_fetched
@@ -346,8 +348,8 @@ class TestResultSpoolingCancellation(ImpalaTestSuite):
     # Result spooling should be independent of file format, so only testing for
     # table_format=parquet/none in order to avoid a test dimension explosion.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'parquet' and
-        v.get_value('table_format').compression_codec == 'none')
+        v.get_value('table_format').file_format == 'parquet'
+        and v.get_value('table_format').compression_codec == 'none')
 
   def test_cancellation(self, vector):
     vector.get_value('exec_option')['spool_query_results'] = 'true'
@@ -406,8 +408,8 @@ class TestResultSpoolingFailpoints(ImpalaTestSuite):
     # Result spooling should be independent of file format, so only testing for
     # table_format=parquet/none in order to avoid a test dimension explosion.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'parquet' and
-        v.get_value('table_format').compression_codec == 'none')
+        v.get_value('table_format').file_format == 'parquet'
+        and v.get_value('table_format').compression_codec == 'none')
 
   def test_failpoints(self, vector):
     vector.get_value('exec_option')['batch_size'] = 10
@@ -439,8 +441,8 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite):
     # Result spooling should be independent of file format, so only testing for
     # table_format=parquet/none in order to avoid a test dimension explosion.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'parquet' and
-        v.get_value('table_format').compression_codec == 'none')
+        v.get_value('table_format').file_format == 'parquet'
+        and v.get_value('table_format').compression_codec == 'none')
 
   def test_high_max_row_size(self, vector):
     """Test that when MAX_ROW_SIZE is set, PLAN_ROOT_SINK can adjust its 
max_reservation
@@ -464,7 +466,8 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite):
     assert re.search(spilled_exec_option_regex, result.runtime_profile)
 
     # PLAN_ROOT_SINK's reservation limit should be set at 2 * MAX_ROW_SIZE.
-    plan_root_sink_reservation_limit = 
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 32.00 MB"
+    plan_root_sink_reservation_limit = 
(r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: "
+                                        r"32.00 MB")
     assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)
 
   def test_high_default_spillable_buffer(self, vector):
@@ -509,7 +512,7 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite):
 
     # Check that PLAN_ROOT_SINK's reservation limit match the default
     # MAX_RESULT_SPOOLING_MEM.
-    plan_root_sink_reservation_limit = 
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
+    plan_root_sink_reservation_limit = 
r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
         .format('100.00 MB')
     assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)
 
@@ -526,6 +529,6 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite):
     assert re.search(spilled_exec_option_regex, result.runtime_profile)
 
     # Check that PLAN_ROOT_SINK's reservation limit match.
-    plan_root_sink_reservation_limit = 
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
+    plan_root_sink_reservation_limit = 
r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
         .format(expected_limit)
     assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)
diff --git a/tests/webserver/test_web_pages.py 
b/tests/webserver/test_web_pages.py
index cac4f407a..be4dcf49c 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -20,6 +20,7 @@ from tests.common.environ import 
ImpalaTestClusterFlagsDetector
 from tests.common.file_utils import grep_dir
 from tests.common.skip import SkipIfBuildType, SkipIfDockerizedCluster
 from tests.common.impala_cluster import ImpalaCluster
+from tests.common.impala_connection import FINISHED, RUNNING
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.util.filesystem_utils import supports_storage_ids
 from tests.util.parse_util import parse_duration_string_ms
@@ -146,7 +147,7 @@ class TestWebPage(ImpalaTestSuite):
     query = "select count(*) from functional_parquet.alltypes where bool_col = 
sleep(100)"
     query_handle = self.client.execute_async(query)
     try:
-      self.wait_for_state(query_handle, self.client.QUERY_STATES['RUNNING'], 
1000)
+      self.client.wait_for_impala_state(query_handle, RUNNING, 1000)
       memz_breakdown = self.get_debug_page(self.MEMZ_URL)['detailed']
       finstance_re = re.compile("Fragment [0-9a-f]{16}:[0-9a-f]{16}")
       assert finstance_re.search(memz_breakdown), memz_breakdown
@@ -171,8 +172,8 @@ class TestWebPage(ImpalaTestSuite):
       assert "application/json" == response.headers['Content-Type']
       jmx_json = ""
       try:
-       jmx_json = json.loads(response.text)
-       assert "beans" in jmx_json.keys(), "Ill formatted JSON returned: %s" % 
jmx_json
+        jmx_json = json.loads(response.text)
+        assert "beans" in jmx_json.keys(), "Ill formatted JSON returned: %s" % 
jmx_json
       except ValueError:
         assert False, "Invalid JSON returned from /jmx endpoint: %s" % jmx_json
 
@@ -454,8 +455,8 @@ class TestWebPage(ImpalaTestSuite):
 
   def __test_table_metrics(self, db_name, tbl_name, metric):
     self.client.execute("refresh %s.%s" % (db_name, tbl_name))
-    self.get_and_check_status(self.TABLE_METRICS_URL +
-      "?name=%s.%s" % (db_name, tbl_name), metric, 
ports_to_test=self.CATALOG_TEST_PORT)
+    self.get_and_check_status(self.TABLE_METRICS_URL
+      + "?name=%s.%s" % (db_name, tbl_name), metric, 
ports_to_test=self.CATALOG_TEST_PORT)
 
   def __test_catalog_tables_loading_time(self, db_name, tbl_name):
     """Test the list of tables with the longest loading time in the catalog 
page.
@@ -491,7 +492,7 @@ class TestWebPage(ImpalaTestSuite):
       # Find the entry for the db table and verify its file count.
       if re.search(target_metric, trow) is not None:
         # Get the number following <td> in the entry
-        nfiles = re.search('(?<=\<td\>)\d+', trow)
+        nfiles = re.search(r'(?<=\<td\>)\d+', trow)
         assert nfiles.group(0) == numfiles
     response = self.get_and_check_status(self.CATALOG_URL + "?json",
       "high_file_count_tables", ports_to_test=self.CATALOG_TEST_PORT)
@@ -515,7 +516,7 @@ class TestWebPage(ImpalaTestSuite):
     expected_result = "select \"{0}...".format("x " * 121)
     check_if_contains = False
     response_json = self.__run_query_and_get_debug_page(
-      query, self.QUERIES_URL, 
expected_state=self.client.QUERY_STATES["FINISHED"])
+      query, self.QUERIES_URL, expected_state=FINISHED)
     # Search the json for the expected value.
     # The query can be in in_flight_queries even though it is in FINISHED 
state.
     for json_part in itertools.chain(
@@ -539,7 +540,7 @@ class TestWebPage(ImpalaTestSuite):
     response_json = ""
     try:
       if expected_state:
-        self.wait_for_state(query_handle, expected_state, 100)
+        self.client.wait_for_impala_state(query_handle, expected_state, 100)
       responses = self.get_and_check_status(
         page_url + "?query_id=%s&json" % query_handle.get_handle().id,
         ports_to_test=[25000])
@@ -556,7 +557,7 @@ class TestWebPage(ImpalaTestSuite):
     queries; nothing for DDL statements"""
     sleep_query = "select sleep(10000) from functional.alltypes limit 1"
     ctas_sleep_query = "create table {0}.foo as {1}".format(unique_database, 
sleep_query)
-    running_state = self.client.QUERY_STATES['RUNNING']
+    running_state = RUNNING
     backend_state_properties = ['cpu_user_s', 'rpc_latency', 
'num_remaining_instances',
                                 'num_instances', 
'peak_per_host_mem_consumption',
                                 'time_since_last_heard_from', 'status', 'host',
@@ -587,7 +588,7 @@ class TestWebPage(ImpalaTestSuite):
     nothing for DDL statements"""
     sleep_query = "select sleep(10000) from functional.alltypes limit 1"
     ctas_sleep_query = "create table {0}.foo as {1}".format(unique_database, 
sleep_query)
-    running_state = self.client.QUERY_STATES['RUNNING']
+    running_state = RUNNING
     instance_stats_properties = ['fragment_name', 'time_since_last_heard_from',
                                  'current_state', 
'first_status_update_received',
                                  'instance_id', 'done']
@@ -595,6 +596,7 @@ class TestWebPage(ImpalaTestSuite):
     for query in [sleep_query, ctas_sleep_query]:
       response_json = self.__run_query_and_get_debug_page(query,
                                                           
self.QUERY_FINSTANCES_URL,
+                                                          
query_options=query_options,
                                                           
expected_state=running_state)
 
       assert 'backend_instances' in response_json
@@ -613,7 +615,8 @@ class TestWebPage(ImpalaTestSuite):
         assert not instance_stats['done']
 
     response_json = self.__run_query_and_get_debug_page("describe 
functional.alltypes",
-                                                         
self.QUERY_BACKENDS_URL)
+                                                        
self.QUERY_BACKENDS_URL,
+                                                        
query_options=query_options)
     assert 'backend_instances' not in response_json
 
   @pytest.mark.xfail(run=False, reason="IMPALA-8059")
@@ -900,7 +903,7 @@ class TestWebPage(ImpalaTestSuite):
     # Matches all 'form' tags that are not followed by including the hidden 
inputs.
     form_regex = "<form [^{]*?>(?!{{>www/form-hidden-inputs.tmpl}})"
     # Matches XMLHttpRequest.open() in javascript that are not followed with 
make_url().
-    javascript_regex = "open\(['\"]GET['\"], (?!make_url)"
+    javascript_regex = r"open\(['\"]GET['\"], (?!make_url)"
     # Matches urls in json parameters passed to DataTables.
     datatables_regex = "url: ['\"](?!make_url)"
     # Matches all references of paths that contain '/www/' but are not fully 
qualified.
@@ -911,7 +914,8 @@ class TestWebPage(ImpalaTestSuite):
     regex = "(%s)|(%s)|(%s)|(%s)|(%s)|(%s)|(%s)" % \
         (href_regex, script_regex, form_regex, javascript_regex, 
datatables_regex,
          path_regex, link_regex)
-    results = grep_dir(os.path.join(os.environ['IMPALA_HOME'], "www"), regex, 
".*\.tmpl")
+    results = grep_dir(os.path.join(os.environ['IMPALA_HOME'], "www"), regex,
+                       r".*\.tmpl")
     assert len(results) == 0, \
         "All links on the webui must include the webserver host: %s" % results
 
@@ -1037,7 +1041,7 @@ class TestWebPage(ImpalaTestSuite):
     """Tests that /queries page shows query progress."""
     query = "select count(*) from functional_parquet.alltypes where bool_col = 
sleep(100)"
     response_json = self.__run_query_and_get_debug_page(
-      query, self.QUERIES_URL, 
expected_state=self.client.QUERY_STATES["RUNNING"])
+      query, self.QUERIES_URL, expected_state=RUNNING)
     for json_part in response_json['in_flight_queries']:
       if query in json_part['stmt']:
         assert json_part["query_progress"] == "0 / 4 ( 0%)"
@@ -1175,6 +1179,7 @@ class TestWebPage(ImpalaTestSuite):
     # check if response size is 2 , for both catalog and impalad webUI
     assert len(responses) == 2
 
+
 class TestWebPageAndCloseSession(ImpalaTestSuite):
   ROOT_URL = "http://localhost:{0}/";
 

Reply via email to