This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new f8a1f6046 IMPALA-14091: Migrate test_query_retries.py to HS2
f8a1f6046 is described below

commit f8a1f6046adb7026437eb00d71d195d1527f091b
Author: Riza Suminto <[email protected]>
AuthorDate: Thu May 22 11:36:42 2025 -0700

    IMPALA-14091: Migrate test_query_retries.py to HS2
    
    test_query_retries.py was still pinned to testing with the beeswax
    protocol by default. This patch refactors it to test using the hs2 protocol.
    
    Testing:
    - Run and pass test_query_retries.py in exhaustive mode.
    
    Change-Id: If12eeb47b843f0d1faca47994b2001e6d4c8ac58
    Reviewed-on: http://gerrit.cloudera.org:8080/22939
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/common/impala_connection.py          |  5 ++++
 tests/custom_cluster/test_query_retries.py | 40 +++++++++++++++++-------------
 2 files changed, 28 insertions(+), 17 deletions(-)

diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 536990bb2..f57c1f3a3 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -758,6 +758,11 @@ class ImpylaHS2Connection(ImpalaConnection):
       id = session_handle_to_session_id(cursor.session.handle)
     return "" if id is None else id
 
+  def session_id(self, operation_handle):
+    cursor = operation_handle.get_handle()
+    session_id = self.__get_session_id(cursor)
+    return session_id if session_id else str(cursor.session)
+
   def handle_id(self, operation_handle):
     query_id = self.get_query_id(operation_handle)
     return query_id if query_id else str(operation_handle)
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 4cda46275..b372ea574 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -46,7 +46,6 @@ from tests.common.skip import (
     SkipIfNotHdfsMinicluster,
 )
 from tests.common.test_dimensions import add_mandatory_exec_option
-from tests.common.test_vector import BEESWAX
 
 # The BE krpc port of the impalad to simulate rpc or disk errors in tests.
 FAILED_KRPC_PORT = 27001
@@ -70,11 +69,6 @@ def _get_disk_fail_action(port):
 @SkipIfEC.parquet_file_size
 class TestQueryRetries(CustomClusterTestSuite):
 
-  @classmethod
-  def default_test_protocol(cls):
-    # Retry mechanism is slightly different between beeswax vs hs2 protocol.
-    return BEESWAX
-
   # A query that shuffles a lot of data. Useful when testing query retries 
since it
   # ensures that a query fails during a TransmitData RPC. The RPC failure will 
cause the
   # target impalad to be blacklisted and the query to be retried. The query 
also has to
@@ -156,6 +150,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Launch a query, it should be retried.
     handle = self.execute_query_async(self._shuffle_heavy_query,
         query_options={'retry_failed_queries': 'true'})
+    query_id = self.client.handle_id(handle)
     self.client.wait_for_impala_state(handle, RUNNING, 60)
 
     # Kill a random impalad.
@@ -203,7 +198,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Assert that the second most recently completed query is the original 
query and it is
     # marked as 'RETRIED'.
     assert completed_queries[1]['state'] == 'RETRIED'
-    assert completed_queries[1]['query_id'] == self.client.handle_id(handle)
+    assert completed_queries[1]['query_id'] == query_id
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -396,6 +391,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
+    query_id = self.client.handle_id(handle)
     # Wait until the query fails.
     self.client.wait_for_impala_state(handle, ERROR, 140)
 
@@ -424,6 +420,8 @@ class TestQueryRetries(CustomClusterTestSuite):
              "queries scheduled only on the coordinator (either NUM_NODES set 
to 1 " \
              "or when small query optimization is triggered) can currently 
run" in str(e)
       assert "Additional Details: Not Applicable" in str(e)
+    finally:
+      self.client.close_query(handle)
 
     # Assert that the RPC un-reachable impalad not shows up in the list of 
blacklisted
     # executors from the runtime profile.
@@ -432,8 +430,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Assert that the query id of the original query is in the runtime profile 
of the
     # retried query.
-    self.__validate_original_id_in_profile(retried_runtime_profile,
-        self.client.handle_id(handle))
+    self.__validate_original_id_in_profile(retried_runtime_profile, query_id)
 
     # Validate the state of the web ui. The query must be closed before 
validating the
     # state since it asserts that no queries are in flight.
@@ -484,6 +481,8 @@ class TestQueryRetries(CustomClusterTestSuite):
       assert False
     except IMPALA_CONNECTION_EXCEPTION as e:
       assert "Max retry limit was hit. Query was retried 1 time(s)." in str(e)
+    finally:
+      self.client.close_query(handle)
 
     # Assert that the killed impalad shows up in the list of blacklisted 
executors from
     # the runtime profile.
@@ -788,21 +787,28 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
     query = self._count_query
-    handle = self.execute_query_async(query,
-        query_options={'retry_failed_queries': 'true',
-                       'debug_action': 'SET_QUERY_INFLIGHT:SLEEP@1000'})
-    query_id = self.client.handle_id(handle)
+    self.hs2_client.set_configuration({
+        'retry_failed_queries': 'true',
+        'debug_action': 'SET_QUERY_INFLIGHT:SLEEP@1000'})
+    handle = self.hs2_client.execute_async(query)
+    query_id = self.hs2_client.handle_id(handle)
+    session_id = self.hs2_client.session_id(handle)
     self.__wait_until_retry_state(query_id, 'RETRIED')
 
     # SetQueryInflight will complete before execute_query_async returns 
because it will
     # be completed before Impala acknowledges that the query has started.
     page = 
self.cluster.get_first_impalad().service.get_debug_webpage_json('sessions')
+    session_found = False
     for session in page['sessions']:
-      # Every session should have one completed query: either test setup, or 
the original
-      # query that's being retried.
-      assert session['inflight_queries'] < session['total_queries']
+      if session['session_id'] == session_id:
+        # Session 'session_id' should have one completed query: either test 
setup, or
+        # the original query that's being retried.
+        assert session['inflight_queries'] < session['total_queries']
+        session_found = True
+    self.hs2_client.close_query(handle)
+    self.hs2_client.clear_configuration()
+    assert session_found, "session_id {} not found at {}".format(session_id, 
page)
 
-    self.client.close_query(handle)
     # If original query state closure is skipped, the coordinator will crash 
on a DCHECK.
     time.sleep(2)
     assert self.cluster.impalads[0].get_pid() is not None, "Coordinator 
crashed"

Reply via email to