This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit e666e07110216509cbc2d1d874ba4bf9ec32147e Author: Riza Suminto <[email protected]> AuthorDate: Mon Mar 4 11:42:52 2024 -0800 IMPALA-12678: Deflake test_skipping_batching_events test_skipping_batching_events is flaky. It expects the REFRESH query to arrive before ALTER_PARTITION is polled and processed, but the opposite can happen too. This patch deflakes the test by injecting a delay inside MetastoreEvents.getFilteredEvents() rather than increasing hms_event_polling_interval_s. The delay injection is specified through the debug_actions flag. This patch also adds a set_jvm_log_level() method to ImpaladProcess and CatalogdProcess to help change the JVM log level from a pytest method. Testing: - Loop and pass test_skipping_batching_events 100 times. Change-Id: Ia6e4cd1e9492e3ce75f5089038b90d0af4fbdb0f Reviewed-on: http://gerrit.cloudera.org:8080/21107 Reviewed-by: Sai Hemanth Gantasala <[email protected]> Reviewed-by: Jason Fehr <[email protected]> Reviewed-by: Quanlong Huang <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/util/backend-gflag-util.cc | 2 + common/thrift/BackendGflags.thrift | 2 + .../impala/catalog/events/MetastoreEvents.java | 7 ++++ .../catalog/events/MetastoreEventsProcessor.java | 1 - .../org/apache/impala/service/BackendConfig.java | 2 + .../java/org/apache/impala/util/DebugUtils.java | 4 ++ tests/common/impala_cluster.py | 47 ++++++++++++++++++---- tests/custom_cluster/test_events_custom_configs.py | 6 ++- 8 files changed, 60 insertions(+), 11 deletions(-) diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index ec7f63876..4fc42e155 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -116,6 +116,7 @@ DECLARE_int32(catalog_operation_log_size); DECLARE_string(hostname); DECLARE_bool(allow_catalog_cache_op_from_masked_users); DECLARE_int32(topic_update_log_gc_frequency); +DECLARE_string(debug_actions); // HS2 SAML2.0 configuration // Defined here because TAG_FLAG caused 
issues in global-flags.cc @@ -453,6 +454,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { cfg.__set_allow_catalog_cache_op_from_masked_users( FLAGS_allow_catalog_cache_op_from_masked_users); cfg.__set_topic_update_log_gc_frequency(FLAGS_topic_update_log_gc_frequency); + cfg.__set_debug_actions(FLAGS_debug_actions); return Status::OK(); } diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index a0c41a1b2..2b96bc9fe 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -278,4 +278,6 @@ struct TBackendGflags { 123: required bool iceberg_allow_datafiles_in_table_location_only 124: required i32 topic_update_log_gc_frequency + + 125: required string debug_actions } diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 96bed7e1c..b5b1f6db0 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -79,6 +80,7 @@ import org.apache.impala.service.CatalogOpExecutor; import org.apache.impala.thrift.TPartitionKeyValue; import org.apache.impala.thrift.TTableName; import org.apache.impala.util.AcidUtils; +import org.apache.impala.util.DebugUtils; import org.apache.impala.util.MetaStoreUtil; import org.slf4j.LoggerFactory; import org.slf4j.Logger; @@ -255,6 +257,11 @@ public class MetastoreEvents { Preconditions.checkNotNull(events); if (events.isEmpty()) return Collections.emptyList(); + if (StringUtils.isNotEmpty(BackendConfig.INSTANCE.debugActions())) { + 
DebugUtils.executeDebugAction( + BackendConfig.INSTANCE.debugActions(), DebugUtils.GET_FILTERED_EVENTS_DELAY); + } + List<MetastoreEvent> metastoreEvents = new ArrayList<>(events.size()); for (NotificationEvent event : events) { metastoreEvents.add(get(event, metrics)); diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java index aa49cfc47..139d281d7 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java @@ -48,7 +48,6 @@ import org.apache.impala.catalog.CatalogException; import org.apache.impala.catalog.CatalogServiceCatalog; import org.apache.impala.catalog.Db; import org.apache.impala.catalog.HdfsTable; -import org.apache.impala.catalog.MetaStoreClientPool; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.catalog.MetastoreClientInstantiationException; import org.apache.impala.catalog.Table; diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index c7b4a0bef..3ec6f5da7 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -457,4 +457,6 @@ public class BackendConfig { public boolean allowCatalogCacheOpFromMaskedUsers() { return backendCfg_.allow_catalog_cache_op_from_masked_users; } + + public String debugActions() { return backendCfg_.debug_actions; } } diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index 7f9d1b623..fa85399ac 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -67,6 +67,10 @@ public class DebugUtils { public static final String 
UPDATE_CATALOG_ABORT_INSERT_TXN = "catalogd_update_catalog_abort_txn"; + // debug action label to delay event processing. + public static final String GET_FILTERED_EVENTS_DELAY = + "catalogd_get_filtered_events_delay"; + // debug action label for introducing delay in loading table metadata. public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay"; diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index 8e1d0a70f..51fefa715 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -25,8 +25,8 @@ import os import pipes import psutil import socket -import sys import time +import requests from getpass import getuser from random import choice from signal import SIGKILL @@ -39,7 +39,7 @@ from tests.common.impala_service import ( CatalogdService, ImpaladService, StateStoredService) -from tests.util.shell_util import exec_process, exec_process_async +from tests.util.shell_util import exec_process LOG = logging.getLogger('impala_cluster') LOG.setLevel(level=logging.DEBUG) @@ -70,6 +70,20 @@ DEFAULT_CATALOGD_JVM_DEBUG_PORT = 30030 # flakiness. CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240 +# Url format used to set JVM log level. +SET_JAVA_LOGLEVEL_URL = "http://{0}:{1}/set_java_loglevel" + + +def post_data(url, data): + """Helper method to post data to a url.""" + response = requests.head(url) + assert response.status_code == requests.codes.ok, "URL: {0} Resp:{1}".format( + url, response.text) + response = requests.post(url, data=data) + assert response.status_code == requests.codes.ok, "URL: {0} Resp:{1}".format( + url, response.text) + return response + # Represents a set of Impala processes. 
# Handles two cases: @@ -208,13 +222,12 @@ class ImpalaCluster(object): assert self.statestored is not None assert self.catalogd is not None - for impalad in self.impalads: impalad.service.wait_for_num_known_live_backends(expected_num_ready_impalads, timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2, early_abort_fn=check_processes_still_running) - if (impalad._get_arg_value("is_coordinator", default="true") == "true" and - impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0): + if (impalad._get_arg_value("is_coordinator", default="true") == "true" + and impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0): impalad.wait_for_catalog() def wait_for_num_impalads(self, num_impalads, retries=10): @@ -439,7 +452,7 @@ class Process(object): if self.container_id is None: binary = os.path.basename(self.cmd[0]) restart_args = self.cmd[1:] - LOG.info("Starting {0} with arguments".format(binary, restart_args)) + LOG.info("Starting {0} with arguments {1}".format(binary, restart_args)) run_daemon(binary, restart_args) else: LOG.info("Starting container: {0}".format(self.container_id)) @@ -488,6 +501,11 @@ class BaseImpalaProcess(Process): """Return the port for the webserver of this process.""" return int(self._get_port('webserver_port', self._get_default_webserver_port())) + def set_jvm_log_level(self, class_name, level="info"): + """Helper method to set JVM log level for certain class name. + Some daemon might not have JVM in it.""" + raise NotImplementedError() + def _get_default_webserver_port(self): """Different daemons have different defaults. 
Subclasses must override.""" raise NotImplementedError() @@ -563,8 +581,8 @@ class ImpaladProcess(BaseImpalaProcess): hs2_port_is_open = False num_dbs = 0 num_tbls = 0 - while ((time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS) and - not (beeswax_port_is_open and hs2_port_is_open)): + while ((time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS) + and not (beeswax_port_is_open and hs2_port_is_open)): try: num_dbs, num_tbls = self.service.get_metric_values( ["catalog.num-databases", "catalog.num-tables"]) @@ -582,6 +600,12 @@ class ImpaladProcess(BaseImpalaProcess): "Unable to open client ports within {num_seconds} seconds.".format( num_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)) + def set_jvm_log_level(self, class_name, level): + """Helper method to set JVM log level for certain class name.""" + url = SET_JAVA_LOGLEVEL_URL.format( + self.webserver_interface, self.get_webserver_port()) + return post_data(url, {"class": class_name, "level": level}) + # Represents a statestored process class StateStoreProcess(BaseImpalaProcess): @@ -635,6 +659,12 @@ class CatalogdProcess(BaseImpalaProcess): self.service.wait_for_metric_value('statestore-subscriber.connected', expected_value=1, timeout=30) + def set_jvm_log_level(self, class_name, level): + """Helper method to set JVM log level for certain class name.""" + url = SET_JAVA_LOGLEVEL_URL.format( + self.webserver_interface, self.get_webserver_port()) + return post_data(url, {"class": class_name, "level": level}) + # Represents an admission control process. class AdmissiondProcess(BaseImpalaProcess): @@ -646,6 +676,7 @@ class AdmissiondProcess(BaseImpalaProcess): def _get_default_webserver_port(self): return DEFAULT_ADMISSIOND_WEBSERVER_PORT + def find_user_processes(binaries): """Returns an iterator over all processes owned by the current user with a matching binary name from the provided list. 
Return a iterable of tuples, with each tuple diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index eafdcda1f..eda2e6163 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -393,12 +393,14 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite): verify_skipping_older_events(test_old_table, True, True) @CustomClusterTestSuite.with_args( - catalogd_args="--hms_event_polling_interval_s=5" - " --enable_sync_to_latest_event_on_ddls=true") + catalogd_args="--enable_sync_to_latest_event_on_ddls=true " + "--debug_actions=catalogd_get_filtered_events_delay:SLEEP@3000 ") def test_skipping_batching_events(self, unique_database): """Test to verify IMPALA-10949, improving batching logic for partition events. Before batching the events, each event is checked if the event id is greater than table's lastSyncEventId then the event can be batched else it can be skipped.""" + # Print trace logs from DebugUtils. + self.cluster.catalogd.set_jvm_log_level("org.apache.impala.util.DebugUtils", "trace") test_batch_table = "test_batch_table" self.client.execute( "create table {}.{} like functional.alltypes"
