This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1164eb5626d9a5d09c42b4dc56414514eecf46be Author: Riza Suminto <[email protected]> AuthorDate: Wed Mar 5 15:32:50 2025 -0800 IMPALA-13827: Deflake test_user_loads_propagate TestAdmissionController.test_user_loads_propagate has been flaky because it intermittently fails to find the expected username metric in the first impalad. This patch attempts to deflake the test by raising the wait time between running the query and checking the metrics. The order of the metric checks is also reversed to give the first impalad slightly more time to hear about the query running in the second impalad. Testing: - Looped the test 50 times and it passed every time. Before the patch, the test failed within 10 iterations. Change-Id: I6920c7bc9ba1a9fc646aaf0483a1a72608a2a90e Reviewed-on: http://gerrit.cloudera.org:8080/22584 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/custom_cluster/test_admission_controller.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index e40214ee5..51386151e 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -165,9 +165,10 @@ def metric_key(pool_name, metric_name): return "admission-controller.%s.%s" % (metric_name, pool_name) -def wait_single_statestore_heartbeat(): +def wait_statestore_heartbeat(num_heartbeat=1): """Wait for state sync across impalads.""" - sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0) + assert num_heartbeat > 0 + sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0 * num_heartbeat) class TestAdmissionControllerBase(CustomClusterTestSuite): @@ -889,7 +890,7 @@ class TestAdmissionController(TestAdmissionControllerBase): impalad_with_2g_mem.set_configuration_option('mem_limit', '1G') impalad_with_2g_mem.execute_async("select sleep(1000)") # Wait for statestore update to update the mem admitted in each node. 
- wait_single_statestore_heartbeat() + wait_statestore_heartbeat() exec_options = deepcopy(vector.get_value('exec_option')) exec_options['mem_limit'] = "2G" # Since Queuing is synchronous, and we can't close the previous query till this @@ -1321,13 +1322,15 @@ class TestAdmissionController(TestAdmissionControllerBase): pool=pool) query2 = self.execute_async_and_wait_for_running(impalad2, SLOW_QUERY, USER_ROOT, pool=pool) - wait_single_statestore_heartbeat() + wait_statestore_heartbeat(num_heartbeat=3) keys = [ "admission-controller.agg-current-users.root.queueB", "admission-controller.local-current-users.root.queueB", ] - values1 = impalad1.service.get_metric_values(keys) + # Order matter, since impalad1 run the query ahead of impalad2. + # This give slightly longer time for impalad1 to hear about query in impalad2. values2 = impalad2.service.get_metric_values(keys) + values1 = impalad1.service.get_metric_values(keys) if self.get_ac_log_name() == 'impalad': if user_loads_present: @@ -1405,7 +1408,7 @@ class TestAdmissionController(TestAdmissionControllerBase): query_handles.append(query_handle) # Let state sync across impalads. - wait_single_statestore_heartbeat() + wait_statestore_heartbeat() # Another query should be rejected impalad = self.cluster.impalads[limit % 2] @@ -2421,7 +2424,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): REQUEST_QUEUE_UPDATE_INTERVAL)['count'] assert (time() - start_time < STRESS_TIMEOUT),\ "Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,) - wait_single_statestore_heartbeat() + wait_statestore_heartbeat() LOG.info("Waited %s for %s heartbeats", round(time() - start_time, 1), heartbeats) def wait_for_admitted_threads(self, num_threads):
