This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 5925b194b IMPALA-11808: Add support for reload event in catalogD
5925b194b is described below
commit 5925b194b48dbe78ea1871d8aebc8f66aa72d01f
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Mon Dec 19 09:15:05 2022 -0800
IMPALA-11808: Add support for reload event in catalogD
This patch supports a new event called reload event in catalogD.
This event is used to update table/file metadata for a refresh
or invalidate command in other replicas of catalogDs when one
of the replicas issued refresh/invalidate command. HIVE-26838 is
the Hive jira that adds support for this reload event. This
feature is disabled by default using a config
enable_reload_events. To use this feature set this config to
true and impala will be able to fire reload events. The
processing of reload events in the event processor is always
enabled. There is an end-to-end test added for this feature
which currently checks firing/creation of the reload event and
self-event check in the event processor. TODO: end-to-end test
should also test this reload event in the event processor.
There is also a follow up jira IMPALA-11822 to track the
optimization patch for this feature.
Change-Id: Ic62d58837d356dc2113f3c0904228ac9de484136
Reviewed-on: http://gerrit.cloudera.org:8080/19378
Tested-by: Aman Sinha <[email protected]>
Reviewed-by: Andrew Sherman <[email protected]>
Reviewed-by: Aman Sinha <[email protected]>
---
be/src/catalog/catalog-server.cc | 8 ++
be/src/util/backend-gflag-util.cc | 2 +
common/thrift/BackendGflags.thrift | 2 +
.../org/apache/impala/compat/MetastoreShim.java | 18 +++
.../org/apache/impala/compat/MetastoreShim.java | 65 ++++++++++
.../impala/catalog/events/MetastoreEvents.java | 140 +++++++++++++++++++++
.../org/apache/impala/service/BackendConfig.java | 9 ++
.../apache/impala/service/CatalogOpExecutor.java | 56 ++++++++-
tests/custom_cluster/test_events_custom_configs.py | 39 ++++++
9 files changed, 337 insertions(+), 2 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 0c5169bb6..cca214bd8 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -124,6 +124,14 @@ DEFINE_bool(enable_sync_to_latest_event_on_ddls, false,
"This configuration is "
"(if enabled). If this config is enabled, then the flag
invalidate_hms_cache_on_ddls "
"should be disabled");
+DEFINE_bool(enable_reload_events, false, "This configuration is used to fire a
"
+ "refresh/invalidate table event to the HMS such that other event
processors "
+ "(such as other Impala catalogds) that poll HMS notification logs can
process "
+ "this event. The default value is false, so impala will not fire this "
+ "event. If enabled, impala will fire this event and other catalogD will
process it."
+ "This config only affects the firing of the reload event. Processing of
reload "
+ "event will always happen");
+
DECLARE_string(state_store_host);
DECLARE_int32(state_store_subscriber_port);
DECLARE_int32(state_store_port);
diff --git a/be/src/util/backend-gflag-util.cc
b/be/src/util/backend-gflag-util.cc
index 9a8d17f1d..a2bf8bae7 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -101,6 +101,7 @@
DECLARE_bool(hms_event_incremental_refresh_transactional_table);
DECLARE_bool(auto_check_compaction);
DECLARE_bool(enable_sync_to_latest_event_on_ddls);
DECLARE_bool(pull_table_types_and_comments);
+DECLARE_bool(enable_reload_events);
// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -335,6 +336,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_use_hms_column_order_for_hbase_tables(
FLAGS_use_hms_column_order_for_hbase_tables);
cfg.__set_ignored_dir_prefix_list(FLAGS_ignored_dir_prefix_list);
+ cfg.__set_enable_reload_events(FLAGS_enable_reload_events);
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift
b/common/thrift/BackendGflags.thrift
index a3317be89..bc09752f7 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -229,4 +229,6 @@ struct TBackendGflags {
102: required bool use_hms_column_order_for_hbase_tables
103: required string ignored_dir_prefix_list
+
+ 104: required bool enable_reload_events
}
diff --git
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 6d5eb9370..e5235d54f 100644
---
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -425,6 +425,24 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
return Collections.EMPTY_LIST;
}
+ /**
+ * CDP Hive-3 only function.
+ */
+ @VisibleForTesting
+ public static List<Long> fireReloadEventHelper(MetaStoreClient msClient,
+ boolean isRefresh, List<String> partVals, String dbName, String
tableName,
+ Map<String, String> selfEventParams) throws TException {
+ throw new UnsupportedOperationException("Reload event is not supported.");
+ }
+
+ /**
+ * CDP Hive-3 only function.
+ */
+ public static Map<String, Object> getFieldsFromReloadEvent(NotificationEvent
event)
+ throws MetastoreNotificationException {
+ throw new UnsupportedOperationException("Reload event is not supported.");
+ }
+
/**
* Use thrift API directly instead of
HiveMetastoreClient#getNextNotification because
* the HMS client can throw an IllegalStateException when there is a gap
between the
diff --git
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 10985ee4a..bbda843d0 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -82,6 +82,7 @@ import
org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.ReloadMessage;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.formatting.TextMetaDataTable;
import org.apache.impala.analysis.TableName;
@@ -500,6 +501,70 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
return response.getEventIds();
}
+ /**
+ * Fires a reload event to HMS notification log. In Hive-3 the relaod event
+ * in HMS corresponds to refresh table or invalidate metadata of table in
impala.
+ *
+ * @param msClient Metastore client,
+ * @param isRefresh if this flag is set to true then it is a refresh query,
else it
+ * is an invalidate metadata query.
+ * @param partVals The partition list corresponding to
+ * the table, used by Apache Hive 3
+ * @param dbName
+ * @param tableName
+ * @return a list of eventIds for the reload events
+ */
+ @VisibleForTesting
+ public static List<Long> fireReloadEventHelper(MetaStoreClient msClient,
+ boolean isRefresh, List<String> partVals, String dbName, String
tableName,
+ Map<String, String> selfEventParams) throws TException {
+ Preconditions.checkNotNull(msClient);
+ Preconditions.checkNotNull(dbName);
+ Preconditions.checkNotNull(tableName);
+ FireEventRequestData data = new FireEventRequestData();
+ data.setRefreshEvent(isRefresh);
+ FireEventRequest rqst = new FireEventRequest(true, data);
+ rqst.setDbName(dbName);
+ rqst.setTableName(tableName);
+ rqst.setPartitionVals(partVals);
+ rqst.setTblParams(selfEventParams);
+ FireEventResponse response =
msClient.getHiveClient().fireListenerEvent(rqst);
+ if (!response.isSetEventIds()) {
+ LOG.error("FireEventResponse does not have event ids set for table
{}.{}. This "
+ + "may cause the table to unnecessarily be refreshed when the " +
+ "refresh/invalidate event is received.", dbName, tableName);
+ return Collections.EMPTY_LIST;
+ }
+ return response.getEventIds();
+ }
+
+ /**
+ * This method extracts the table, partition, and isRefresh fields from the
+ * notification event and returns them in a map.
+ *
+ * @param event Metastore notification event,
+ * @return a Map of fields required for the reload event.
+ */
+ public static Map<String, Object> getFieldsFromReloadEvent(NotificationEvent
event)
+ throws MetastoreNotificationException{
+ ReloadMessage reloadMessage =
+ MetastoreEventsProcessor.getMessageDeserializer()
+ .getReloadMessage(event.getMessage());
+ Map<String, Object> updatedFields = new HashMap<>();
+ try {
+ org.apache.hadoop.hive.metastore.api.Table msTbl =
Preconditions.checkNotNull(
+ reloadMessage.getTableObj());
+ Partition reloadPartition = reloadMessage.getPtnObj();
+ boolean isRefresh = reloadMessage.isRefreshEvent();
+ updatedFields.put("table", msTbl);
+ updatedFields.put("partition", reloadPartition);
+ updatedFields.put("isRefresh", isRefresh);
+ } catch (Exception e) {
+ throw new MetastoreNotificationException(e);
+ }
+ return updatedFields;
+ }
+
/**
* Wrapper around IMetaStoreClient.getThriftClient().get_next_notification()
to deal
* with added arguments.
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 44d09150f..a8eafd569 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -121,6 +121,7 @@ public class MetastoreEvents {
DROP_PARTITION("DROP_PARTITION"),
INSERT("INSERT"),
INSERT_PARTITIONS("INSERT_PARTITIONS"),
+ RELOAD("RELOAD"),
ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
COMMIT_TXN("COMMIT_TXN"),
ABORT_TXN("ABORT_TXN"),
@@ -213,6 +214,8 @@ public class MetastoreEvents {
return new DropPartitionEvent(catalogOpExecutor_, metrics, event);
case ALTER_PARTITION:
return new AlterPartitionEvent(catalogOpExecutor_, metrics, event);
+ case RELOAD:
+ return new ReloadEvent(catalogOpExecutor_, metrics, event);
case INSERT:
return new InsertEvent(catalogOpExecutor_, metrics, event);
default:
@@ -2420,6 +2423,143 @@ public class MetastoreEvents {
}
}
+ /**
+ * Metastore event handler for Reload events. A reload event can be
generated by
+ * refresh table/partition or invalidate table event. Handles reload events
at both
+ * table and partition scopes (If applicable).
+ */
+ public static class ReloadEvent extends MetastoreTableEvent {
+
+ // The partition for this reload event. Null if the table is unpartitioned
+ private Partition reloadPartition_;
+
+ // if isRefresh_ is set to true then it is refresh query, else it is
invalidate query
+ private boolean isRefresh_;
+ /**
+ * Prevent instantiation from outside should use MetastoreEventFactory
instead
+ */
+ @VisibleForTesting
+ ReloadEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
+ NotificationEvent event) throws MetastoreNotificationException {
+ super(catalogOpExecutor, metrics, event);
+
Preconditions.checkArgument(MetastoreEventType.RELOAD.equals(getEventType()));
+ try {
+ Map<String, Object> updatedFields =
+ MetastoreShim.getFieldsFromReloadEvent(event);
+ msTbl_ =
(org.apache.hadoop.hive.metastore.api.Table)Preconditions.checkNotNull(
+ updatedFields.get("table"));
+ reloadPartition_ = (Partition)updatedFields.get("partition");
+ isRefresh_ = (boolean)updatedFields.get("isRefresh");
+ } catch (Exception e) {
+ throw new MetastoreNotificationException(debugString("Unable to "
+ + "parse reload message"), e);
+ }
+ }
+
+ @Override
+ public SelfEventContext getSelfEventContext() {
+ if (reloadPartition_ != null) {
+ // create selfEventContext for reload partition event
+ List<TPartitionKeyValue> tPartSpec =
+ getTPartitionSpecFromHmsPartition(msTbl_, reloadPartition_);
+ return new SelfEventContext(dbName_, tblName_,
Arrays.asList(tPartSpec),
+ reloadPartition_.getParameters(), null);
+ } else {
+ // create selfEventContext for reload table event
+ return new SelfEventContext(
+ dbName_, tblName_, null, msTbl_.getParameters());
+ }
+ }
+
+ @Override
+ public void process() throws MetastoreNotificationException {
+ if (isSelfEvent()) {
+ metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+ .inc(getNumberOfEvents());
+ infoLog("Incremented events skipped counter to {}",
+ metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+ .getCount());
+ return;
+ }
+
+ if (isRefresh_) {
+ if (reloadPartition_ != null) {
+ processPartitionReload();
+ } else {
+ processTableReload();
+ }
+ } else {
+ processTableInvalidate();
+ }
+ }
+
+ /**
+ * Process partition reload
+ */
+ private void processPartitionReload() throws
MetastoreNotificationException {
+ // For partitioned table, refresh the partition only.
+ Preconditions.checkNotNull(reloadPartition_);
+ try {
+ // Ignore event if table or database is not in catalog. Throw
exception if
+ // refresh fails. If the partition does not exist in metastore the
reload
+ // method below removes it from the catalog
+ // forcing file metadata reload so that new files (due to refresh) are
reflected
+ // HdfsPartition
+ reloadPartitions(Arrays.asList(reloadPartition_),
+ FileMetadataLoadOpts.FORCE_LOAD, "RELOAD event");
+ } catch (CatalogException e) {
+ throw new
MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
+ + "partition on table {} partition {} failed. Event processing
cannot "
+ + "continue. Issue an invalidate metadata command to reset the
event "
+ + "processor state.", getFullyQualifiedTblName(),
+ Joiner.on(',').join(reloadPartition_.getValues())), e);
+ }
+ }
+
+ /**
+ * Process unpartitioned table reload
+ */
+ private void processTableReload() throws MetastoreNotificationException {
+ // For non-partitioned tables, refresh the whole table.
+ Preconditions.checkState(reloadPartition_ == null);
+ try {
+ // we always treat the table as non-transactional so all the files are
reloaded
+ reloadTableFromCatalog("RELOAD event", false);
+ } catch (CatalogException e) {
+ throw new MetastoreNotificationNeedsInvalidateException(
+ debugString("Refresh table {} failed. Event processing "
+ + "cannot continue. Issue an invalidate metadata " +
+ "command to reset the event processor state.",
+ getFullyQualifiedTblName()), e);
+ }
+ }
+
+ private void processTableInvalidate() throws
MetastoreNotificationException {
+ Reference<Boolean> tblWasRemoved = new Reference<>();
+ Reference<Boolean> dbWasAdded = new Reference<>();
+ org.apache.impala.catalog.Table tbl = null;
+ try {
+ tbl = catalog_.getTable(dbName_, tblName_);
+ if (tbl == null) {
+ infoLog("Skipping on table {}.{} since it does not exist in cache",
dbName_,
+ tblName_);
+ return ;
+ }
+ if (tbl instanceof IncompleteTable) {
+ infoLog("Skipping on an incomplete table {}", tbl.getFullName());
+ return ;
+ }
+ } catch (DatabaseNotFoundException e) {
+ infoLog("Skipping on table {} because db {} not found in cache",
tblName_,
+ dbName_);
+ return ;
+ }
+ catalog_.invalidateTable(tbl.getTableName().toThrift(),
+ tblWasRemoved, dbWasAdded);
+ LOG.info("Table " + tbl.getFullName() + " is invalidated from catalog
cache");
+ }
+ }
+
/**
* Metastore event handler for ABORT_TXN events. Handles abort event for
transactional
* tables.
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 0261ec08e..2346dfa49 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -356,6 +356,15 @@ public class BackendConfig {
backendCfg_.enable_sync_to_latest_event_on_ddls = flag;
}
+ public boolean enableReloadEvents() {
+ return backendCfg_.enable_reload_events;
+ }
+
+ @VisibleForTesting
+ public void setEnableReloadEvents(boolean flag) {
+ backendCfg_.enable_reload_events = flag;
+ }
+
public boolean pullTableTypesAndComments() {
return backendCfg_.pull_table_types_and_comments;
}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 6f3fa24e9..fb1b82afc 100755
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -44,6 +44,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -6284,11 +6285,11 @@ public class CatalogOpExecutor {
Reference<Boolean> dbWasAdded = new Reference<Boolean>(false);
// Thrift representation of the result of the invalidate/refresh
operation.
TCatalogObject updatedThriftTable = null;
+ TableName tblName = TableName.fromThrift(req.getTable_name());
+ Table tbl = catalog_.getTable(tblName.getDb(), tblName.getTbl());
if (req.isIs_refresh()) {
- TableName tblName = TableName.fromThrift(req.getTable_name());
// Quick check to see if the table exists in the catalog without
triggering
// a table load.
- Table tbl = catalog_.getTable(tblName.getDb(), tblName.getTbl());
if (tbl != null) {
// If the table is not loaded, no need to perform refresh after the
initial
// metadata load.
@@ -6347,6 +6348,11 @@ public class CatalogOpExecutor {
req.getTable_name().getTable_name());
}
+ if (BackendConfig.INSTANCE.enableReloadEvents()) {
+ // fire event for refresh event
+ fireReloadEventHelper(req, updatedThriftTable, tblName, tbl);
+ }
+
// Return the TCatalogObject in the result to indicate this request can
be
// processed as a direct DDL operation.
if (tblWasRemoved.getRef()) {
@@ -6385,6 +6391,52 @@ public class CatalogOpExecutor {
return resp;
}
+ /**
+ * Helper class for refresh event.
+ * This class invokes metastore shim's fireReloadEvent to fire event to HMS
+ * @param req - request object for TResetMetadataRequest.
+ * @param updatedThriftTable - updated thrift table after refresh query
+ * @param tblName
+ * @param tbl
+ */
+ private void fireReloadEventHelper(TResetMetadataRequest req,
+ TCatalogObject updatedThriftTable, TableName tblName, Table tbl) {
+ List<String> partVals = null;
+ if (req.isSetPartition_spec()) {
+ partVals = req.getPartition_spec().stream().
+ map(partSpec -> partSpec.getValue()).collect(Collectors.toList());
+ }
+ try {
+ // Get new catalog version for table refresh/invalidate.
+ long newCatalogVersion = updatedThriftTable.getCatalog_version();
+ Map<String, String> tableParams = new HashMap<>();
+ tableParams.put(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
+ catalog_.getCatalogServiceId());
+ tableParams.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
+ String.valueOf(newCatalogVersion));
+ MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(),
+ req.isIs_refresh(), partVals, tblName.getDb(), tblName.getTbl(),
+ tableParams);
+ if (req.isIs_refresh()) {
+ if (catalog_.tryLock(tbl, true, 600000)) {
+ catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
+ } else {
+ LOG.warn(String.format("Couldn't obtain a version lock for the
table: %s. " +
+ "Self events may go undetected in that case",
+ tbl.getName()));
+ }
+ }
+ } catch (TException e) {
+ LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR,
+ "fireReloadEvent") + e.getMessage());
+ } finally {
+ if (tbl.isWriteLockedByCurrentThread()) {
+ tbl.releaseWriteLock();
+ catalog_.getLock().writeLock().unlock();
+ }
+ }
+ }
+
/**
* Create any new partitions required as a result of an INSERT statement and
refreshes
* the table metadata after every INSERT statement. Any new partitions will
inherit
diff --git a/tests/custom_cluster/test_events_custom_configs.py
b/tests/custom_cluster/test_events_custom_configs.py
index 8fe6ae24a..e8a8f88fc 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -226,6 +226,45 @@ class
TestEventProcessingCustomConfigs(CustomClusterTestSuite):
self.__run_self_events_test(unique_database, True)
self.__run_self_events_test(unique_database, False)
+
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1"
+ "
--enable_reload_events=true")
+ def test_refresh_invalidate_events(self, unique_database):
+ """Test is to verify Impala-11808, refresh/invalidate commands should
generate a
+ Reload event in HMS and CatalogD's event processor should process this
event.
+ """
+ test_reload_table = "test_reload_table"
+ self.client.execute(
+ "create table {}.{} (i int) partitioned by (year int) "
+ .format(unique_database, test_reload_table))
+ self.client.execute(
+ "insert into {}.{} partition (year=2022) values (1),(2),(3)"
+ .format(unique_database, test_reload_table))
+ self.client.execute(
+ "insert into {}.{} partition (year=2023) values (1),(2),(3)"
+ .format(unique_database, test_reload_table))
+ EventProcessorUtils.wait_for_event_processing(self)
+
+ def check_self_events(query):
+ tbls_refreshed_before, partitions_refreshed_before, \
+ events_skipped_before = self.__get_self_event_metrics()
+ last_event_id =
EventProcessorUtils.get_current_notification_id(self.hive_client)
+ self.client.execute(query)
+ # Check if there is a reload event fired after refresh query.
+ events = EventProcessorUtils.get_next_notification(self.hive_client,
last_event_id)
+ assert len(events) == 1
+ last_event = events[0]
+ assert last_event.dbName == unique_database
+ assert last_event.tableName == test_reload_table
+ assert last_event.eventType == "RELOAD"
+ EventProcessorUtils.wait_for_event_processing(self)
+ tbls_refreshed_after, partitions_refreshed_after, \
+ events_skipped_after = self.__get_self_event_metrics()
+ assert events_skipped_after > events_skipped_before
+
+ check_self_events("refresh {}.{} partition(year=2022)"
+ .format(unique_database, test_reload_table))
+ check_self_events("refresh {}.{}".format(unique_database,
test_reload_table))
+
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
def test_event_batching(self, unique_database):
"""Runs queries which generate multiple ALTER_PARTITION events which must
be