This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f28a32fbc351919aec6d8b0d357f62c9b2521a40
Author: Riza Suminto <[email protected]>
AuthorDate: Sat Mar 29 06:23:25 2025 -0700

    IMPALA-13916: Change BaseTestSuite.default_test_protocol to HS2
    
    This is the final patch to move all Impala e2e and custom cluster tests
    to use HS2 protocol by default. Only beeswax-specific tests remain
    testing against the beeswax protocol by default. We can remove them once
    Impala officially removes beeswax support.
    
    HS2 error message formatting in impala-hs2-server.cc is adjusted a bit
    to match with formatting in impala-beeswax-server.cc.
    
    Move TestWebPageAndCloseSession from webserver/test_web_pages.py to
    custom_cluster/test_web_pages.py to disable glog log buffering.
    
    Testing:
    - Pass exhaustive tests, except for some known and unrelated flaky
      tests.
    
    Change-Id: I42e9ceccbba1e6853f37e68f106265d163ccae28
    Reviewed-on: http://gerrit.cloudera.org:8080/22845
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Jason Fehr <[email protected]>
---
 be/src/service/impala-hs2-server.cc                |  4 +-
 tests/common/base_test_suite.py                    |  5 +-
 tests/common/impala_service.py                     | 19 +++---
 tests/common/impala_test_suite.py                  |  3 +-
 tests/conftest.py                                  |  2 +-
 tests/custom_cluster/test_coordinators.py          |  2 +-
 tests/custom_cluster/test_data_cache.py            | 31 +++++-----
 tests/custom_cluster/test_executor_groups.py       |  6 +-
 .../test_frontend_connection_limit.py              |  2 +-
 tests/custom_cluster/test_hs2_fault_injection.py   |  2 +-
 tests/custom_cluster/test_query_retries.py         |  6 ++
 tests/custom_cluster/test_web_pages.py             | 40 +++++++++++--
 tests/failure/test_failpoints.py                   | 43 ++++++-------
 tests/metadata/test_ddl.py                         |  2 +-
 tests/metadata/test_event_processing.py            | 11 ++--
 tests/metadata/test_explain.py                     | 70 ++++++++++------------
 tests/metadata/test_recursive_listing.py           | 19 +++---
 tests/performance/query_exec_functions.py          | 29 +++++++--
 tests/query_test/test_beeswax.py                   |  9 +--
 tests/query_test/test_errorlog.py                  | 11 ++--
 tests/query_test/test_insert.py                    |  6 +-
 tests/query_test/test_mem_usage_scaling.py         |  6 +-
 tests/query_test/test_result_spooling.py           |  9 +--
 tests/query_test/test_runtime_filters.py           | 10 ++--
 tests/query_test/test_tablesample.py               |  8 ++-
 tests/webserver/test_web_pages.py                  | 63 +++++++------------
 26 files changed, 230 insertions(+), 188 deletions(-)

diff --git a/be/src/service/impala-hs2-server.cc 
b/be/src/service/impala-hs2-server.cc
index ba35ca395..c18e83a86 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -1169,7 +1169,9 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const 
TGetLogReq& request) {
   }
 
   // Report analysis errors
-  ss << join(query_handle->GetAnalysisWarnings(), "\n");
+  for (const string& warning : query_handle->GetAnalysisWarnings()) {
+    ss << warning << "\n";
+  }
   // Report queuing reason if the admission controller queued the query.
   const string* admission_result = 
query_handle->summary_profile()->GetInfoString(
       AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT);
diff --git a/tests/common/base_test_suite.py b/tests/common/base_test_suite.py
index 84dc26241..5612dbb02 100644
--- a/tests/common/base_test_suite.py
+++ b/tests/common/base_test_suite.py
@@ -19,10 +19,11 @@
 from __future__ import absolute_import, division, print_function
 import logging
 
-from tests.common.test_vector import BEESWAX, ImpalaTestMatrix
+from tests.common.test_vector import HS2, ImpalaTestMatrix
 
 LOG = logging.getLogger('base_test_suite')
 
+
 # Base class for tests.
 class BaseTestSuite(object):
   ImpalaTestMatrix = ImpalaTestMatrix()
@@ -40,4 +41,4 @@ class BaseTestSuite(object):
   @classmethod
   def default_test_protocol(cls):
     """See documentation in ImpalaTestSuite.default_test_protocol()."""
-    return BEESWAX
+    return HS2
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index cff62f5b1..ae9707ff8 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -272,7 +272,8 @@ class ImpaladService(BaseImpalaService):
 
   def get_num_known_live_executors(self, timeout=30, interval=1,
       include_shutting_down=True):
-    return self.get_num_known_live_backends(timeout=timeout, interval=interval,
+    return self.get_num_known_live_backends(timeout=timeout,
+                                            interval=interval,
                                             
include_shutting_down=include_shutting_down,
                                             only_executors=True)
 
@@ -305,7 +306,8 @@ class ImpaladService(BaseImpalaService):
 
   def get_queries_json(self, timeout=30, interval=1):
     """Return the full JSON from the /queries page."""
-    return self.get_debug_webpage_json('queries', timeout=timeout, 
interval=interval)
+    return json.loads(
+        self.read_debug_webpage('queries?json', timeout=timeout, 
interval=interval))
 
   def get_query_locations(self):
     # Returns a dictionary of the format <host_address, 
num_of_queries_running_there>
@@ -396,9 +398,9 @@ class ImpaladService(BaseImpalaService):
     return self.read_debug_webpage(
         "query_profile?query_id=%s&raw" % (query_id), timeout=timeout, 
interval=interval)
 
-  def get_query_status(self, query_id):
+  def get_query_status(self, query_id, timeout=10, interval=1):
     """Gets the 'Query Status' section of the query's runtime profile."""
-    page = self.read_query_profile_page(query_id)
+    page = self.read_query_profile_page(query_id, timeout=timeout, 
interval=interval)
     status_line =\
         next((x for x in page.split('\n') if re.search('Query Status:', x)), 
None)
     return status_line.split('Query Status:')[1].strip()
@@ -430,10 +432,11 @@ class ImpaladService(BaseImpalaService):
     query_status = ""
     while (time() - start_time < timeout):
       try:
-        query_status = self.get_query_status(query_id)
+        query_status = self.get_query_status(
+            query_id, timeout=timeout, interval=interval)
         if query_status is None:
-          assert False, "Could not find 'Query Status' section in profile of "\
-              "query with id %s:\n%s" % (query_id)
+          raise Exception("Could not find 'Query Status' section in profile of 
"
+                          "query with id {}".format(query_id))
       except Exception:
         pass
       if expected_content in query_status:
@@ -455,6 +458,7 @@ class ImpaladService(BaseImpalaService):
   def create_beeswax_client(self, use_kerberos=False):
     """Creates a new beeswax client connection to the impalad.
     DEPRECATED: Use create_hs2_client() instead."""
+    LOG.warning('beeswax protocol is deprecated.')
     client = create_connection('%s:%d' % (self.hostname, self.beeswax_port),
                                use_kerberos, BEESWAX)
     client.connect()
@@ -514,6 +518,7 @@ class ImpaladService(BaseImpalaService):
     if protocol == HS2_HTTP:
       port = self.hs2_http_port
     if protocol == BEESWAX:
+      LOG.warning('beeswax protocol is deprecated.')
       port = self.beeswax_port
     client = create_connection('%s:%d' % (self.hostname, port), 
protocol=protocol)
     client.connect()
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 996822483..0ae933bbd 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -901,7 +901,8 @@ class ImpalaTestSuite(BaseTestSuite):
         assert result.success, "Query failed: {0}".format(result.data)
 
         # Decode the results read back if the data is stored with a specific 
encoding.
-        if encoding: result.data = [row.decode(encoding) for row in 
result.data]
+        if encoding and result.data:
+            result.data = [row.decode(encoding) for row in result.data]
         # Replace $NAMENODE in the expected results with the actual namenode 
URI.
         if 'RESULTS' in test_section:
           # Combining 'RESULTS' with 'DML_RESULTS" is currently unsupported 
because
diff --git a/tests/conftest.py b/tests/conftest.py
index edf6a4a2c..9c3154488 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -56,7 +56,7 @@ DEFAULT_KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', 
'127.0.0.1')
 DEFAULT_KUDU_MASTER_PORT = os.getenv('KUDU_MASTER_PORT', '7051')
 DEFAULT_METASTORE_SERVER = 'localhost:9083'
 DEFAULT_NAMENODE_ADDR = None
-DEFAULT_TEST_PROTOCOL = os.getenv('DEFAULT_TEST_PROTOCOL', BEESWAX)
+DEFAULT_TEST_PROTOCOL = os.getenv('DEFAULT_TEST_PROTOCOL', HS2)
 if FILESYSTEM == 'isilon':
   DEFAULT_NAMENODE_ADDR = 
"{node}:{port}".format(node=os.getenv("ISILON_NAMENODE"),
                                                  port=ISILON_WEBHDFS_PORT)
diff --git a/tests/custom_cluster/test_coordinators.py 
b/tests/custom_cluster/test_coordinators.py
index e95cc0c0e..84cc9cbd7 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -50,7 +50,7 @@ class TestCoordinators(CustomClusterTestSuite):
     # Verify that Beeswax and HS2 client connections can't be established at a 
worker node
     beeswax_client = None
     try:
-      beeswax_client = worker.service.create_hs2_client()
+      beeswax_client = worker.service.create_beeswax_client()
     except Exception as e:
       LOG.info("Caught exception {0}".format(e))
     finally:
diff --git a/tests/custom_cluster/test_data_cache.py 
b/tests/custom_cluster/test_data_cache.py
index 36bb03576..73a868ae8 100644
--- a/tests/custom_cluster/test_data_cache.py
+++ b/tests/custom_cluster/test_data_cache.py
@@ -23,6 +23,7 @@ import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster
+from tests.common.test_vector import HS2
 
 
 @SkipIf.is_buggy_el6_kernel
@@ -216,7 +217,8 @@ class TestDataCache(CustomClusterTestSuite):
     QUERY = "select * from tpch_parquet.lineitem"
     # Execute a query, record the total bytes and the number of entries of 
cache before
     # cache dump.
-    self.execute_query(QUERY)
+    with self.create_impala_client(protocol=HS2) as client1:
+      client1.execute(QUERY)
     assert self.get_data_cache_metric('hit-bytes') == 0
     assert self.get_data_cache_metric('hit-count') == 0
     total_bytes = self.get_data_cache_metric('total-bytes')
@@ -242,8 +244,8 @@ class TestDataCache(CustomClusterTestSuite):
       assert self.get_data_cache_metric('num-entries') == num_entries
 
     # Reconnect to the service and execute the query, expecting some cache 
hits.
-    self.client.connect()
-    self.execute_query(QUERY)
+    with self.create_impala_client(protocol=HS2) as client2:
+      client2.execute(QUERY)
     assert self.get_data_cache_metric('hit-bytes') > 0
     assert self.get_data_cache_metric('hit-count') > 0
     if test_reduce_size:
@@ -285,21 +287,22 @@ class TestDataCache(CustomClusterTestSuite):
     QUERY = "select * from tpch_parquet.lineitem"
     # Execute the query asynchronously, wait a short while, and do gracefully 
shutdown
     # immediately to test the race between cache writes and setting cache 
read-only.
-    handle = self.execute_query_async(QUERY)
-    sleep(1)
-    impalad = self.cluster.impalads[0]
-    impalad.kill(SIGRTMIN)
-    self.client.fetch(QUERY, handle)
-    self.client.close_query(handle)
-    impalad.wait_for_exit()
-    impalad.start()
-    impalad.service.wait_for_num_known_live_backends(1)
+    with self.create_impala_client(protocol=HS2) as client1:
+      handle = client1.execute_async(QUERY)
+      sleep(1)
+      impalad = self.cluster.impalads[0]
+      impalad.kill(SIGRTMIN)
+      client1.fetch(QUERY, handle)
+      client1.close_query(handle)
+      impalad.wait_for_exit()
+      impalad.start()
+      impalad.service.wait_for_num_known_live_backends(1)
 
     # We hope that in this case, the cache is still properly dumped and loaded,
     # and then the same query is executed to expect some cache hits.
     self.assert_impalad_log_contains('INFO', 'Partition 0 load successfully.')
-    self.client.connect()
-    self.execute_query(QUERY)
+    with self.create_impala_client(protocol=HS2) as client2:
+      client2.execute(QUERY)
     assert self.get_data_cache_metric('hit-bytes') > 0
     assert self.get_data_cache_metric('hit-count') > 0
 
diff --git a/tests/custom_cluster/test_executor_groups.py 
b/tests/custom_cluster/test_executor_groups.py
index 48df14ad4..a13f56782 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -678,11 +678,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # Predicates to assert that a certain join type was picked.
     def assert_broadcast_join():
       ret = self.execute_query_expect_success(self.client, QUERY)
-      assert ":EXCHANGE [BROADCAST]" in str(ret)
+      assert ":EXCHANGE [BROADCAST]" in str(ret.data)
 
     def assert_hash_join():
       ret = self.execute_query_expect_success(self.client, QUERY)
-      assert ":EXCHANGE [HASH(b.id)]" in str(ret)
+      assert ":EXCHANGE [HASH(b.id)]" in str(ret.data)
 
     # Without any executors we default to a hash join.
     assert_hash_join()
@@ -721,7 +721,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # Predicate to assert that the planner decided on a hash join.
     def assert_hash_join():
       ret = self.execute_query_expect_success(self.client, QUERY)
-      assert ":EXCHANGE [HASH(b.id)]" in str(ret)
+      assert ":EXCHANGE [HASH(b.id)]" in str(ret.data)
 
     # Without any executors we default to a hash join.
     assert_hash_join()
diff --git a/tests/custom_cluster/test_frontend_connection_limit.py 
b/tests/custom_cluster/test_frontend_connection_limit.py
index 845bd80f1..5a5e0ca60 100644
--- a/tests/custom_cluster/test_frontend_connection_limit.py
+++ b/tests/custom_cluster/test_frontend_connection_limit.py
@@ -44,7 +44,7 @@ class TestFrontendConnectionLimit(CustomClusterTestSuite):
 
   def _connect_and_query(self, query, impalad):
     with impalad.service.create_hs2_client() as client:
-      client.execute(query)
+      self.execute_query_expect_success(client, query)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
diff --git a/tests/custom_cluster/test_hs2_fault_injection.py 
b/tests/custom_cluster/test_hs2_fault_injection.py
index 1b4ca8632..0cfc124ce 100644
--- a/tests/custom_cluster/test_hs2_fault_injection.py
+++ b/tests/custom_cluster/test_hs2_fault_injection.py
@@ -410,7 +410,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
     self.custom_hs2_http_client.wait_to_finish(query_handle)
     self.transport.enable_fault(502, "Injected Fault", 0.50)
     warning_log = self.custom_hs2_http_client.get_warning_log(query_handle)
-    assert warning_log == 'WARNINGS: JOIN hint not recognized: foo'
+    assert 'WARNINGS: JOIN hint not recognized: foo' in warning_log
     self.close_query(query_handle)
     output = capsys.readouterr()[1].splitlines()
     assert output[1][TS_LEN:] == self.__expect_msg_retry("GetLog")
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index c94bc53b5..4cda46275 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -46,6 +46,7 @@ from tests.common.skip import (
     SkipIfNotHdfsMinicluster,
 )
 from tests.common.test_dimensions import add_mandatory_exec_option
+from tests.common.test_vector import BEESWAX
 
 # The BE krpc port of the impalad to simulate rpc or disk errors in tests.
 FAILED_KRPC_PORT = 27001
@@ -69,6 +70,11 @@ def _get_disk_fail_action(port):
 @SkipIfEC.parquet_file_size
 class TestQueryRetries(CustomClusterTestSuite):
 
+  @classmethod
+  def default_test_protocol(cls):
+    # Retry mechanism is slightly different between beeswax vs hs2 protocol.
+    return BEESWAX
+
   # A query that shuffles a lot of data. Useful when testing query retries 
since it
   # ensures that a query fails during a TransmitData RPC. The RPC failure will 
cause the
   # target impalad to be blacklisted and the query to be retried. The query 
also has to
diff --git a/tests/custom_cluster/test_web_pages.py 
b/tests/custom_cluster/test_web_pages.py
index 76c3cca6c..b6f77e115 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -28,7 +28,7 @@ from tests.common.custom_cluster_test_suite import (
   DEFAULT_CLUSTER_SIZE,
   CustomClusterTestSuite)
 from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
-from tests.common.skip import SkipIfFS
+from tests.common.skip import SkipIfFS, SkipIfDockerizedCluster
 from tests.shell.util import run_impala_shell_cmd
 
 SMALL_QUERY_LOG_SIZE_IN_BYTES = 40 * 1024
@@ -192,12 +192,9 @@ class TestWebPage(CustomClusterTestSuite):
     shell_messages = ["Query submitted at: ", "(Coordinator: ",
         "Query state can be monitored at: "]
     query_shell_arg = '--query=select * from functional.alltypes'
-    # hs2
+    # protocol is set inside vector
     results = run_impala_shell_cmd(vector, [query_shell_arg])
     self._validate_shell_messages(results.stderr, shell_messages, 
should_exist=False)
-    # beeswax
-    results = run_impala_shell_cmd(vector, ['--protocol=beeswax', 
query_shell_arg])
-    self._validate_shell_messages(results.stderr, shell_messages, 
should_exist=False)
     # Even though webserver url is not exposed, it is still accessible.
     page = requests.get('http://localhost:25000')
     assert page.status_code == requests.codes.ok
@@ -542,3 +539,36 @@ class TestWebPage(CustomClusterTestSuite):
     self._test_catalog_tables_stats_after_describe("functional.alltypes", 24)
     self._test_catalog_tables_stats_after_describe(
         "functional_parquet.iceberg_lineitem_sixblocks", 4)
+
+
+class TestWebPageAndCloseSession(CustomClusterTestSuite):
+  ROOT_URL = "http://localhost:{0}/";
+
+  @SkipIfDockerizedCluster.daemon_logs_not_exposed
+  @CustomClusterTestSuite.with_args(disable_log_buffering=True)
+  def test_display_src_socket_in_query_cause(self):
+    # Execute a long running query then cancel it from the WebUI.
+    # Check the runtime profile and the INFO logs for the cause message.
+    query = "select sleep(10000)"
+    handle = self.execute_query_async(query)
+    query_id = self.client.handle_id(handle)
+    cancel_query_url = 
"{0}cancel_query?query_id={1}".format(self.ROOT_URL.format
+      ("25000"), query_id)
+    text_profile_url = 
"{0}query_profile_plain_text?query_id={1}".format(self.ROOT_URL
+      .format("25000"), query_id)
+    requests.get(cancel_query_url)
+    response = requests.get(text_profile_url)
+    cancel_status = "Cancelled from Impala&apos;s debug web interface by user: 
" \
+                    "&apos;anonymous&apos; at"
+    assert cancel_status in response.text
+    self.assert_impalad_log_contains("INFO", "Cancelled from Impala\'s debug 
web "
+      "interface by user: 'anonymous' at", expected_count=-1, timeout_s=30)
+    # Session closing from the WebUI does not produce the cause message in the 
profile,
+    # so we will skip checking the runtime profile.
+    results = self.execute_query("select current_session()")
+    session_id = results.data[0]
+    close_session_url = 
"{0}close_session?session_id={1}".format(self.ROOT_URL.format
+      ("25000"), session_id)
+    requests.get(close_session_url)
+    self.assert_impalad_log_contains("INFO", "Session closed from Impala\'s 
debug "
+      "web interface by user: 'anonymous' at", expected_count=-1, timeout_s=30)
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index 8ef2c29f9..ba0df0b74 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -43,20 +43,22 @@ FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
 MT_DOP_VALUES = [0, 4]
 
 # Queries should cover all exec nodes.
+# {db} will be replaced by target database for specific table_format in test 
vector.
 QUERIES = [
-  "select * from alltypes",
-  "select count(*) from alltypessmall",
-  "select count(int_col) from alltypessmall group by id",
-  "select 1 from alltypessmall a join alltypessmall b on a.id = b.id",
-  "select 1 from alltypessmall a join alltypessmall b on a.id != b.id",
-  "select 1 from alltypessmall order by id",
-  "select 1 from alltypessmall order by id limit 100",
-  "select * from alltypessmall union all select * from alltypessmall",
-  "select row_number() over (partition by int_col order by id) from 
alltypessmall",
-  "select c from (select id c from alltypessmall order by id limit 10) v where 
c = 1",
+  "select * from {db}.alltypes",
+  "select count(*) from {db}.alltypessmall",
+  "select count(int_col) from {db}.alltypessmall group by id",
+  "select 1 from {db}.alltypessmall a join {db}.alltypessmall b on a.id = 
b.id",
+  "select 1 from {db}.alltypessmall a join {db}.alltypessmall b on a.id != 
b.id",
+  "select 1 from {db}.alltypessmall order by id",
+  "select 1 from {db}.alltypessmall order by id limit 100",
+  "select * from {db}.alltypessmall union all select * from 
{db}.alltypessmall",
+  "select row_number() over (partition by int_col order by id) from 
{db}.alltypessmall",
+  """select c from (select id c from {db}.alltypessmall order by id limit 10) v
+     where c = 1""",
   """SELECT STRAIGHT_JOIN *
-           FROM alltypes t1
-                  JOIN /*+broadcast*/ alltypesagg t2 ON t1.id = t2.id
+           FROM {db}.alltypes t1
+                  JOIN /*+broadcast*/ {db}.alltypesagg t2 ON t1.id = t2.id
            WHERE t2.int_col < 1000"""
 ]
 
@@ -93,11 +95,8 @@ class TestFailpoints(ImpalaTestSuite):
   # killer on machines with 30GB RAM. This makes the test run in 4 minutes 
instead of 1-2.
   @pytest.mark.execute_serially
   def test_failpoints(self, vector):
-    with self.change_database(self.client, vector.get_table_format()):
-      self.__run_failpoints(vector)
-
-  def __run_failpoints(self, vector):
-    query = vector.get_value('query')
+    db_name = 
ImpalaTestSuite.get_db_name_from_format(vector.get_table_format())
+    query = vector.get_value('query').format(db=db_name)
     action = vector.get_value('action')
     location = vector.get_value('location')
     vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
@@ -201,7 +200,9 @@ class TestFailpoints(ImpalaTestSuite):
     handle = self.execute_query_async(query, vector.get_value('exec_option'))
     LOG.info('Sleeping')
     sleep(3)
-    cancel_result = self.client.cancel(handle)
-    self.client.close_query(handle)
-    assert cancel_result.status_code == 0,\
-        'Unexpected status code from cancel request: %s' % cancel_result
+    try:
+      self.client.cancel(handle)
+    except IMPALA_CONNECTION_EXCEPTION as e:
+      assert False, 'Unexpected exception from cancel request: 
{}'.format(str(e))
+    finally:
+      self.client.close_query(handle)
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index e54f0fc43..867fe6528 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -808,7 +808,7 @@ class TestDdlStatements(TestDdlBase):
     # paths, converts them to integers, and checks that wehave all the ones we
     # expect.
     PARTITION_RE = re.compile("p=([0-9]+)")
-    assert list(map(int, PARTITION_RE.findall(str(partitions)))) == \
+    assert list(map(int, PARTITION_RE.findall(str(partitions.data)))) == \
         list(range(MAX_PARTITION_UPDATES_PER_RPC + 2))
 
   def test_create_alter_tbl_properties(self, unique_database):
diff --git a/tests/metadata/test_event_processing.py 
b/tests/metadata/test_event_processing.py
index dde3a06b9..cc297477a 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -641,23 +641,24 @@ class TestEventSyncWaiting(ImpalaTestSuite):
     self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
     res = self.execute_query_expect_success(client, insert_stmt)
     # Result rows are "partition_name: num_rows_inserted" for each modified 
partitions
-    assert res.data == ['p=0: 1']
+    assert 'Partition: p=0\nNumModifiedRows: 1\n' in res.runtime_profile
     # Insert one row into the same partition in Hive and use the table in 
INSERT in Impala
     self.run_stmt_in_hive("insert into {} select 1,0".format(tbl))
     res = self.execute_query_expect_success(client, insert_stmt)
-    assert res.data == ['p=0: 2']
+    assert 'Partition: p=0\nNumModifiedRows: 2\n' in res.runtime_profile
     # Add another new partition in Hive and use the table in INSERT in Impala
     self.run_stmt_in_hive("insert into {} select 2,2".format(tbl))
     res = self.execute_query_expect_success(client, insert_stmt)
-    assert res.data == ['p=0: 2', 'p=2: 1']
+    assert 'Partition: p=0\nNumModifiedRows: 2\n' in res.runtime_profile
+    assert 'Partition: p=2\nNumModifiedRows: 1\n' in res.runtime_profile
     # Drop one partition in Hive and use the table in INSERT in Impala
     self.run_stmt_in_hive("alter table {} drop partition(p=0)".format(tbl))
     res = self.execute_query_expect_success(client, insert_stmt)
-    assert res.data == ['p=2: 1']
+    assert 'Partition: p=2\nNumModifiedRows: 1\n' in res.runtime_profile
     # Truncate the table in Hive and use it in INSERT in Impala
     self.run_stmt_in_hive("truncate table {}".format(tbl))
     res = self.execute_query_expect_success(client, insert_stmt)
-    assert len(res.data) == 0
+    assert 'NumModifiedRows:' not in res.runtime_profile
 
   def test_txn(self, vector, unique_database):
     client = self.create_impala_client_from_vector(vector)
diff --git a/tests/metadata/test_explain.py b/tests/metadata/test_explain.py
index 799e022d1..4e43b7a98 100644
--- a/tests/metadata/test_explain.py
+++ b/tests/metadata/test_explain.py
@@ -22,8 +22,13 @@ import re
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfLocal, SkipIfNotHdfsMinicluster, SkipIfEC
+from tests.common.test_dimensions import (
+    create_exec_option_dimension,
+    create_uncompressed_text_dimension,
+)
 from tests.util.filesystem_utils import WAREHOUSE
 
+
 # Tests the different explain levels [0-3] on a few queries.
 # TODO: Clean up this test to use an explain level test dimension and 
appropriate
 # result sub-sections for the expected explain plans.
@@ -35,12 +40,10 @@ class TestExplain(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestExplain, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'text' and\
-        v.get_value('table_format').compression_codec == 'none' and\
-        v.get_value('exec_option')['batch_size'] == 0 and\
-        v.get_value('exec_option')['disable_codegen'] == False and\
-        v.get_value('exec_option')['num_nodes'] != 1)
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        cluster_sizes=[0], batch_sizes=[0], disable_codegen_options=[False]))
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
 
   @SkipIfNotHdfsMinicluster.plans
   def test_explain_level0(self, vector):
@@ -72,7 +75,7 @@ class TestExplain(ImpalaTestSuite):
   @staticmethod
   def check_row_size_and_cardinality(query_result, expected_row_size=None,
                                      expected_cardinality=None):
-    regex = re.compile('tuple-ids=.+ row-size=(\d+)B cardinality=(.*)')
+    regex = re.compile(r'tuple-ids=.+ row-size=(\d+)B cardinality=(.*)')
     found_match = False
     for res in query_result:
       m = regex.match(res.strip())
@@ -85,7 +88,7 @@ class TestExplain(ImpalaTestSuite):
           assert m.groups()[1] == expected_cardinality
     assert found_match, query_result
 
-  def test_explain_validate_cardinality_estimates(self, vector, 
unique_database):
+  def test_explain_validate_cardinality_estimates(self, unique_database):
     # Tests that the cardinality estimates are correct for partitioned tables.
     # TODO Cardinality estimation tests should eventually be part of the 
planner tests.
     # TODO Remove this test
@@ -98,17 +101,17 @@ class TestExplain(ImpalaTestSuite):
 
     # All partitions are filtered out, cardinality should be 0.
     result = self.execute_query("explain select * from %s.%s where year = 
1900" % (
-        db_name, tbl_name), query_options={'explain_level':3})
+        db_name, tbl_name), query_options={'explain_level': 3})
     check_cardinality(result.data, '0')
 
     # Half of the partitions are filtered out, cardinality should be 3650.
     result = self.execute_query("explain select * from %s.%s where year = 
2010" % (
-        db_name, tbl_name), query_options={'explain_level':3})
+        db_name, tbl_name), query_options={'explain_level': 3})
     check_cardinality(result.data, '3.65K')
 
     # None of the partitions are filtered out, cardinality should be 7300.
     result = self.execute_query("explain select * from %s.%s" % (db_name, 
tbl_name),
-        query_options={'explain_level':3})
+        query_options={'explain_level': 3})
     check_cardinality(result.data, '7.30K')
 
     # Create a partitioned table with a mixed set of available stats,
@@ -122,11 +125,11 @@ class TestExplain(ImpalaTestSuite):
       "alter table %s set tblproperties('numRows'='100')" % mixed_tbl)
     # Should fall back to table-level cardinality when partitions lack stats.
     result = self.execute_query("explain select * from %s" % mixed_tbl,
-        query_options={'explain_level':3})
+        query_options={'explain_level': 3})
     check_cardinality(result.data, '100')
     # Should fall back to table-level cardinality, even for a subset of 
partitions,
     result = self.execute_query("explain select * from %s where p = 1" % 
mixed_tbl,
-        query_options={'explain_level':3})
+        query_options={'explain_level': 3})
     check_cardinality(result.data, '100')
     # Set the number of rows at the table level to -1.
     self.execute_query(
@@ -137,17 +140,17 @@ class TestExplain(ImpalaTestSuite):
     # Use partition stats when availabe. Row counts for partitions without
     # stats are estimated.
     result = self.execute_query("explain select * from %s" % mixed_tbl,
-        query_options={'explain_level':3})
+        query_options={'explain_level': 3})
     check_cardinality(result.data, '51')
     # Set the number of rows at the table level back to 100.
     self.execute_query(
       "alter table %s set tblproperties('numRows'='100')" % mixed_tbl)
     # Fall back to table-level stats when no selected partitions have stats.
     result = self.execute_query("explain select * from %s where p = 2" % 
mixed_tbl,
-        query_options={'explain_level':3})
+        query_options={'explain_level': 3})
     check_cardinality(result.data, '100')
 
-  def test_explain_row_size_estimates(self, vector, unique_database):
+  def test_explain_row_size_estimates(self, unique_database):
     """ Tests that EXPLAIN returns the expected row sizes with and without 
stats.
 
     Planner tests is probably a more logical place for this, but covering 
string avg_size
@@ -187,47 +190,40 @@ class TestExplain(ImpalaTestSuite):
 
 
 class TestExplainEmptyPartition(ImpalaTestSuite):
-  TEST_DB_NAME = "imp_1708"
-
-  def setup_method(self, method):
-    self.cleanup_db(self.TEST_DB_NAME)
-    self.execute_query("create database if not exists {0} location 
'{1}/{0}.db'"
-        .format(self.TEST_DB_NAME, WAREHOUSE))
-
-  def teardown_method(self, method):
-    self.cleanup_db(self.TEST_DB_NAME)
 
   @SkipIfLocal.hdfs_client
-  def test_non_empty_partition_0_rows(self):
+  def test_non_empty_partition_0_rows(self, unique_database):
     """Regression test for IMPALA-1708: if a partition has 0 rows but > 0 
files after
     COMPUTE STATS, don't warn the user about missing stats. The files are 
probably
     corrupted, or used for something else."""
     self.client.execute("SET EXPLAIN_LEVEL=3")
     self.client.execute("CREATE TABLE %s.empty_partition (col int) "
-                        "partitioned by (p int)" % self.TEST_DB_NAME)
+                        "partitioned by (p int)" % unique_database)
     self.client.execute(
-      "ALTER TABLE %s.empty_partition ADD PARTITION (p=NULL)" % 
self.TEST_DB_NAME)
+      "ALTER TABLE %s.empty_partition ADD PARTITION (p=NULL)" % 
unique_database)
     # Put an empty file in the partition so we have > 0 files, but 0 rows
     self.filesystem_client.create_file(
         "{1}/{0}.db/empty_partition/p=__HIVE_DEFAULT_PARTITION__/empty".
-        format(self.TEST_DB_NAME, WAREHOUSE), "")
-    self.client.execute("REFRESH %s.empty_partition" % self.TEST_DB_NAME)
-    self.client.execute("COMPUTE STATS %s.empty_partition" % self.TEST_DB_NAME)
+        format(unique_database, WAREHOUSE), "")
+    self.client.execute("REFRESH %s.empty_partition" % unique_database)
+    self.client.execute("COMPUTE STATS %s.empty_partition" % unique_database)
     assert "NULL\t0\t1" in str(
-        self.client.execute("SHOW PARTITIONS %s.empty_partition" % 
self.TEST_DB_NAME))
+        self.client.execute(
+            "SHOW PARTITIONS %s.empty_partition" % unique_database).get_data())
     assert "missing relevant table and/or column statistics" not in str(
         self.client.execute(
-            "EXPLAIN SELECT * FROM %s.empty_partition" % self.TEST_DB_NAME))
+            "EXPLAIN SELECT * FROM %s.empty_partition" % 
unique_database).get_data())
 
     # Now add a partition with some data (so it gets selected into the scan), 
to check
     # that its lack of stats is correctly identified
     self.client.execute(
-      "ALTER TABLE %s.empty_partition ADD PARTITION (p=1)" % self.TEST_DB_NAME)
+      "ALTER TABLE %s.empty_partition ADD PARTITION (p=1)" % unique_database)
     self.filesystem_client.create_file(
-        "{1}/{0}.db/empty_partition/p=1/rows".format(self.TEST_DB_NAME, 
WAREHOUSE), "1")
-    self.client.execute("REFRESH %s.empty_partition" % self.TEST_DB_NAME)
+        "{1}/{0}.db/empty_partition/p=1/rows".format(unique_database, 
WAREHOUSE), "1")
+    self.client.execute("REFRESH %s.empty_partition" % unique_database)
     explain_result = str(
-      self.client.execute("EXPLAIN SELECT * FROM %s.empty_partition" % 
self.TEST_DB_NAME))
+      self.client.execute(
+          "EXPLAIN SELECT * FROM %s.empty_partition" % 
unique_database).get_data())
     assert "missing relevant table and/or column statistics" in explain_result
     # Also test IMPALA-1530 - adding the number of partitions missing stats
     assert "partitions: 1/2 " in explain_result
diff --git a/tests/metadata/test_recursive_listing.py 
b/tests/metadata/test_recursive_listing.py
index 698013364..8b00cc273 100644
--- a/tests/metadata/test_recursive_listing.py
+++ b/tests/metadata/test_recursive_listing.py
@@ -16,7 +16,7 @@ import pytest
 import requests
 import time
 
-from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
+from tests.common.impala_connection import ERROR, FINISHED
 from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.common.test_dimensions import create_uncompressed_text_dimension
 from tests.common.skip import SkipIfLocal, SkipIfFS
@@ -41,8 +41,8 @@ class TestRecursiveListing(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         create_uncompressed_text_dimension(cls.get_workload()))
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        (v.get_value('table_format').file_format == 'text' and
-         v.get_value('table_format').compression_codec == 'none'))
+        (v.get_value('table_format').file_format == 'text'
+         and v.get_value('table_format').compression_codec == 'none'))
 
   def _show_files(self, table):
     files = self.client.execute("show files in {0}".format(table))
@@ -65,7 +65,7 @@ class TestRecursiveListing(ImpalaTestSuite):
 
     # Create the table
     self.execute_query_expect_success(self.client,
-        ("create table {tbl} (a string) {partclause} " +
+        ("create table {tbl} (a string) {partclause} "
          "stored as textfile location '{loc}'").format(
             tbl=fq_tbl_name,
             partclause=(partitioned and "partitioned by (p int)" or ""),
@@ -119,7 +119,7 @@ class TestRecursiveListing(ImpalaTestSuite):
     assert len(self._get_rows(fq_tbl_name)) == 3
 
     # Test that disabling recursive listings makes the nested files disappear.
-    self.execute_query_expect_success(self.client, ("alter table {0} set 
tblproperties(" +
+    self.execute_query_expect_success(self.client, ("alter table {0} set 
tblproperties("
         "'impala.disable.recursive.listing'='true')").format(fq_tbl_name))
     self.execute_query_expect_success(self.client, "refresh 
{0}".format(fq_tbl_name))
     assert len(self._show_files(fq_tbl_name)) == 1
@@ -130,7 +130,7 @@ class TestRecursiveListing(ImpalaTestSuite):
     assert len(self._show_files(fq_tbl_name)) == 1
     assert len(self._get_rows(fq_tbl_name)) == 1
     # Re-enable.
-    self.execute_query_expect_success(self.client, ("alter table {0} set 
tblproperties(" +
+    self.execute_query_expect_success(self.client, ("alter table {0} set 
tblproperties("
         "'impala.disable.recursive.listing'='false')").format(fq_tbl_name))
     self.execute_query_expect_success(self.client, "refresh 
{0}".format(fq_tbl_name))
     assert len(self._show_files(fq_tbl_name)) == 3
@@ -210,10 +210,7 @@ class TestRecursiveListing(ImpalaTestSuite):
       LOG.info("removing staging dir " + large_dir)
       self.filesystem_client.delete_file_dir(large_dir, recursive=True)
       LOG.info("removed staging dir " + large_dir)
-      try:
-        self.client.fetch(refresh_stmt, handle)
-        assert not refresh_should_fail, "REFRESH should fail"
-      except IMPALA_CONNECTION_EXCEPTION as e:
-        assert refresh_should_fail, "unexpected exception " + str(e)
+      expected_state = ERROR if refresh_should_fail else FINISHED
+      self.client.wait_for_impala_state(handle, expected_state, 10)
     finally:
       requests.get(self.reset_log_level_url)
diff --git a/tests/performance/query_exec_functions.py 
b/tests/performance/query_exec_functions.py
index 0da147146..ed7fc1b9a 100644
--- a/tests/performance/query_exec_functions.py
+++ b/tests/performance/query_exec_functions.py
@@ -21,7 +21,7 @@ import logging
 import re
 from datetime import datetime
 from impala.dbapi import connect
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient, 
ImpalaBeeswaxResult
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
 from sys import maxsize
 from tests.performance.query import HiveQueryResult, ImpalaQueryResult
 from tests.util.shell_util import exec_process
@@ -34,6 +34,7 @@ DEFAULT_HIVE_HS2_PORT = 10000
 
 LOG = logging.getLogger('query_exec_functions')
 
+
 def get_hs2_hive_cursor(hiveserver, user=None, use_kerberos=False,
                         database=None, execOptions=None):
   host, port = hiveserver, DEFAULT_HIVE_HS2_PORT
@@ -52,6 +53,7 @@ def get_hs2_hive_cursor(hiveserver, user=None, 
use_kerberos=False,
     LOG.error("Error Connecting: {0}".format(str(e)))
   return cursor
 
+
 def execute_using_hive_hs2(query, query_config):
   exec_result = HiveQueryResult(query, query_config=query_config)
   plugin_runner = query_config.plugin_runner
@@ -80,6 +82,7 @@ def execute_using_hive_hs2(query, query_config):
     if plugin_runner: plugin_runner.run_plugins_post(scope="Query")
   return exec_result
 
+
 def get_hs2_impala_cursor(impalad, use_kerberos=False, database=None):
   """Get a cursor to an impalad
 
@@ -107,6 +110,7 @@ def get_hs2_impala_cursor(impalad, use_kerberos=False, 
database=None):
     LOG.error("Error connecting: {0}".format(str(e)))
   return cursor
 
+
 def execute_using_impala_hs2(query, query_config):
   """Executes a sql query against Impala using the hs2 interface.
 
@@ -140,6 +144,7 @@ def execute_using_impala_hs2(query, query_config):
     if plugin_runner: plugin_runner.run_plugins_post(scope="Query")
     return exec_result
 
+
 def establish_beeswax_connection(query_config):
   """Establish a connection to the user specified impalad.
 
@@ -148,7 +153,10 @@ def establish_beeswax_connection(query_config):
 
   Returns:
     ImpalaBeeswaxClient if the connection succeeds, None otherwise.
+
+  DEPRECATED: use hs2 instead of beeswax.
   """
+  LOG.warning('beeswax protocol is deprecated.')
   use_kerberos = query_config.use_kerberos
   user = query_config.user
   password = query_config.password
@@ -170,6 +178,7 @@ def establish_beeswax_connection(query_config):
     LOG.error("Error connecting: {0}".format(str(e)))
   return client
 
+
 def execute_using_impala_beeswax(query, query_config):
   """Executes a query using beeswax.
 
@@ -181,6 +190,8 @@ def execute_using_impala_beeswax(query, query_config):
 
   Returns:
     ImpalaQueryResult
+
+  DEPRECATED: use hs2 instead of beeswax.
   """
 
   # Create a client object to talk to impalad
@@ -204,6 +215,7 @@ def execute_using_impala_beeswax(query, query_config):
     if plugin_runner: plugin_runner.run_plugins_post(context=context, 
scope="Query")
     return construct_exec_result(result, exec_result)
 
+
 def build_context(query, query_config):
   """Build context based on query config for plugin_runner.
 
@@ -221,6 +233,7 @@ def build_context(query, query_config):
   context['query'] = query
   return context
 
+
 def construct_exec_result(result, exec_result):
   """ Transform an ImpalaBeeswaxResult object to a ImpalaQueryResult object.
 
@@ -241,22 +254,24 @@ def construct_exec_result(result, exec_result):
     setattr(exec_result, attr, getattr(result, attr))
   return exec_result
 
+
 def execute_using_jdbc(query, query_config):
   """Executes a query using JDBC"""
   query_string = query.query_str + ';'
   if query.db:
     query_string = 'use %s; %s' % (query.db, query_string)
   cmd = query_config.jdbc_client_cmd + " -q \"%s\"" % query_string
-  return run_query_capture_results(cmd, query, exit_on_error=False)
+  return run_query_capture_results(cmd, query)
 
-def parse_jdbc_query_results(stdout, stderr, query):
+
+def parse_jdbc_query_results(stdout, query):
   """
   Parse query execution results for the Impala JDBC client
 
   Parses the query execution details (avg time, stddev) from the output of the 
Impala
   JDBC test client.
   """
-  jdbc_result_regex = 'row\(s\) in (\d*).(\d*)s'
+  jdbc_result_regex = r'row\(s\) in (\d*).(\d*)s'
   time_taken = 0.0
   for line in stdout.split('\n'):
     match = re.search(jdbc_result_regex, line)
@@ -266,6 +281,7 @@ def parse_jdbc_query_results(stdout, stderr, query):
   result_data = re.findall(r'\[START\]----\n(.*?)\n----\[END\]', stdout, 
re.DOTALL)[0]
   return create_exec_result(time_taken, result_data, query)
 
+
 def create_exec_result(time_taken, result_data, query):
   exec_result = HiveQueryResult(query)
   if result_data:
@@ -275,7 +291,8 @@ def create_exec_result(time_taken, result_data, query):
   exec_result.success = True
   return exec_result
 
-def run_query_capture_results(cmd, query, exit_on_error):
+
+def run_query_capture_results(cmd, query):
   """
   Runs the given query command and returns the execution result.
 
@@ -300,7 +317,7 @@ def run_query_capture_results(cmd, query, exit_on_error):
     exec_result.query_error = msg
     return exec_result
   # The command completed
-  exec_result = parse_jdbc_query_results(stdout, stderr, query)
+  exec_result = parse_jdbc_query_results(stdout, query)
   exec_result.query = query
   exec_result.start_time = start_time
   return exec_result
diff --git a/tests/query_test/test_beeswax.py b/tests/query_test/test_beeswax.py
index a0c240ec1..8fcac557c 100644
--- a/tests/query_test/test_beeswax.py
+++ b/tests/query_test/test_beeswax.py
@@ -18,6 +18,7 @@
 from __future__ import absolute_import, division, print_function
 from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_vector import BEESWAX
 
 
 class TestBeeswax(ImpalaTestSuite):
@@ -27,13 +28,13 @@ class TestBeeswax(ImpalaTestSuite):
     and different users."""
     USER1 = "user1"
     USER2 = "user2"
-    client1 = self.client
+    client1 = self.create_impala_client(protocol=BEESWAX)
     different_user_client = None
     unset_user_client = None
     try:
-      same_user_client = self.create_impala_client(protocol='beeswax')
-      different_user_client = self.create_impala_client(protocol='beeswax')
-      unset_user_client = self.create_impala_client(protocol='beeswax')
+      same_user_client = self.create_impala_client(protocol=BEESWAX)
+      different_user_client = self.create_impala_client(protocol=BEESWAX)
+      unset_user_client = self.create_impala_client(protocol=BEESWAX)
 
       # Unauthenticated Beeswax only sets user once the query is run.
       result = client1.execute("select effective_user()", user=USER1)
diff --git a/tests/query_test/test_errorlog.py 
b/tests/query_test/test_errorlog.py
index 7d2efe230..faa4e66c3 100644
--- a/tests/query_test/test_errorlog.py
+++ b/tests/query_test/test_errorlog.py
@@ -59,10 +59,13 @@ class TestErrorLogs(ImpalaTestSuite):
       # large enough to further guarantee at least one cleared error maps to 
be sent to
       # coordinator.
       sleep(30)
-      cancel_result = self.client.cancel(handle)
-      self.client.close_query(handle)
-      assert cancel_result.status_code == 0,\
-          'Unexpected status code from cancel request: %s' % cancel_result
+      try:
+        self.client.cancel(handle)
+      except IMPALA_CONNECTION_EXCEPTION as ex:
+        assert False, 'Unexpected exception from cancel request: {}'.format(
+            str(ex))
+      finally:
+        self.client.close_query(handle)
     # As long as impala did not crash we are good.
     except IMPALA_CONNECTION_EXCEPTION:
       return
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index aab02f329..0a639c2cf 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -307,7 +307,8 @@ class TestInsertPartKey(ImpalaTestSuite):
                 "%2F%3A%3D%3F%5C%7B%5B%5D%23%5E"
     res = self.execute_query(
         "insert into {} partition(p='{}') values (0)".format(tbl, 
special_characters))
-    assert res.data[0] == part_dir + ": 1"
+    assert part_dir in res.runtime_profile
+    assert 'NumModifiedRows: 1\n' in res.runtime_profile
     res = self.client.execute("select p from {}".format(tbl))
     assert res.data[0] == part_value
     res = self.execute_query("show partitions " + tbl)
@@ -341,7 +342,8 @@ class TestInsertPartKey(ImpalaTestSuite):
                      "\\u001F\\\"\\u007F\'%*\\/:=?\\\\{[]#^"
     res = self.execute_query(
         "insert into {} values (0, '{}')".format(tbl, special_characters))
-    assert res.data[0] == part_dir + ": 1"
+    assert part_dir in res.runtime_profile
+    assert 'NumModifiedRows: 1\n' in res.runtime_profile
     res = self.client.execute("select p from {}".format(tbl))
     assert res.data[0] == part_value
     res = self.execute_query("show partitions " + tbl)
diff --git a/tests/query_test/test_mem_usage_scaling.py 
b/tests/query_test/test_mem_usage_scaling.py
index 18c0d21db..60ae1fe4a 100644
--- a/tests/query_test/test_mem_usage_scaling.py
+++ b/tests/query_test/test_mem_usage_scaling.py
@@ -123,10 +123,12 @@ class TestLowMemoryLimits(ImpalaTestSuite):
     try:
       self.run_test_case(tpch_query, new_vector)
     except IMPALA_CONNECTION_EXCEPTION as e:
-      if not expects_error: raise
+      if not expects_error:
+        assert False, "Not expecting error, but got {}".format(str(e))
       found_expected_error = False
       for error_msg in MEM_LIMIT_ERROR_MSGS:
-        if error_msg in str(e): found_expected_error = True
+        if error_msg in str(e):
+          found_expected_error = True
       assert found_expected_error, str(e)
 
 
diff --git a/tests/query_test/test_result_spooling.py 
b/tests/query_test/test_result_spooling.py
index 52ffdc06e..924425137 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -19,7 +19,6 @@ from __future__ import absolute_import, division, 
print_function
 import pytest
 import re
 import time
-import threading
 
 from time import sleep
 from tests.common.errors import Timeout
@@ -177,15 +176,13 @@ class TestResultSpooling(ImpalaTestSuite):
     # Regex to look for in the runtime profile.
     get_wait_time_regex = "RowBatchGetWaitTime: [1-9]"
 
-    # Execute the query, start a thread to fetch results, wait for the query 
to finish,
+    # Execute the query asynchronously, fetch results, wait for the query to 
finish,
     # and then validate that RowBatchGetWaitTime shows a non-zero value in the 
profile.
     handle = self.execute_query_async(query, vector.get_value('exec_option'))
     try:
-      thread = threading.Thread(target=lambda:
-          self.create_impala_client().fetch(query, handle))
-      thread.start()
+      self.client.wait_for_admission_control(handle, 60)
+      self.client.fetch(query, handle)
       self.client.wait_for_impala_state(handle, FINISHED, 10)
-      thread.join()
       assert re.search(get_wait_time_regex, 
self.client.get_runtime_profile(handle))
     finally:
       self.client.close_query(handle)
diff --git a/tests/query_test/test_runtime_filters.py 
b/tests/query_test/test_runtime_filters.py
index ba4169ff6..2e2a7f1cd 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -112,16 +112,14 @@ class TestRuntimeFilters(ImpalaTestSuite):
     get woken up and exit promptly when the query is cancelled."""
     # Make sure the cluster is quiesced before we start this test
     self._verify_no_fragments_running()
-    with self.change_database(self.client, vector.get_value('table_format')):
-      self.__run_wait_time_cancellation(vector)
-
-  def __run_wait_time_cancellation(self, vector):
     # Set up a query where a scan (plan node 0, scanning alltypes) will wait
     # indefinitely for a filter to arrive. The filter arrival is delayed
     # by adding a wait to the scan of alltypestime (plan node 0).
+    db_name = 
ImpalaTestSuite.get_db_name_from_format(vector.get_table_format())
     QUERY = """select straight_join *
-               from alltypes t1
-                    join /*+shuffle*/ alltypestiny t2 on t1.id = t2.id"""
+               from {db}.alltypes t1
+                    join /*+shuffle*/ {db}.alltypestiny t2
+                    on t1.id = t2.id""".format(db=db_name)
     self.client.set_configuration(vector.get_exec_option_dict())
     self.client.set_configuration_option("DEBUG_ACTION", "1:OPEN:WAIT")
     self.client.set_configuration_option("RUNTIME_FILTER_WAIT_TIME_MS", 
"10000000")
diff --git a/tests/query_test/test_tablesample.py 
b/tests/query_test/test_tablesample.py
index 97a9adb01..e15e59088 100644
--- a/tests/query_test/test_tablesample.py
+++ b/tests/query_test/test_tablesample.py
@@ -51,19 +51,21 @@ class TestTableSample(ImpalaTestSuite):
   def __run_tablesample(self, vector):
     repeatable = vector.get_value('repeatable')
     filtered = vector.get_value('filtered')
+    db_name = 
ImpalaTestSuite.get_db_name_from_format(vector.get_table_format())
 
     where_clause = ""
     if filtered:
       where_clause = "where month between 1 and 6"
 
-    result = self.client.execute("select count(*) from alltypes %s" % 
where_clause)
+    result = self.client.execute("select count(*) from {}.alltypes {}".format(
+        db_name, where_clause))
     baseline_count = int(result.data[0])
     prev_count = None
     for perc in [5, 20, 50, 100]:
       rep_sql = ""
       if repeatable: rep_sql = " repeatable(1)"
-      sql_stmt = "select count(*) from alltypes tablesample system(%s)%s %s" \
-                 % (perc, rep_sql, where_clause)
+      sql_stmt = "select count(*) from {}.alltypes tablesample system({}){} 
{}".format(
+          db_name, perc, rep_sql, where_clause)
       handle = self.client.execute_async(sql_stmt)
       # IMPALA-6352: flaky test, possibly due to a hung thread. Wait for 500 
sec before
       # failing and logging the backtraces of all impalads.
diff --git a/tests/webserver/test_web_pages.py 
b/tests/webserver/test_web_pages.py
index 880341c67..8ae846235 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -18,10 +18,10 @@
 from __future__ import absolute_import, division, print_function
 from tests.common.environ import ImpalaTestClusterFlagsDetector
 from tests.common.file_utils import grep_dir
-from tests.common.skip import SkipIfBuildType, SkipIfDockerizedCluster
+from tests.common.skip import SkipIfBuildType
 from tests.common.impala_cluster import ImpalaCluster
-from tests.common.impala_connection import FINISHED, RUNNING
-from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.impala_connection import FINISHED, RUNNING, 
MinimalHS2Connection
+from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, 
ImpalaTestSuite
 from tests.util.filesystem_utils import supports_storage_ids
 from tests.util.parse_util import parse_duration_string_ms
 from tests.common.test_vector import HS2
@@ -1082,16 +1082,20 @@ class TestWebPage(ImpalaTestSuite):
   def test_query_cancel_created(self):
     """Tests that if we cancel a query in the CREATED state, it still finishes 
and we can
     cancel it."""
+    # Use MinimalHS2Connection because it has simpler concurrency than 
hs2_client.
+    with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as client:
+      delay_created_action = "impalad_load_tables_delay:SLEEP@1000"
+      client.set_configuration(dict(debug_action=delay_created_action))
+      self._run_test_query_cancel_created(client)
+
+  def _run_test_query_cancel_created(self, client):
     query = "select count(*) from functional_parquet.alltypes"
-    delay_created_action = "impalad_load_tables_delay:SLEEP@1000"
 
     response_json = self.try_until("test baseline", self.get_queries,
         lambda resp: resp['num_in_flight_queries'] == 0)
-
     # Start the query completely async. The server doesn't return a response 
until
     # the query has exited the CREATED state, so we need to get the query ID 
another way.
-    self.client.set_configuration(dict(debug_action=delay_created_action))
-    proc = Process(target=lambda cli, q: cli.execute_async(q), 
args=(self.client, query))
+    proc = Process(target=lambda cli, q: cli.execute_async(q), args=(client, 
query))
     proc.start()
 
     response_json = self.try_until("query creation", self.get_queries,
@@ -1133,9 +1137,15 @@ class TestWebPage(ImpalaTestSuite):
   def test_query_cancel_exception(self):
     """Tests that if we cancel a query in the CREATED state and it has an 
exception, we
     can cancel it."""
+    # Use MinimalHS2Connection because it has simpler concurrency than 
hs2_client.
+    with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as client:
+      delay_created_action = "impalad_load_tables_delay:SLEEP@1000"
+      client.set_configuration(dict(debug_action=delay_created_action))
+      self._test_query_cancel_exception(client)
+
+  def _test_query_cancel_exception(self, client):
     # Trigger UDF ERROR: Cannot divide decimal by zero
     query = "select *, 1.0/0 from functional_parquet.alltypes limit 10"
-    delay_created_action = "impalad_load_tables_delay:SLEEP@1000"
 
     response_json = self.try_until("test baseline", self.get_queries,
         lambda resp: resp['num_in_flight_queries'] == 0)
@@ -1145,9 +1155,8 @@ class TestWebPage(ImpalaTestSuite):
 
     # Start the query completely async. The server doesn't return a response 
until
     # the query has exited the CREATED state, so we need to get the query ID 
another way.
-    self.client.set_configuration(dict(debug_action=delay_created_action))
     queue = Queue()
-    proc = Process(target=run, args=(queue, self.client, query))
+    proc = Process(target=run, args=(queue, client, query))
     proc.start()
 
     response_json = self.try_until("query creation", self.get_queries,
@@ -1168,7 +1177,7 @@ class TestWebPage(ImpalaTestSuite):
     proc.join()
     assert query_handle
     try:
-      self.client.fetch(query, query_handle)
+      client.fetch(query, query_handle)
     except Exception as e:
       re.match("UDF ERROR: Cannot divide decimal by zero", str(e))
 
@@ -1194,35 +1203,3 @@ class TestWebPage(ImpalaTestSuite):
         "fs.defaultFS", ports_to_test=self.TEST_PORTS_WITHOUT_SS)
     # check if response size is 2 , for both catalog and impalad webUI
     assert len(responses) == 2
-
-
-class TestWebPageAndCloseSession(ImpalaTestSuite):
-  ROOT_URL = "http://localhost:{0}/";
-
-  @SkipIfDockerizedCluster.daemon_logs_not_exposed
-  def test_display_src_socket_in_query_cause(self):
-    # Execute a long running query then cancel it from the WebUI.
-    # Check the runtime profile and the INFO logs for the cause message.
-    query = "select sleep(10000)"
-    handle = self.execute_query_async(query)
-    query_id = self.client.handle_id(handle)
-    cancel_query_url = 
"{0}cancel_query?query_id={1}".format(self.ROOT_URL.format
-      ("25000"), query_id)
-    text_profile_url = 
"{0}query_profile_plain_text?query_id={1}".format(self.ROOT_URL
-      .format("25000"), query_id)
-    requests.get(cancel_query_url)
-    response = requests.get(text_profile_url)
-    cancel_status = "Cancelled from Impala&apos;s debug web interface by user: 
" \
-                    "&apos;anonymous&apos; at"
-    assert cancel_status in response.text
-    self.assert_impalad_log_contains("INFO", "Cancelled from Impala\'s debug 
web "
-      "interface by user: 'anonymous' at", expected_count=-1)
-    # Session closing from the WebUI does not produce the cause message in the 
profile,
-    # so we will skip checking the runtime profile.
-    results = self.execute_query("select current_session()")
-    session_id = results.data[0]
-    close_session_url = 
"{0}close_session?session_id={1}".format(self.ROOT_URL.format
-      ("25000"), session_id)
-    requests.get(close_session_url)
-    self.assert_impalad_log_contains("INFO", "Session closed from Impala\'s 
debug "
-      "web interface by user: 'anonymous' at", expected_count=-1)

Reply via email to