This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit c0c6cc9df4131f87737ff975e82d3e4832a2181b Author: Riza Suminto <[email protected]> AuthorDate: Thu May 1 12:12:18 2025 -0700 IMPALA-12201: Stabilize TestFetch This patch attempts to stabilize TestFetch by using HS2 as the test protocol. test_rows_sent_counters is modified to use the default hs2_client. test_client_fetch_time_stats and test_client_fetch_time_stats_incomplete are modified to use MinimalHS2Connection, which has a simpler fetching mechanism (ImpylaHS2Connection always fetches 10240 rows at a time). Implemented the minimal functions in MinimalHS2Connection needed to wait for the finished state and pull the runtime profile. Testing: Looped the test 50 times; all runs passed. Change-Id: I52651df37a318357711d26d2414e025cce4185c3 Reviewed-on: http://gerrit.cloudera.org:8080/22847 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/common/impala_connection.py | 21 +++++++++++-- tests/query_test/test_fetch.py | 65 ++++++++++++++++++++++----------------- 2 files changed, 55 insertions(+), 31 deletions(-) diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index caca64e35..6d6f6d338 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -1142,12 +1142,27 @@ class MinimalHS2Connection(ImpalaConnection): def get_impala_exec_state(self, operation_handle): # noqa: U100 raise NotImplementedError() - def get_runtime_profile(self, operation_handle, # noqa: U100 - profile_format=TRuntimeProfileFormat.STRING): # noqa: U100 - raise NotImplementedError() + def get_runtime_profile(self, operation_handle, + profile_format=TRuntimeProfileFormat.STRING): + return self.__get_operation(operation_handle).get_profile(profile_format) def wait_for_admission_control(self, operation_handle, timeout_s=60): # noqa: U100 raise NotImplementedError() def get_exec_summary(self, operation_handle): # noqa: U100 raise NotImplementedError() + + def 
wait_for_finished_timeout(self, operation_handle, timeout): + start_time = time.time() + while time.time() - start_time < timeout: + start_rpc_time = time.time() + hs2_state = self.get_state(operation_handle) + rpc_time = time.time() - start_rpc_time + # if the rpc succeeded, the output is the query state + if hs2_state == "FINISHED_STATE": + return True + elif hs2_state == "ERROR_STATE": + break + if rpc_time < DEFAULT_SLEEP_INTERVAL: + time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time) + return False diff --git a/tests/query_test/test_fetch.py b/tests/query_test/test_fetch.py index 390f0aa4f..512e10fd5 100644 --- a/tests/query_test/test_fetch.py +++ b/tests/query_test/test_fetch.py @@ -19,9 +19,12 @@ from __future__ import absolute_import, division, print_function import re from time import sleep -from tests.common.impala_connection import FINISHED -from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.test_dimensions import extend_exec_option_dimension +from tests.common.impala_connection import FINISHED, MinimalHS2Connection +from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, ImpalaTestSuite +from tests.common.test_dimensions import ( + create_uncompressed_text_dimension, + extend_exec_option_dimension, +) from tests.util.parse_util import parse_duration_string_ms, \ parse_duration_string_ns, get_time_summary_stats_counter @@ -33,26 +36,27 @@ class TestFetch(ImpalaTestSuite): def add_test_dimensions(cls): super(TestFetch, cls).add_test_dimensions() # Result fetching should be independent of file format, so only test against - # Parquet files. - cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('table_format').file_format == 'parquet') + # text files. 
+ cls.ImpalaTestMatrix.add_dimension( + create_uncompressed_text_dimension(cls.get_workload())) - def test_rows_sent_counters(self, vector): + def test_rows_sent_counters(self): """Validate that ClientFetchWaitTimer, NumRowsFetched, RowMaterializationRate, and RowMaterializationTimer are set to valid values in the ImpalaServer section of the runtime profile.""" num_rows = 25 query = "select sleep(100) from functional.alltypes limit {0}".format(num_rows) - handle = self.execute_query_async(query, vector.get_value('exec_option')) + client = self.hs2_client + handle = client.execute_async(query) try: # Wait until the query is 'FINISHED' and results are available for fetching. - self.client.wait_for_impala_state(handle, FINISHED, 30) + client.wait_for_impala_state(handle, FINISHED, 30) # Sleep for 2.5 seconds so that the ClientFetchWaitTimer is >= 1s. sleep(2.5) # Fetch the results so that the fetch related counters are updated. - assert self.client.fetch(query, handle).success + assert client.fetch(query, handle).success - runtime_profile = self.client.get_runtime_profile(handle) + runtime_profile = client.get_runtime_profile(handle) fetch_timer = re.search("ClientFetchWaitTimer: (.*)", runtime_profile) assert fetch_timer and len(fetch_timer.groups()) == 1 and \ parse_duration_string_ms(fetch_timer.group(1)) > 1000 @@ -82,25 +86,26 @@ class TestFetch(ImpalaTestSuite): assert 2400 < create_result_ms and create_result_ms < 2600 finally: - self.client.close_query(handle) + client.close_query(handle) - def test_client_fetch_time_stats(self, vector): + def test_client_fetch_time_stats(self): num_rows = 27 + client = MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) query = "select sleep(10) from functional.alltypes limit {0}".format(num_rows) - handle = self.execute_query_async(query, vector.get_value('exec_option')) + handle = client.execute_async(query) try: # Wait until the query is 'FINISHED' and results are available for fetching. 
- self.client.wait_for_impala_state(handle, FINISHED, 30) + client.wait_for_finished_timeout(handle, 30) # This loop will do 6 fetches that contain data and a final fetch with # no data. The last fetch is after eos has been set, so it does not count. rows_fetched = 0 while True: - result = self.client.fetch(query, handle, max_rows=5) - assert result.success - rows_fetched += len(result.data) + result = client.fetch(query, handle, max_rows=5) + assert result is not None + rows_fetched += len(result) # If no rows are returned, we are done. - if len(result.data) == 0: + if len(result) == 0: break sleep(0.1) @@ -108,9 +113,10 @@ class TestFetch(ImpalaTestSuite): # count as client wait time, because the query is already done. sleep(2.5) finally: - self.client.close_query(handle) + client.close_query(handle) + + runtime_profile = client.get_runtime_profile(handle) - runtime_profile = self.client.get_runtime_profile(handle) summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats", runtime_profile) assert len(summary_stats) == 1 @@ -118,28 +124,30 @@ class TestFetch(ImpalaTestSuite): # The 2.5 second sleep should not count, so the max must be less than 2.5 seconds. assert summary_stats[0].max_value < 2500000000 assert summary_stats[0].min_value > 0 + client.close() - def test_client_fetch_time_stats_incomplete(self, vector): + def test_client_fetch_time_stats_incomplete(self): num_rows = 27 + client = MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) query = "select sleep(10) from functional.alltypes limit {0}".format(num_rows) - handle = self.execute_query_async(query, vector.get_value('exec_option')) + handle = client.execute_async(query) try: # Wait until the query is 'FINISHED' and results are available for fetching. - self.client.wait_for_impala_state(handle, FINISHED, 30) + client.wait_for_finished_timeout(handle, 30) # This loop will do 5 fetches for a total of 25 rows. This is incomplete. 
for i in range(5): - result = self.client.fetch(query, handle, max_rows=5) - assert result.success + result = client.fetch(query, handle, max_rows=5) + assert result is not None sleep(0.1) # Sleep before closing the query. For an incomplete fetch, this still counts # towards the query time, so this does show up in the counters. sleep(2.5) finally: - self.client.close_query(handle) + client.close_query(handle) - runtime_profile = self.client.get_runtime_profile(handle) + runtime_profile = client.get_runtime_profile(handle) summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats", runtime_profile) @@ -149,6 +157,7 @@ class TestFetch(ImpalaTestSuite): # The 2.5 second sleep does count for an incomplete fetch, verify the max is higher. assert summary_stats[0].max_value >= 2500000000 assert summary_stats[0].min_value > 0 + client.close() class TestFetchAndSpooling(ImpalaTestSuite):
