This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8324201acd0f9d4e37ef8c910c26e0a28e12ffef
Author: Riza Suminto <[email protected]>
AuthorDate: Fri Mar 7 13:57:42 2025 -0800

    IMPALA-13847: Remove beeswax-specific way to obtain query id
    
    With IMPALA-13682 merged, checking for query state can be done via
    ImpalaConnection.handle_id() that works for beeswax, hs2, and hs2-http
    protocol. This patch apply such change.
    ImpalaTestSuite.wait_for_progress() is refactored a bit to make client
    parameter required.
    
    Testing:
    - Run and pass the affected tests.
    
    Change-Id: I0a2bac1011f5a0e058f88f973ac403cce12d2b86
    Reviewed-on: http://gerrit.cloudera.org:8080/22606
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/common/impala_service.py                |  4 +-
 tests/common/impala_test_suite.py             |  8 +--
 tests/custom_cluster/test_process_failures.py |  2 +-
 tests/custom_cluster/test_query_log.py        |  4 +-
 tests/custom_cluster/test_query_retries.py    | 71 +++++++++++++++------------
 tests/custom_cluster/test_result_spooling.py  |  5 +-
 tests/query_test/test_observability.py        |  2 +-
 tests/stress/query_retries_stress_runner.py   | 10 ++--
 tests/util/cancel_util.py                     |  8 +--
 tests/webserver/test_web_pages.py             |  5 +-
 10 files changed, 66 insertions(+), 53 deletions(-)

diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 103c5c1d4..65f0ea0f7 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -397,13 +397,13 @@ class ImpaladService(BaseImpalaService):
         query_state = client.get_state(query_handle)
       except Exception as e:
         LOG.error("Exception while getting state of query {0}\n{1}".format(
-            query_handle.get_handle().id, str(e)))
+            client.handle_id(query_handle), str(e)))
       if query_state == target_state:
         return
       sleep(interval)
     assert target_state == query_state, \
         'Query {0} did not reach query state in time target={1} 
actual={2}'.format(
-            query_handle.get_handle().id, target_state, query_state)
+            client.handle_id(query_handle), target_state, query_state)
     return
 
   def wait_for_query_status(self, client, query_id, expected_content,
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 1f34176a3..b8648b836 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1462,9 +1462,8 @@ class ImpalaTestSuite(BaseTestSuite):
       raise Timeout(timeout_msg)
     return actual_state
 
-  def wait_for_progress(self, handle, expected_progress, timeout, client=None):
+  def wait_for_progress(self, client, handle, expected_progress, timeout):
     """Waits for the given query handle to reach expected progress rate"""
-    if client is None: client = self.client
     start_time = time.time()
     summary = client.get_exec_summary(handle)
     while time.time() - start_time < timeout and \
@@ -1474,7 +1473,7 @@ class ImpalaTestSuite(BaseTestSuite):
     actual_progress = self.__get_query_progress_rate(summary.progress)
     if actual_progress <= expected_progress:
       timeout_msg = "query '{0}' did not reach the expected progress {1}, 
current " \
-          "progress {2}".format(self.__get_id_or_query_from_handle(handle),
+          "progress {2}".format(client.handle_id(handle),
           expected_progress, actual_progress)
       raise Timeout(timeout_msg)
     return actual_progress
@@ -1487,7 +1486,8 @@ class ImpalaTestSuite(BaseTestSuite):
   def __get_id_or_query_from_handle(self, handle):
     """Returns a query identifier, for QueryHandlers it returns the query id. 
However,
     Impyla handle is a HiveServer2Cursor that does not have query id, returns 
the query
-    string instead."""
+    string instead.
+    DEPRECATED: Use client.handle_id() instead."""
     if isinstance(handle.get_handle(), HiveServer2Cursor):
       return handle.get_handle().query_string
     elif hasattr(handle.get_handle(), 'id'):
diff --git a/tests/custom_cluster/test_process_failures.py 
b/tests/custom_cluster/test_process_failures.py
index 4221425c3..01668d139 100644
--- a/tests/custom_cluster/test_process_failures.py
+++ b/tests/custom_cluster/test_process_failures.py
@@ -151,7 +151,7 @@ class TestProcessFailures(CustomClusterTestSuite):
     # node, or because a stream sender failed to establish a thrift 
connection. It is
     # non-deterministic which of those paths will initiate cancellation, but 
in either
     # case the query status should include the failed (or unreachable) worker.
-    query_id = handle.get_handle().id
+    query_id = client.handle_id(handle)
     error_state = "Failed due to unreachable impalad"
     assert impalad.service.wait_for_query_status(client, query_id, error_state)
     assert error_msg_startswith(client.get_log(handle), error_state, query_id)
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index f7fcf67de..7e4eb2ac2 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -107,7 +107,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     # Run the query async to avoid fetching results since fetching such a 
large result was
     # causing the execution to take a very long time.
     handle = client.execute_async("select '{0}'".format(rand_long_str))
-    query_id = handle.get_handle().id
+    query_id = client.handle_id(handle)
     client.wait_for_finished_timeout(handle, 10)
     client.close_query(handle)
 
@@ -142,7 +142,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
 
     client.set_configuration_option("MAX_STATEMENT_LENGTH_BYTES", 16780000)
     handle = client.execute_async("select '{0}'".format(rand_long_str))
-    query_id = handle.get_handle().id
+    query_id = client.handle_id(handle)
     client.wait_for_finished_timeout(handle, 10)
     client.close_query(handle)
 
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index eb47bf86a..4cfc278e5 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -121,7 +121,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Validate the state of the runtime profiles.
     retried_runtime_profile = self.client.get_runtime_profile(handle)
     self.__validate_runtime_profiles(
-        retried_runtime_profile, handle.get_handle().id, retried_query_id)
+        retried_runtime_profile, self.client.handle_id(handle), 
retried_query_id)
 
     # Validate the state of the client log.
     self.__validate_client_log(handle, retried_query_id)
@@ -170,7 +170,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Validate the state of the runtime profiles.
     self.__validate_runtime_profiles(
-        retried_runtime_profile, handle.get_handle().id, retried_query_id)
+        retried_runtime_profile, self.client.handle_id(handle), 
retried_query_id)
 
     # Validate the state of the client log.
     self.__validate_client_log(handle, retried_query_id)
@@ -193,7 +193,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Assert that the second most recently completed query is the original 
query and it is
     # marked as 'RETRIED'.
     assert completed_queries[1]['state'] == 'RETRIED'
-    assert completed_queries[1]['query_id'] == handle.get_handle().id
+    assert completed_queries[1]['query_id'] == self.client.handle_id(handle)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -234,7 +234,7 @@ class TestQueryRetries(CustomClusterTestSuite):
       assert retried_query_id is not None
 
       self.__validate_runtime_profiles(
-          retried_runtime_profile, handle.get_handle().id, retried_query_id)
+          retried_runtime_profile, self.client.handle_id(handle), 
retried_query_id)
 
       self.__validate_client_log(handle, retried_query_id)
 
@@ -261,9 +261,10 @@ class TestQueryRetries(CustomClusterTestSuite):
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.client.wait_for_impala_state(handle, FINISHED, 60)
+    query_id = self.client.handle_id(handle)
 
     # Validate that the query was retried.
-    self.__validate_runtime_profiles_from_service(impalad_service, handle)
+    self.__validate_runtime_profiles_from_service(impalad_service, query_id)
 
     # Assert that the query succeeded and returned the correct results.
     results = self.client.fetch(query, handle)
@@ -284,7 +285,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Validate the state of the runtime profiles.
     self.__validate_runtime_profiles(
-        retried_runtime_profile, handle.get_handle().id, retried_query_id)
+        retried_runtime_profile, query_id, retried_query_id)
 
     # Validate the state of the client log.
     self.__validate_client_log(handle, retried_query_id)
@@ -322,9 +323,10 @@ class TestQueryRetries(CustomClusterTestSuite):
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
     self.client.wait_for_impala_state(handle, FINISHED, 80)
+    query_id = self.client.handle_id(handle)
 
     # Validate that the query was retried.
-    self.__validate_runtime_profiles_from_service(impalad_service, handle)
+    self.__validate_runtime_profiles_from_service(impalad_service, query_id)
 
     # Assert that the query succeeded and returned the correct results.
     results = self.client.fetch(query, handle)
@@ -351,7 +353,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Validate the state of the runtime profiles.
     self.__validate_runtime_profiles(
-        retried_runtime_profile, handle.get_handle().id, retried_query_id)
+        retried_runtime_profile, query_id, retried_query_id)
 
     # Validate the state of the client log.
     self.__validate_client_log(handle, retried_query_id)
@@ -421,7 +423,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Assert that the query id of the original query is in the runtime profile 
of the
     # retried query.
     self.__validate_original_id_in_profile(retried_runtime_profile,
-        handle.get_handle().id)
+        self.client.handle_id(handle))
 
     # Validate the state of the web ui. The query must be closed before 
validating the
     # state since it asserts that no queries are in flight.
@@ -441,13 +443,14 @@ class TestQueryRetries(CustomClusterTestSuite):
     handle = self.execute_query_async(self._shuffle_heavy_query,
         query_options={'retry_failed_queries': 'true'})
     self.client.wait_for_impala_state(handle, RUNNING, 60)
+    query_id = self.client.handle_id(handle)
 
     # Kill one impalad so that a retry is triggered.
     killed_impalad = self.cluster.impalads[1]
     killed_impalad.kill()
 
     # Wait until the retry is running.
-    self.__wait_until_retry_state(handle, 'RETRIED')
+    self.__wait_until_retry_state(query_id, 'RETRIED')
 
     # Kill another impalad so that another retry is attempted.
     self.cluster.impalads[2].kill()
@@ -478,8 +481,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Assert that the query id of the original query is in the runtime profile 
of the
     # retried query.
-    self.__validate_original_id_in_profile(retried_runtime_profile,
-            handle.get_handle().id)
+    self.__validate_original_id_in_profile(retried_runtime_profile, query_id)
 
     # Validate the state of the web ui. The query must be closed before 
validating the
     # state since it asserts that no queries are in flight.
@@ -512,7 +514,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     except Exception as e:
       assert "Failed due to unreachable impalad" in str(e)
       assert "Skipping retry of query_id=%s because the client has already " \
-             "fetched some rows" % handle.get_handle().id in str(e)
+             "fetched some rows" % self.client.handle_id(handle) in str(e)
 
   @pytest.mark.execute_serially
   def test_spooling_all_results_for_retries(self):
@@ -541,7 +543,8 @@ class TestQueryRetries(CustomClusterTestSuite):
     retried_query_id = self.__get_retried_query_id_from_summary(handle)
     assert retried_query_id is None
     runtime_profile = self.client.get_runtime_profile(handle)
-    assert self.__get_query_id_from_profile(runtime_profile) == 
handle.get_handle().id
+    query_id = self.client.handle_id(handle)
+    assert self.__get_query_id_from_profile(runtime_profile) == query_id
 
     self.client.close_query(handle)
 
@@ -554,7 +557,7 @@ class TestQueryRetries(CustomClusterTestSuite):
       'retry_failed_queries': 'true', 'spool_query_results': 'true',
       'spool_all_results_for_retries': 'true'})
     # Wait until the first union operand finishes, so some results are spooled.
-    self.wait_for_progress(handle, 0.1, 60)
+    self.wait_for_progress(self.client, handle, 0.1, 60)
 
     self.__kill_random_impalad()
 
@@ -578,7 +581,7 @@ class TestQueryRetries(CustomClusterTestSuite):
       'retry_failed_queries': 'true', 'spool_query_results': 'true',
       'spool_all_results_for_retries': 'true'})
     # Wait until the first union operand finishes and then kill one impalad.
-    self.wait_for_progress(handle, 0.1, 60)
+    self.wait_for_progress(self.client, handle, 0.1, 60)
 
     # Kill one impalad so the query will be retried.
     self.__kill_random_impalad()
@@ -643,7 +646,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     except ImpalaBeeswaxException as e:
       assert "Failed due to unreachable impalad" in str(e)
       assert "Skipping retry of query_id=%s because the client has already " \
-             "fetched some rows" % handle.get_handle().id in str(e)
+             "fetched some rows" % self.client.handle_id(handle) in str(e)
 
   @pytest.mark.execute_serially
   def test_original_query_cancel(self):
@@ -708,6 +711,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.client.wait_for_impala_state(handle, FINISHED, 60)
+    query_id = self.client.handle_id(handle)
 
     # Validate the live exec summary.
     retried_query_id = self.__get_retried_query_id_from_summary(handle)
@@ -715,7 +719,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Validate that the query was retried.
     profile_retried_query_id = \
-        self.__validate_runtime_profiles_from_service(impalad_service, handle)
+        self.__validate_runtime_profiles_from_service(impalad_service, 
query_id)
     assert profile_retried_query_id == retried_query_id
     self.__validate_client_log(handle, retried_query_id)
 
@@ -746,13 +750,14 @@ class TestQueryRetries(CustomClusterTestSuite):
     query = self._count_query
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
-    self.__wait_until_retry_state(handle, 'RETRYING')
+    query_id = self.client.handle_id(handle)
+    self.__wait_until_retry_state(query_id, 'RETRYING')
 
     # Cancel the query.
     self.client.cancel(handle)
 
     # Check the original query retry status.
-    profile = self.__get_original_query_profile(handle.get_handle().id)
+    profile = self.__get_original_query_profile(query_id)
     retry_status = re.search("Retry Status: (.*)", profile)
     assert retry_status.group(1) == 'RETRYING'
 
@@ -776,7 +781,8 @@ class TestQueryRetries(CustomClusterTestSuite):
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'SET_QUERY_INFLIGHT:SLEEP@1000'})
-    self.__wait_until_retry_state(handle, 'RETRIED')
+    query_id = self.client.handle_id(handle)
+    self.__wait_until_retry_state(query_id, 'RETRIED')
 
     # SetQueryInflight will complete before execute_query_async returns 
because it will
     # be completed before Impala acknowledges that the query has started.
@@ -889,6 +895,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true', 'exec_time_limit_s': 
'1'})
     self.client.wait_for_impala_state(handle, ERROR, 60)
+    query_id = self.client.handle_id(handle)
 
     # Validate the live exec summary.
     retried_query_id = self.__get_retried_query_id_from_summary(handle)
@@ -899,7 +906,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Validate that the query was retried.
     profile_retried_query_id = \
-        self.__validate_runtime_profiles_from_service(impalad_service, handle)
+        self.__validate_runtime_profiles_from_service(impalad_service, 
query_id)
     assert profile_retried_query_id == retried_query_id
     self.__validate_client_log(handle, retried_query_id)
 
@@ -929,13 +936,14 @@ class TestQueryRetries(CustomClusterTestSuite):
     client.set_configuration({'retry_failed_queries': 'true'})
     handle = client.execute_async(query)
     client.wait_for_impala_state(handle, FINISHED, 60)
+    query_id = client.handle_id(handle)
 
     # Wait for the idle session timeout to expire the session.
     time.sleep(5)
 
     # Validate that the query was retried. Skip validating client log since we 
can't
     # get it using the expired session.
-    self.__validate_runtime_profiles_from_service(impalad_service, handle)
+    self.__validate_runtime_profiles_from_service(impalad_service, query_id)
 
     # Assert than attempt to fetch from the query handle fails with a session 
expired
     # error.
@@ -974,20 +982,20 @@ class TestQueryRetries(CustomClusterTestSuite):
     retried_runtime_profile = self.hs2_client.get_runtime_profile(handle,
         TRuntimeProfileFormat.STRING)
     self.__validate_runtime_profiles(
-        retried_runtime_profile, self.hs2_client.get_query_id(handle), 
retried_query_id)
+        retried_runtime_profile, self.hs2_client.handle_id(handle), 
retried_query_id)
     self.__validate_client_log(handle, retried_query_id, use_hs2_client=True)
     self.impalad_test_service.wait_for_metric_value(
         'impala-server.resultset-cache.total-num-rows', 1, timeout=60)
     self.hs2_client.close_query(handle)
 
-  def __validate_runtime_profiles_from_service(self, impalad_service, handle):
+  def __validate_runtime_profiles_from_service(self, impalad_service, 
query_id):
     """Wrapper around '__validate_runtime_profiles' that first fetches the 
retried profile
     from the web ui."""
-    original_profile = 
impalad_service.read_query_profile_page(handle.get_handle().id)
+    original_profile = impalad_service.read_query_profile_page(query_id)
     retried_query_id = 
self.__get_retried_query_id_from_profile(original_profile)
     retried_profile = impalad_service.read_query_profile_page(retried_query_id)
     self.__validate_runtime_profiles(
-        retried_profile, handle.get_handle().id, retried_query_id)
+        retried_profile, query_id, retried_query_id)
     return retried_query_id
 
   def __get_retried_query_id_from_profile(self, profile):
@@ -997,12 +1005,12 @@ class TestQueryRetries(CustomClusterTestSuite):
     if not retried_query_id_search: return None
     return retried_query_id_search.group(1)
 
-  def __wait_until_retry_state(self, handle, retry_state, timeout=300):
+  def __wait_until_retry_state(self, query_id, retry_state, timeout=300):
     """Wait until the given query handle has been retried. This is achieved by 
polling the
     runtime profile of the query and checking the 'Retry Status' field."""
 
     def __get_retry_status():
-      profile = self.__get_original_query_profile(handle.get_handle().id)
+      profile = self.__get_original_query_profile(query_id)
       retry_status = re.search("Retry Status: (.*)", profile)
       return retry_status.group(1) if retry_status else None
 
@@ -1015,7 +1023,7 @@ class TestQueryRetries(CustomClusterTestSuite):
       retry_status = __get_retry_status()
     if retry_status != retry_state:
       raise Timeout("query {0} was not retried within timeout".format
-          (handle.get_handle().id))
+          (query_id))
 
   def __kill_random_impalad(self):
     """Kills a random impalad, except for the first node in the cluster, which 
should be
@@ -1272,6 +1280,7 @@ class TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
     # Expect the impalad with disk failure is blacklisted, and query-retry is 
triggered
     # and is completed successfully.
     handle = self.execute_query_async_using_client(client, self.spill_query, 
vector)
+    query_id = client.handle_id(handle)
     results = client.fetch(self.spill_query, handle)
     assert results.success
 
@@ -1307,4 +1316,4 @@ class TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
     # is marked as 'RETRIED'.
     assert completed_queries[1]['state'] == 'RETRIED'
     assert completed_queries[1]["rows_fetched"] == 0
-    assert completed_queries[1]['query_id'] == handle.get_handle().id
+    assert completed_queries[1]['query_id'] == query_id
diff --git a/tests/custom_cluster/test_result_spooling.py 
b/tests/custom_cluster/test_result_spooling.py
index 4c5a945e4..7da5f0bd0 100644
--- a/tests/custom_cluster/test_result_spooling.py
+++ b/tests/custom_cluster/test_result_spooling.py
@@ -61,6 +61,7 @@ class TestDedicatedCoordinator(CustomClusterTestSuite):
     timeout = 10
 
     handle = self.execute_query_async(query, vector.get_value('exec_option'))
+    query_id = self.client.handle_id(handle)
     try:
       # Wait for the query to finish (all rows are spooled). Assert that the 
executor
       # has been shutdown and its memory has been released.
@@ -73,7 +74,7 @@ class TestDedicatedCoordinator(CustomClusterTestSuite):
       assert mem_admitted['executor'][0] == 0
       assert mem_admitted['coordinator'] > 0
       assert get_num_completed_backends(self.cluster.impalads[0].service,
-               handle.get_handle().id) == 1
+                                        query_id) == 1
 
       # Fetch all results from the query and assert that the coordinator and 
the executor
       # have been shutdown and their memory has been released.
@@ -83,6 +84,6 @@ class TestDedicatedCoordinator(CustomClusterTestSuite):
       assert mem_admitted['executor'][0] == 0
       assert mem_admitted['coordinator'] == 0
       assert get_num_completed_backends(self.cluster.impalads[0].service,
-               handle.get_handle().id) == 2
+                                        query_id) == 2
     finally:
       self.client.close_query(handle)
diff --git a/tests/query_test/test_observability.py 
b/tests/query_test/test_observability.py
index 251032200..7a4ea2561 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -1001,7 +1001,7 @@ class TestObservability(ImpalaTestSuite):
     # This query runs 15s
     query = "select count(*) from functional.alltypes where bool_col = 
sleep(50)"
     handle = self.execute_query_async(query)
-    query_id = handle.get_handle().id
+    query_id = self.client.handle_id(handle)
 
     cluster = ImpalaCluster.get_e2e_test_cluster()
     impalad = cluster.get_first_impalad()
diff --git a/tests/stress/query_retries_stress_runner.py 
b/tests/stress/query_retries_stress_runner.py
index cd221a8d1..840bf2e92 100755
--- a/tests/stress/query_retries_stress_runner.py
+++ b/tests/stress/query_retries_stress_runner.py
@@ -138,10 +138,12 @@ def run_concurrent_workloads(concurrency, coordinator, 
database, queries):
       # Run each query sequentially.
       for query in shuffled_queries:
         handle = None
+        query_id = None
         try:
           # Don't use client.execute as it eagerly fetches results, which 
causes retries
           # to be disabled.
           handle = client.execute_async(query)
+          query_id = client.handle_id(handle)
           if not client.wait_for_finished_timeout(handle, 3600):
             raise Exception("Timeout while waiting for query to finish")
           completed_queries_latch.on_query_completion(stream_id)
@@ -149,7 +151,7 @@ def run_concurrent_workloads(concurrency, coordinator, 
database, queries):
           # Check if the query was retried, and update any relevant counters.
           runtime_profile = client.get_runtime_profile(handle)
           if "Original Query Id" in runtime_profile:
-            LOG.info("Query {0} was retried".format(handle.get_handle().id))
+            LOG.info("Query {0} was retried".format(query_id))
             num_queries_retried += 1
             total_queries_retried_lock.acquire()
             total_queries_retried += 1
@@ -163,9 +165,9 @@ def run_concurrent_workloads(concurrency, coordinator, 
database, queries):
 
       LOG.info("Finished workload, retried {0} 
queries".format(num_queries_retried))
     except Exception:
-      if handle and handle.get_handle() and handle.get_handle().id:
-        LOG.exception("Query query_id={0} 
failed".format(handle.get_handle().id))
-        exception_queue.put((handle.get_handle().id, sys.exc_info()))
+      if query_id:
+        LOG.exception("Query query_id={0} failed".format(query_id))
+        exception_queue.put((query_id, sys.exc_info()))
       else:
         LOG.exception("An unknown query failed")
         exception_queue.put(("unknown", sys.exc_info()))
diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py
index 741dd184e..4f98299ca 100644
--- a/tests/util/cancel_util.py
+++ b/tests/util/cancel_util.py
@@ -45,7 +45,7 @@ class QueryToKill:
     self.handle = self.client.execute_async(self.sql, user=self.user)
     self.poll_thread = threading.Thread(target=lambda: self.poll())
     self.poll_thread.start()
-    return self.client.get_query_id(self.handle)
+    return self.client.handle_id(self.handle)
 
   def __exit__(self, exc_type, exc_value, traceback):  # noqa: U100
     self.poll_thread.join()
@@ -58,11 +58,11 @@ class QueryToKill:
     assert error_msg_startswith(
         str(self.exc),
         "Invalid or unknown query handle",
-        self.client.get_query_id(self.handle),
+        self.client.handle_id(self.handle),
     ) or error_msg_startswith(
         str(self.exc),
         "Cancelled",
-        self.client.get_query_id(self.handle),
+        self.client.handle_id(self.handle),
     )
     try:
       self.client.fetch(self.sql, self.handle)
@@ -140,7 +140,7 @@ def __run_cancel_query_and_validate_state(client, query, 
exec_option,
       kill_client.connect()
       if exec_option:
         kill_client.set_configuration(exec_option)
-      assert_kill_ok(kill_client, client.get_query_id(handle))
+      assert_kill_ok(kill_client, client.handle_id(handle))
   else:
     cancel_result = client.cancel(handle)
     assert cancel_result.status_code == 0, \
diff --git a/tests/webserver/test_web_pages.py 
b/tests/webserver/test_web_pages.py
index be4dcf49c..d9fb7c185 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -542,7 +542,7 @@ class TestWebPage(ImpalaTestSuite):
       if expected_state:
         self.client.wait_for_impala_state(query_handle, expected_state, 100)
       responses = self.get_and_check_status(
-        page_url + "?query_id=%s&json" % query_handle.get_handle().id,
+        page_url + "?query_id=%s&json" % self.client.handle_id(query_handle),
         ports_to_test=[25000])
       assert len(responses) == 1
       response_json = json.loads(responses[0].text)
@@ -1188,7 +1188,8 @@ class TestWebPageAndCloseSession(ImpalaTestSuite):
     # Execute a long running query then cancel it from the WebUI.
     # Check the runtime profile and the INFO logs for the cause message.
     query = "select sleep(10000)"
-    query_id = self.execute_query_async(query).get_handle().id
+    handle = self.execute_query_async(query)
+    query_id = self.client.handle_id(handle)
     cancel_query_url = 
"{0}cancel_query?query_id={1}".format(self.ROOT_URL.format
       ("25000"), query_id)
     text_profile_url = 
"{0}query_profile_plain_text?query_id={1}".format(self.ROOT_URL

Reply via email to