This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e8292ef5 IMPALA-12916: Fix 
test_event_processor_error_global_invalidate test random failure
5e8292ef5 is described below

commit 5e8292ef53c8982bcedcd7e2931f95ce069b0d2e
Author: Venu Reddy <[email protected]>
AuthorDate: Wed Mar 20 02:38:51 2024 +0530

    IMPALA-12916: Fix test_event_processor_error_global_invalidate test random 
failure
    
    Event processor goes to error state before it tries to global invalidate
    It remains in error state for a very short period of time. If
    wait_for_synced_event_id() obtains event processor status during
    this period, it can get status as error. This test was introduced
    with IMPALA-12832.
    
    Testing:
    - Tested manually. Added sleep in code for testing so that event
    processor remains in error state for little longer time.
    
    Change-Id: I787cff4cc9f9df345cd715c02b51b8d93a150edf
    Reviewed-on: http://gerrit.cloudera.org:8080/21169
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/custom_cluster/test_event_processing_error.py |  2 +-
 tests/util/event_processor_utils.py                 | 17 +++++++++++------
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/tests/custom_cluster/test_event_processing_error.py 
b/tests/custom_cluster/test_event_processing_error.py
index f64a3d3e1..ec41da786 100644
--- a/tests/custom_cluster/test_event_processing_error.py
+++ b/tests/custom_cluster/test_event_processing_error.py
@@ -330,7 +330,7 @@ class TestEventProcessingError(CustomClusterTestSuite):
       self.run_stmt_in_hive(
           "alter table {}.{} add partition(year=2024)"
           .format(unique_database, tbl_name))
-      EventProcessorUtils.wait_for_event_processing(self)
+      EventProcessorUtils.wait_for_event_processing(self, 
error_status_possible=True)
       assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
       result = self.client.execute("describe formatted {}.{}"
           .format(unique_database, tbl_name))
diff --git a/tests/util/event_processor_utils.py 
b/tests/util/event_processor_utils.py
index 0bf5ec17e..2c3cb743d 100644
--- a/tests/util/event_processor_utils.py
+++ b/tests/util/event_processor_utils.py
@@ -38,7 +38,7 @@ class EventProcessorUtils(object):
   DEFAULT_CATALOG_URL = "http://localhost:25020";
 
   @staticmethod
-  def wait_for_synced_event_id(timeout, target_event_id):
+  def wait_for_synced_event_id(timeout, error_status_possible, 
target_event_id):
     LOG.info("Waiting until events processor syncs to event id:" + str(
         target_event_id))
     # Wait more than timeout in case there is some progress in synced events.
@@ -47,6 +47,9 @@ class EventProcessorUtils(object):
     # last event.
     TIMEOUT_MULTIPLIER_IF_THERE_IS_PROGRESS = 10
     total_timeot = timeout * TIMEOUT_MULTIPLIER_IF_THERE_IS_PROGRESS
+    status_list = ["ACTIVE", "PAUSED"]
+    if error_status_possible:
+      status_list.append("ERROR")
     end_time = time.time() + total_timeot
     last_synced_id = EventProcessorUtils.get_last_synced_event_id()
     last_synced_time = time.time()
@@ -59,7 +62,7 @@ class EventProcessorUtils(object):
             target_event_id)
         break
       status = EventProcessorUtils.get_event_processor_status()
-      if status not in ["ACTIVE", "PAUSED"]:
+      if status not in status_list:
         error_msg = EventProcessorUtils.get_event_processor_error_msg()
         raise Exception("Event processor is not working. Status: {0}. Error 
msg: {1}"
                         .format(status, error_msg))
@@ -81,16 +84,17 @@ class EventProcessorUtils(object):
       time.sleep(0.1)
 
   @staticmethod
-  def wait_for_event_processing(test_suite, timeout=10):
+  def wait_for_event_processing(test_suite, timeout=10, 
error_status_possible=False):
     if isinstance(test_suite, CustomClusterTestSuite):
       impala_cluster = test_suite.cluster
     else:
       impala_cluster = ImpalaCluster.get_e2e_test_cluster()
     EventProcessorUtils.wait_for_event_processing_impl(test_suite.hive_client,
-      impala_cluster, timeout)
+      impala_cluster, timeout, error_status_possible)
 
   @staticmethod
-  def wait_for_event_processing_impl(hive_client, impala_cluster, timeout=10):
+  def wait_for_event_processing_impl(hive_client, impala_cluster, timeout=10,
+                                     error_status_possible=False):
     """Waits till the event processor has synced to the latest event id from 
metastore
     or the timeout value in seconds whichever is earlier"""
     if EventProcessorUtils.get_event_processor_status() == "DISABLED":
@@ -99,7 +103,8 @@ class EventProcessorUtils(object):
     assert hive_client is not None
     current_event_id = EventProcessorUtils.get_current_notification_id(
       hive_client)
-    EventProcessorUtils.wait_for_synced_event_id(timeout, current_event_id)
+    EventProcessorUtils.wait_for_synced_event_id(timeout, 
error_status_possible,
+      current_event_id)
     # Wait until the impalad catalog versions agree with the catalogd's 
version.
     catalogd_version = impala_cluster.catalogd.service.get_catalog_version()
     for impalad in impala_cluster.impalads:

Reply via email to