This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1164eb5626d9a5d09c42b4dc56414514eecf46be
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Mar 5 15:32:50 2025 -0800

    IMPALA-13827: Deflake test_user_loads_propagate
    
    TestAdmissionController.test_user_loads_propagate has been flaky because
    it does not find the expected username metric in the first impalad. This
    patch attempts to deflake the test by raising the wait time between
    running the queries and checking the metrics. The order of the metric
    checks is also reversed to give the first impalad slightly more time to
    hear about the query running in the second impalad.
    
    Testing:
    - Looped the test 50 times and it passed every time. Before the patch,
      the test failed within 10 iterations.
    
    Change-Id: I6920c7bc9ba1a9fc646aaf0483a1a72608a2a90e
    Reviewed-on: http://gerrit.cloudera.org:8080/22584
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/custom_cluster/test_admission_controller.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index e40214ee5..51386151e 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -165,9 +165,10 @@ def metric_key(pool_name, metric_name):
   return "admission-controller.%s.%s" % (metric_name, pool_name)
 
 
-def wait_single_statestore_heartbeat():
+def wait_statestore_heartbeat(num_heartbeat=1):
   """Wait for state sync across impalads."""
-  sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0)
+  assert num_heartbeat > 0
+  sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0 * num_heartbeat)
 
 
 class TestAdmissionControllerBase(CustomClusterTestSuite):
@@ -889,7 +890,7 @@ class TestAdmissionController(TestAdmissionControllerBase):
       impalad_with_2g_mem.set_configuration_option('mem_limit', '1G')
       impalad_with_2g_mem.execute_async("select sleep(1000)")
       # Wait for statestore update to update the mem admitted in each node.
-      wait_single_statestore_heartbeat()
+      wait_statestore_heartbeat()
       exec_options = deepcopy(vector.get_value('exec_option'))
       exec_options['mem_limit'] = "2G"
       # Since Queuing is synchronous, and we can't close the previous query till this
@@ -1321,13 +1322,15 @@ class TestAdmissionController(TestAdmissionControllerBase):
                                                      pool=pool)
     query2 = self.execute_async_and_wait_for_running(impalad2, SLOW_QUERY, USER_ROOT,
                                                      pool=pool)
-    wait_single_statestore_heartbeat()
+    wait_statestore_heartbeat(num_heartbeat=3)
     keys = [
       "admission-controller.agg-current-users.root.queueB",
       "admission-controller.local-current-users.root.queueB",
     ]
-    values1 = impalad1.service.get_metric_values(keys)
+    # Order matters, since impalad1 runs the query ahead of impalad2.
+    # This gives impalad1 slightly more time to hear about the query in impalad2.
     values2 = impalad2.service.get_metric_values(keys)
+    values1 = impalad1.service.get_metric_values(keys)
 
     if self.get_ac_log_name() == 'impalad':
       if user_loads_present:
@@ -1405,7 +1408,7 @@ class TestAdmissionController(TestAdmissionControllerBase):
       query_handles.append(query_handle)
 
     # Let state sync across impalads.
-    wait_single_statestore_heartbeat()
+    wait_statestore_heartbeat()
 
     # Another query should be rejected
     impalad = self.cluster.impalads[limit % 2]
@@ -2421,7 +2424,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
             REQUEST_QUEUE_UPDATE_INTERVAL)['count']
       assert (time() - start_time < STRESS_TIMEOUT),\
           "Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,)
-      wait_single_statestore_heartbeat()
+      wait_statestore_heartbeat()
     LOG.info("Waited %s for %s heartbeats", round(time() - start_time, 1), heartbeats)
 
   def wait_for_admitted_threads(self, num_threads):

Reply via email to