This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8605246f3cb200f25dc426c858a5d606514db293
Author: jasonmfehr <[email protected]>
AuthorDate: Wed Feb 26 09:35:03 2025 -0800

    IMPALA-13710: Fix Flaky Workload Management Custom Cluster Tests
    
    The custom cluster workload management tests occasionally fail with
    query-not-found errors. These failures are few and far between but
    still cause test flakiness.
    
    This patch adds retries to the common assertion functions to ensure
    that a slower-than-normal workload management insert DML does not
    cause test failures.
    
    Tested by running the following test suites locally, all of which passed:
      * tests/custom_cluster/test_query_live.py
      * tests/custom_cluster/test_query_log.py
      * tests/custom_cluster/test_workload_mgmt_init.py
      * tests/custom_cluster/test_workload_mgmt_sql_details.py
    
    Change-Id: I38dad4819190fa44ca2933be9907d7f1273621e3
    Reviewed-on: http://gerrit.cloudera.org:8080/22551
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/util/workload_management.py | 66 ++++++++++++++++++++++++++++-----------
 1 file changed, 47 insertions(+), 19 deletions(-)

diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py
index f2204899d..a98c89fad 100644
--- a/tests/util/workload_management.py
+++ b/tests/util/workload_management.py
@@ -21,6 +21,7 @@ import re
 import requests
 
 from datetime import datetime
+from time import sleep, time
 
 from SystemTables.ttypes import TQueryTableColumn
 from tests.util.assert_time import assert_time_str, convert_to_milliseconds
@@ -32,6 +33,9 @@ QUERY_TBL_LOG_NAME = "impala_query_log"
 QUERY_TBL_LOG = "{0}.{1}".format(WM_DB, QUERY_TBL_LOG_NAME)
 QUERY_TBL_LIVE_NAME = "impala_query_live"
 QUERY_TBL_LIVE = "{0}.{1}".format(WM_DB, QUERY_TBL_LIVE_NAME)
+# Time in seconds the assert_query and assert_csv_col will wait for the query to become
+# available in the relevant workload management table.
+ASSERT_QUERY_TIMEOUT_S = 30
 
 
 def round_to_3(val):
@@ -66,17 +70,30 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
   print("Query Id: {0}".format(query_id))
   profile_lines = profile_text.split("\n")
 
-  # Force Impala to process the inserts to the completed queries table.
-  if query_tbl != QUERY_TBL_LIVE:
-    client.execute("refresh " + query_tbl)
+  success = False
+  sql_results = None
+
+  start_time = time()
+  while (time() - start_time <= ASSERT_QUERY_TIMEOUT_S):
+    # Force Impala to process the inserts to the completed queries table.
+    if query_tbl != QUERY_TBL_LIVE:
+      client.execute("refresh " + query_tbl)
+
+    # Assert the query was written correctly to the query log table.
+    if max_row_size is not None:
+      client.set_configuration_option("MAX_ROW_SIZE", max_row_size)
+    sql_results = client.execute("select * from {0} where query_id='{1}'".format(
+        query_tbl, query_id))
+    if sql_results.success and len(sql_results.data) == 1:
+      success = True
+      break
+
+    # Query is not yet available in the workload management table, wait and try again.
+    sleep(1)
+
+  assert success, "Did not find query '{}' in the '{}' table after multiple attempts" \
+      .format(query_id, query_tbl)
 
-  # Assert the query was written correctly to the query log table.
-  if max_row_size is not None:
-    client.set_configuration_option("MAX_ROW_SIZE", max_row_size)
-  sql_results = client.execute("select * from {0} where query_id='{1}'".format(
-      query_tbl, query_id))
-  assert sql_results.success
-  assert len(sql_results.data) == 1, "did not find query in completed queries table"
 
   # Assert the expected columns were included.
   assert len(sql_results.column_labels) == len(TQueryTableColumn._VALUES_TO_NAMES)
@@ -675,16 +692,27 @@ def assert_csv_col(client, query_tbl, col, query_id, expected_list, db="tpcds"):
 
   print("Query Id: {0}".format(query_id))
 
-  # Force Impala to process the inserts to the completed queries table.
-  if query_tbl != QUERY_TBL_LIVE:
-    client.execute("refresh " + query_tbl)
+  success = False
+  sql_results = None
+
+  start_time = time()
+  while (time() - start_time <= ASSERT_QUERY_TIMEOUT_S):
+    # Force Impala to process the inserts to the completed queries table.
+    if query_tbl != QUERY_TBL_LIVE:
+      client.execute("refresh " + query_tbl)
+
+    # Assert the query was written correctly to the query log table.
+    sql_results = client.execute("select * from {0} where query_id='{1}'".format(
+        query_tbl, query_id))
+    if sql_results.success and len(sql_results.data) == 1:
+      success = True
+      break
+
+    # Query is not yet available in the workload management table, wait and try again.
+    sleep(1)
 
-  # Assert the query was written correctly to the query log table.
-  sql_results = client.execute("select * from {0} where query_id='{1}'".format(
-      query_tbl, query_id))
-  assert sql_results.success
-  assert len(sql_results.data) == 1, "did not find query '{}' in completed queries " \
-      "table".format(query_id)
+  assert success, "Did not find query '{}' in the '{}' table after multiple attempts" \
+      .format(query_id, query_tbl)
 
   data = sql_results.data[0].split("\t")
   actual = []

Reply via email to