This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new f8a1f6046 IMPALA-14091: Migrate test_query_retries.py to HS2
f8a1f6046 is described below
commit f8a1f6046adb7026437eb00d71d195d1527f091b
Author: Riza Suminto <[email protected]>
AuthorDate: Thu May 22 11:36:42 2025 -0700
IMPALA-14091: Migrate test_query_retries.py to HS2
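test_query_retries.py was still pinned to the beeswax protocol by
default. This patch refactors it to test using the hs2 protocol.
Along the way, the tests now capture the query id right after
submitting a query instead of deriving it from the handle later, close
query handles in finally blocks, and ImpylaHS2Connection gains a
session_id() helper so the inflight-query check only inspects the
test's own session.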
Testing:
- Run and pass test_query_retries.py in exhaustive mode.
Change-Id: If12eeb47b843f0d1faca47994b2001e6d4c8ac58
Reviewed-on: http://gerrit.cloudera.org:8080/22939
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
tests/common/impala_connection.py | 5 ++++
tests/custom_cluster/test_query_retries.py | 40 +++++++++++++++++-------------
2 files changed, 28 insertions(+), 17 deletions(-)
diff --git a/tests/common/impala_connection.py
b/tests/common/impala_connection.py
index 536990bb2..f57c1f3a3 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -758,6 +758,11 @@ class ImpylaHS2Connection(ImpalaConnection):
id = session_handle_to_session_id(cursor.session.handle)
return "" if id is None else id
+ def session_id(self, operation_handle):
+ cursor = operation_handle.get_handle()
+ session_id = self.__get_session_id(cursor)
+ return session_id if session_id else str(cursor.session)
+
def handle_id(self, operation_handle):
query_id = self.get_query_id(operation_handle)
return query_id if query_id else str(operation_handle)
diff --git a/tests/custom_cluster/test_query_retries.py
b/tests/custom_cluster/test_query_retries.py
index 4cda46275..b372ea574 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -46,7 +46,6 @@ from tests.common.skip import (
SkipIfNotHdfsMinicluster,
)
from tests.common.test_dimensions import add_mandatory_exec_option
-from tests.common.test_vector import BEESWAX
# The BE krpc port of the impalad to simulate rpc or disk errors in tests.
FAILED_KRPC_PORT = 27001
@@ -70,11 +69,6 @@ def _get_disk_fail_action(port):
@SkipIfEC.parquet_file_size
class TestQueryRetries(CustomClusterTestSuite):
- @classmethod
- def default_test_protocol(cls):
- # Retry mechanism is slightly different between beeswax vs hs2 protocol.
- return BEESWAX
-
# A query that shuffles a lot of data. Useful when testing query retries
since it
# ensures that a query fails during a TransmitData RPC. The RPC failure will
cause the
# target impalad to be blacklisted and the query to be retried. The query
also has to
@@ -156,6 +150,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Launch a query, it should be retried.
handle = self.execute_query_async(self._shuffle_heavy_query,
query_options={'retry_failed_queries': 'true'})
+ query_id = self.client.handle_id(handle)
self.client.wait_for_impala_state(handle, RUNNING, 60)
# Kill a random impalad.
@@ -203,7 +198,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Assert that the second most recently completed query is the original
query and it is
# marked as 'RETRIED'.
assert completed_queries[1]['state'] == 'RETRIED'
- assert completed_queries[1]['query_id'] == self.client.handle_id(handle)
+ assert completed_queries[1]['query_id'] == query_id
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -396,6 +391,7 @@ class TestQueryRetries(CustomClusterTestSuite):
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
+ query_id = self.client.handle_id(handle)
# Wait until the query fails.
self.client.wait_for_impala_state(handle, ERROR, 140)
@@ -424,6 +420,8 @@ class TestQueryRetries(CustomClusterTestSuite):
"queries scheduled only on the coordinator (either NUM_NODES set
to 1 " \
"or when small query optimization is triggered) can currently
run" in str(e)
assert "Additional Details: Not Applicable" in str(e)
+ finally:
+ self.client.close_query(handle)
# Assert that the RPC un-reachable impalad not shows up in the list of
blacklisted
# executors from the runtime profile.
@@ -432,8 +430,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Assert that the query id of the original query is in the runtime profile
of the
# retried query.
- self.__validate_original_id_in_profile(retried_runtime_profile,
- self.client.handle_id(handle))
+ self.__validate_original_id_in_profile(retried_runtime_profile, query_id)
# Validate the state of the web ui. The query must be closed before
validating the
# state since it asserts that no queries are in flight.
@@ -484,6 +481,8 @@ class TestQueryRetries(CustomClusterTestSuite):
assert False
except IMPALA_CONNECTION_EXCEPTION as e:
assert "Max retry limit was hit. Query was retried 1 time(s)." in str(e)
+ finally:
+ self.client.close_query(handle)
# Assert that the killed impalad shows up in the list of blacklisted
executors from
# the runtime profile.
@@ -788,21 +787,28 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = self._count_query
- handle = self.execute_query_async(query,
- query_options={'retry_failed_queries': 'true',
- 'debug_action': 'SET_QUERY_INFLIGHT:SLEEP@1000'})
- query_id = self.client.handle_id(handle)
+ self.hs2_client.set_configuration({
+ 'retry_failed_queries': 'true',
+ 'debug_action': 'SET_QUERY_INFLIGHT:SLEEP@1000'})
+ handle = self.hs2_client.execute_async(query)
+ query_id = self.hs2_client.handle_id(handle)
+ session_id = self.hs2_client.session_id(handle)
self.__wait_until_retry_state(query_id, 'RETRIED')
# SetQueryInflight will complete before execute_query_async returns
because it will
# be completed before Impala acknowledges that the query has started.
page =
self.cluster.get_first_impalad().service.get_debug_webpage_json('sessions')
+ session_found = False
for session in page['sessions']:
- # Every session should have one completed query: either test setup, or
the original
- # query that's being retried.
- assert session['inflight_queries'] < session['total_queries']
+ if session['session_id'] == session_id:
+ # Session 'session_id' should have one completed query: either test
setup, or
+ # the original query that's being retried.
+ assert session['inflight_queries'] < session['total_queries']
+ session_found = True
+ self.hs2_client.close_query(handle)
+ self.hs2_client.clear_configuration()
+ assert session_found, "session_id {} not found at {}".format(session_id,
page)
- self.client.close_query(handle)
# If original query state closure is skipped, the coordinator will crash
on a DCHECK.
time.sleep(2)
assert self.cluster.impalads[0].get_pid() is not None, "Coordinator
crashed"