This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e666e07110216509cbc2d1d874ba4bf9ec32147e
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Mar 4 11:42:52 2024 -0800

    IMPALA-12678: Deflake test_skipping_batching_events
    
    test_skipping_batching_events is flaky. It expects the REFRESH query
    to arrive before the ALTER_PARTITION event is polled and processed,
    but the opposite can happen too.
    
    This patch deflakes the test by injecting a delay inside
    MetastoreEvents.getFilteredEvents() rather than increasing
    hms_event_polling_interval_s. The delay injection is specified through
    the debug_actions flag. This patch also adds a method to ImpaladProcess
    and CatalogdProcess to help change the JVM log level from a pytest
    method.
    
    Testing:
    - Ran test_skipping_batching_events in a loop 100 times; all runs passed.
    
    Change-Id: Ia6e4cd1e9492e3ce75f5089038b90d0af4fbdb0f
    Reviewed-on: http://gerrit.cloudera.org:8080/21107
    Reviewed-by: Sai Hemanth Gantasala <[email protected]>
    Reviewed-by: Jason Fehr <[email protected]>
    Reviewed-by: Quanlong Huang <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/util/backend-gflag-util.cc                  |  2 +
 common/thrift/BackendGflags.thrift                 |  2 +
 .../impala/catalog/events/MetastoreEvents.java     |  7 ++++
 .../catalog/events/MetastoreEventsProcessor.java   |  1 -
 .../org/apache/impala/service/BackendConfig.java   |  2 +
 .../java/org/apache/impala/util/DebugUtils.java    |  4 ++
 tests/common/impala_cluster.py                     | 47 ++++++++++++++++++----
 tests/custom_cluster/test_events_custom_configs.py |  6 ++-
 8 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index ec7f63876..4fc42e155 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -116,6 +116,7 @@ DECLARE_int32(catalog_operation_log_size);
 DECLARE_string(hostname);
 DECLARE_bool(allow_catalog_cache_op_from_masked_users);
 DECLARE_int32(topic_update_log_gc_frequency);
+DECLARE_string(debug_actions);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -453,6 +454,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_allow_catalog_cache_op_from_masked_users(
       FLAGS_allow_catalog_cache_op_from_masked_users);
   cfg.__set_topic_update_log_gc_frequency(FLAGS_topic_update_log_gc_frequency);
+  cfg.__set_debug_actions(FLAGS_debug_actions);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index a0c41a1b2..2b96bc9fe 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -278,4 +278,6 @@ struct TBackendGflags {
   123: required bool iceberg_allow_datafiles_in_table_location_only
 
   124: required i32 topic_update_log_gc_frequency
+
+  125: required string debug_actions
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 96bed7e1c..b5b1f6db0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -79,6 +80,7 @@ import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.DebugUtils;
 import org.apache.impala.util.MetaStoreUtil;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
@@ -255,6 +257,11 @@ public class MetastoreEvents {
       Preconditions.checkNotNull(events);
       if (events.isEmpty()) return Collections.emptyList();
 
+      if (StringUtils.isNotEmpty(BackendConfig.INSTANCE.debugActions())) {
+        DebugUtils.executeDebugAction(
+            BackendConfig.INSTANCE.debugActions(), 
DebugUtils.GET_FILTERED_EVENTS_DELAY);
+      }
+
       List<MetastoreEvent> metastoreEvents = new ArrayList<>(events.size());
       for (NotificationEvent event : events) {
         metastoreEvents.add(get(event, metrics));
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index aa49cfc47..139d281d7 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -48,7 +48,6 @@ import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.MetastoreClientInstantiationException;
 import org.apache.impala.catalog.Table;
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index c7b4a0bef..3ec6f5da7 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -457,4 +457,6 @@ public class BackendConfig {
   public boolean allowCatalogCacheOpFromMaskedUsers() {
     return backendCfg_.allow_catalog_cache_op_from_masked_users;
   }
+
+  public String debugActions() { return backendCfg_.debug_actions; }
 }
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 7f9d1b623..fa85399ac 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -67,6 +67,10 @@ public class DebugUtils {
   public static final String UPDATE_CATALOG_ABORT_INSERT_TXN =
       "catalogd_update_catalog_abort_txn";
 
+  // debug action label to delay event processing.
+  public static final String GET_FILTERED_EVENTS_DELAY =
+      "catalogd_get_filtered_events_delay";
+
   // debug action label for introducing delay in loading table metadata.
   public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
 
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 8e1d0a70f..51fefa715 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -25,8 +25,8 @@ import os
 import pipes
 import psutil
 import socket
-import sys
 import time
+import requests
 from getpass import getuser
 from random import choice
 from signal import SIGKILL
@@ -39,7 +39,7 @@ from tests.common.impala_service import (
     CatalogdService,
     ImpaladService,
     StateStoredService)
-from tests.util.shell_util import exec_process, exec_process_async
+from tests.util.shell_util import exec_process
 
 LOG = logging.getLogger('impala_cluster')
 LOG.setLevel(level=logging.DEBUG)
@@ -70,6 +70,20 @@ DEFAULT_CATALOGD_JVM_DEBUG_PORT = 30030
 # flakiness.
 CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240
 
+# Url format used to set JVM log level.
+SET_JAVA_LOGLEVEL_URL = "http://{0}:{1}/set_java_loglevel";
+
+
+def post_data(url, data):
+  """Helper method to post data to a url."""
+  response = requests.head(url)
+  assert response.status_code == requests.codes.ok, "URL: {0} Resp:{1}".format(
+    url, response.text)
+  response = requests.post(url, data=data)
+  assert response.status_code == requests.codes.ok, "URL: {0} Resp:{1}".format(
+    url, response.text)
+  return response
+
 
 # Represents a set of Impala processes.
 # Handles two cases:
@@ -208,13 +222,12 @@ class ImpalaCluster(object):
       assert self.statestored is not None
       assert self.catalogd is not None
 
-
     for impalad in self.impalads:
       
impalad.service.wait_for_num_known_live_backends(expected_num_ready_impalads,
           timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2,
           early_abort_fn=check_processes_still_running)
-      if (impalad._get_arg_value("is_coordinator", default="true") == "true" 
and
-         impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 
0):
+      if (impalad._get_arg_value("is_coordinator", default="true") == "true"
+         and impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) 
== 0):
         impalad.wait_for_catalog()
 
   def wait_for_num_impalads(self, num_impalads, retries=10):
@@ -439,7 +452,7 @@ class Process(object):
     if self.container_id is None:
       binary = os.path.basename(self.cmd[0])
       restart_args = self.cmd[1:]
-      LOG.info("Starting {0} with arguments".format(binary, restart_args))
+      LOG.info("Starting {0} with arguments {1}".format(binary, restart_args))
       run_daemon(binary, restart_args)
     else:
       LOG.info("Starting container: {0}".format(self.container_id))
@@ -488,6 +501,11 @@ class BaseImpalaProcess(Process):
     """Return the port for the webserver of this process."""
     return int(self._get_port('webserver_port', 
self._get_default_webserver_port()))
 
+  def set_jvm_log_level(self, class_name, level="info"):
+    """Helper method to set JVM log level for certain class name.
+    Some daemon might not have JVM in it."""
+    raise NotImplementedError()
+
   def _get_default_webserver_port(self):
     """Different daemons have different defaults. Subclasses must override."""
     raise NotImplementedError()
@@ -563,8 +581,8 @@ class ImpaladProcess(BaseImpalaProcess):
     hs2_port_is_open = False
     num_dbs = 0
     num_tbls = 0
-    while ((time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS) and
-        not (beeswax_port_is_open and hs2_port_is_open)):
+    while ((time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
+        and not (beeswax_port_is_open and hs2_port_is_open)):
       try:
         num_dbs, num_tbls = self.service.get_metric_values(
             ["catalog.num-databases", "catalog.num-tables"])
@@ -582,6 +600,12 @@ class ImpaladProcess(BaseImpalaProcess):
           "Unable to open client ports within {num_seconds} seconds.".format(
               num_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS))
 
+  def set_jvm_log_level(self, class_name, level):
+    """Helper method to set JVM log level for certain class name."""
+    url = SET_JAVA_LOGLEVEL_URL.format(
+      self.webserver_interface, self.get_webserver_port())
+    return post_data(url, {"class": class_name, "level": level})
+
 
 # Represents a statestored process
 class StateStoreProcess(BaseImpalaProcess):
@@ -635,6 +659,12 @@ class CatalogdProcess(BaseImpalaProcess):
       self.service.wait_for_metric_value('statestore-subscriber.connected',
                                          expected_value=1, timeout=30)
 
+  def set_jvm_log_level(self, class_name, level):
+    """Helper method to set JVM log level for certain class name."""
+    url = SET_JAVA_LOGLEVEL_URL.format(
+      self.webserver_interface, self.get_webserver_port())
+    return post_data(url, {"class": class_name, "level": level})
+
 
 # Represents an admission control process.
 class AdmissiondProcess(BaseImpalaProcess):
@@ -646,6 +676,7 @@ class AdmissiondProcess(BaseImpalaProcess):
   def _get_default_webserver_port(self):
     return DEFAULT_ADMISSIOND_WEBSERVER_PORT
 
+
 def find_user_processes(binaries):
   """Returns an iterator over all processes owned by the current user with a 
matching
   binary name from the provided list. Return a iterable of tuples, with each 
tuple
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index eafdcda1f..eda2e6163 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -393,12 +393,14 @@ class 
TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     verify_skipping_older_events(test_old_table, True, True)
 
   @CustomClusterTestSuite.with_args(
-    catalogd_args="--hms_event_polling_interval_s=5"
-                  " --enable_sync_to_latest_event_on_ddls=true")
+    catalogd_args="--enable_sync_to_latest_event_on_ddls=true "
+                  
"--debug_actions=catalogd_get_filtered_events_delay:SLEEP@3000 ")
   def test_skipping_batching_events(self, unique_database):
     """Test to verify IMPALA-10949, improving batching logic for partition 
events.
     Before batching the events, each event is checked if the event id is 
greater than
     table's lastSyncEventId then the event can be batched else it can be 
skipped."""
+    # Print trace logs from DebugUtils.
+    
self.cluster.catalogd.set_jvm_log_level("org.apache.impala.util.DebugUtils", 
"trace")
     test_batch_table = "test_batch_table"
     self.client.execute(
       "create table {}.{} like functional.alltypes"

Reply via email to