This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5925b194b IMPALA-11808: Add support for reload event in catalogD
5925b194b is described below

commit 5925b194b48dbe78ea1871d8aebc8f66aa72d01f
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Mon Dec 19 09:15:05 2022 -0800

    IMPALA-11808: Add support for reload event in catalogD
    
    This patch adds support for a new event type, the reload event,
    in catalogD. The event is used to update table/file metadata in
    other catalogD replicas when one replica issues a refresh or
    invalidate command. HIVE-26838 is
    the Hive jira that adds support for this reload event. This
    feature is disabled by default using a config
    enable_reload_events. To use this feature set this config to
    true and impala will be able to fire reload events. The
    processing of reload events in the event processor is always
    enabled. There is an end-to-end test added for this feature
    which currently checks firing/creation of the reload event and
    self-event check in the event processor. TODO: end-to-end test
    should also test this reload event in the event processor.
    There is also a follow up jira IMPALA-11822 to track the
    optimization patch for this feature.
    
    Change-Id: Ic62d58837d356dc2113f3c0904228ac9de484136
    Reviewed-on: http://gerrit.cloudera.org:8080/19378
    Tested-by: Aman Sinha <[email protected]>
    Reviewed-by: Andrew Sherman <[email protected]>
    Reviewed-by: Aman Sinha <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |   8 ++
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../org/apache/impala/compat/MetastoreShim.java    |  18 +++
 .../org/apache/impala/compat/MetastoreShim.java    |  65 ++++++++++
 .../impala/catalog/events/MetastoreEvents.java     | 140 +++++++++++++++++++++
 .../org/apache/impala/service/BackendConfig.java   |   9 ++
 .../apache/impala/service/CatalogOpExecutor.java   |  56 ++++++++-
 tests/custom_cluster/test_events_custom_configs.py |  39 ++++++
 9 files changed, 337 insertions(+), 2 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 0c5169bb6..cca214bd8 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -124,6 +124,14 @@ DEFINE_bool(enable_sync_to_latest_event_on_ddls, false, 
"This configuration is "
     "(if enabled). If this config is enabled, then the flag 
invalidate_hms_cache_on_ddls "
     "should be disabled");
 
+DEFINE_bool(enable_reload_events, false, "This configuration is used to fire a 
"
+    "refresh/invalidate table event to the HMS such that other event 
processors "
+    "(such as other Impala catalogds) that poll HMS notification logs can 
process "
+    "this event. The default value is false, so impala will not fire this "
+    "event. If enabled, impala will fire this event and other catalogD will 
process it."
+    "This config only affects the firing of the reload event. Processing of 
reload "
+    "event will always happen");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index 9a8d17f1d..a2bf8bae7 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -101,6 +101,7 @@ 
DECLARE_bool(hms_event_incremental_refresh_transactional_table);
 DECLARE_bool(auto_check_compaction);
 DECLARE_bool(enable_sync_to_latest_event_on_ddls);
 DECLARE_bool(pull_table_types_and_comments);
+DECLARE_bool(enable_reload_events);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -335,6 +336,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_use_hms_column_order_for_hbase_tables(
       FLAGS_use_hms_column_order_for_hbase_tables);
   cfg.__set_ignored_dir_prefix_list(FLAGS_ignored_dir_prefix_list);
+  cfg.__set_enable_reload_events(FLAGS_enable_reload_events);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index a3317be89..bc09752f7 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -229,4 +229,6 @@ struct TBackendGflags {
   102: required bool use_hms_column_order_for_hbase_tables
 
   103: required string ignored_dir_prefix_list
+
+  104: required bool enable_reload_events
 }
diff --git 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 6d5eb9370..e5235d54f 100644
--- 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -425,6 +425,24 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     return Collections.EMPTY_LIST;
   }
 
+  /**
+   *   CDP Hive-3 only function.
+   */
+  @VisibleForTesting
+  public static List<Long> fireReloadEventHelper(MetaStoreClient msClient,
+      boolean isRefresh, List<String> partVals, String dbName, String 
tableName,
+      Map<String, String> selfEventParams) throws TException {
+    throw new UnsupportedOperationException("Reload event is not supported.");
+  }
+
+  /**
+   *   CDP Hive-3 only function.
+   */
+  public static Map<String, Object> getFieldsFromReloadEvent(NotificationEvent 
event)
+      throws MetastoreNotificationException {
+    throw new UnsupportedOperationException("Reload event is not supported.");
+  }
+
   /**
    * Use thrift API directly instead of 
HiveMetastoreClient#getNextNotification because
    * the HMS client can throw an IllegalStateException when there is a gap 
between the
diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 10985ee4a..bbda843d0 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -82,6 +82,7 @@ import 
org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.ReloadMessage;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.metadata.formatting.TextMetaDataTable;
 import org.apache.impala.analysis.TableName;
@@ -500,6 +501,70 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     return response.getEventIds();
   }
 
+  /**
+   *  Fires a reload event to HMS notification log. In Hive-3 the reload event
+   *  in HMS corresponds to refresh table or invalidate metadata of table in 
impala.
+   *
+   * @param msClient Metastore client,
+   * @param isRefresh if this flag is set to true then it is a refresh query, 
else it
+   *                  is an invalidate metadata query.
+   * @param partVals The partition list corresponding to
+   *                                 the table, used by Apache Hive 3
+   * @param dbName
+   * @param tableName
+   * @return a list of eventIds for the reload events
+   */
+  @VisibleForTesting
+  public static List<Long> fireReloadEventHelper(MetaStoreClient msClient,
+      boolean isRefresh, List<String> partVals, String dbName, String 
tableName,
+      Map<String, String> selfEventParams) throws TException {
+    Preconditions.checkNotNull(msClient);
+    Preconditions.checkNotNull(dbName);
+    Preconditions.checkNotNull(tableName);
+    FireEventRequestData data = new FireEventRequestData();
+    data.setRefreshEvent(isRefresh);
+    FireEventRequest rqst = new FireEventRequest(true, data);
+    rqst.setDbName(dbName);
+    rqst.setTableName(tableName);
+    rqst.setPartitionVals(partVals);
+    rqst.setTblParams(selfEventParams);
+    FireEventResponse response = 
msClient.getHiveClient().fireListenerEvent(rqst);
+    if (!response.isSetEventIds()) {
+      LOG.error("FireEventResponse does not have event ids set for table 
{}.{}. This "
+              + "may cause the table to unnecessarily be refreshed when the " +
+              "refresh/invalidate event is received.", dbName, tableName);
+      return Collections.EMPTY_LIST;
+    }
+    return response.getEventIds();
+  }
+
+  /**
+   *  This method extracts the table, partition, and isRefresh fields from the
+   *  notification event and returns them in a map.
+   *
+   * @param event Metastore notification event,
+   * @return a Map of fields required for the reload event.
+   */
+  public static Map<String, Object> getFieldsFromReloadEvent(NotificationEvent 
event)
+      throws MetastoreNotificationException{
+    ReloadMessage reloadMessage =
+        MetastoreEventsProcessor.getMessageDeserializer()
+            .getReloadMessage(event.getMessage());
+    Map<String, Object> updatedFields = new HashMap<>();
+    try {
+      org.apache.hadoop.hive.metastore.api.Table msTbl = 
Preconditions.checkNotNull(
+          reloadMessage.getTableObj());
+      Partition reloadPartition = reloadMessage.getPtnObj();
+      boolean isRefresh = reloadMessage.isRefreshEvent();
+      updatedFields.put("table", msTbl);
+      updatedFields.put("partition", reloadPartition);
+      updatedFields.put("isRefresh", isRefresh);
+    } catch (Exception e) {
+      throw new MetastoreNotificationException(e);
+    }
+    return updatedFields;
+  }
+
   /**
    * Wrapper around IMetaStoreClient.getThriftClient().get_next_notification() 
to deal
    * with added arguments.
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 44d09150f..a8eafd569 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -121,6 +121,7 @@ public class MetastoreEvents {
     DROP_PARTITION("DROP_PARTITION"),
     INSERT("INSERT"),
     INSERT_PARTITIONS("INSERT_PARTITIONS"),
+    RELOAD("RELOAD"),
     ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
     COMMIT_TXN("COMMIT_TXN"),
     ABORT_TXN("ABORT_TXN"),
@@ -213,6 +214,8 @@ public class MetastoreEvents {
           return new DropPartitionEvent(catalogOpExecutor_, metrics, event);
         case ALTER_PARTITION:
           return new AlterPartitionEvent(catalogOpExecutor_, metrics, event);
+        case RELOAD:
+          return new ReloadEvent(catalogOpExecutor_, metrics, event);
         case INSERT:
           return new InsertEvent(catalogOpExecutor_, metrics, event);
         default:
@@ -2420,6 +2423,143 @@ public class MetastoreEvents {
     }
   }
 
+  /**
+   *  Metastore event handler for Reload events. A reload event can be 
generated by
+   *  refresh table/partition or invalidate table event. Handles reload events 
at both
+   *  table and partition scopes (If applicable).
+   */
+  public static class ReloadEvent extends MetastoreTableEvent {
+
+    // The partition for this reload event. Null if the table is unpartitioned
+    private Partition reloadPartition_;
+
+    // if isRefresh_ is set to true then it is refresh query, else it is 
invalidate query
+    private boolean isRefresh_;
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory 
instead
+     */
+    @VisibleForTesting
+    ReloadEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
+        NotificationEvent event) throws MetastoreNotificationException {
+      super(catalogOpExecutor, metrics, event);
+      
Preconditions.checkArgument(MetastoreEventType.RELOAD.equals(getEventType()));
+      try {
+        Map<String, Object> updatedFields =
+            MetastoreShim.getFieldsFromReloadEvent(event);
+        msTbl_ = 
(org.apache.hadoop.hive.metastore.api.Table)Preconditions.checkNotNull(
+            updatedFields.get("table"));
+        reloadPartition_ = (Partition)updatedFields.get("partition");
+        isRefresh_ = (boolean)updatedFields.get("isRefresh");
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(debugString("Unable to "
+                + "parse reload message"), e);
+      }
+    }
+
+    @Override
+    public SelfEventContext getSelfEventContext() {
+      if (reloadPartition_ != null) {
+        // create selfEventContext for reload partition event
+        List<TPartitionKeyValue> tPartSpec =
+            getTPartitionSpecFromHmsPartition(msTbl_, reloadPartition_);
+        return new SelfEventContext(dbName_, tblName_, 
Arrays.asList(tPartSpec),
+            reloadPartition_.getParameters(), null);
+      } else {
+        // create selfEventContext for reload table event
+        return new SelfEventContext(
+            dbName_, tblName_, null, msTbl_.getParameters());
+      }
+    }
+
+    @Override
+    public void process() throws MetastoreNotificationException {
+      if (isSelfEvent()) {
+        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+            .inc(getNumberOfEvents());
+        infoLog("Incremented events skipped counter to {}",
+            metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                .getCount());
+        return;
+      }
+
+      if (isRefresh_) {
+        if (reloadPartition_ != null) {
+          processPartitionReload();
+        } else {
+          processTableReload();
+        }
+      } else {
+        processTableInvalidate();
+      }
+    }
+
+    /**
+     * Process partition reload
+     */
+    private void processPartitionReload() throws 
MetastoreNotificationException {
+      // For partitioned table, refresh the partition only.
+      Preconditions.checkNotNull(reloadPartition_);
+      try {
+        // Ignore event if table or database is not in catalog. Throw 
exception if
+        // refresh fails. If the partition does not exist in metastore the 
reload
+        // method below removes it from the catalog
+        // forcing file metadata reload so that new files (due to refresh) are 
reflected
+        // in the HdfsPartition
+        reloadPartitions(Arrays.asList(reloadPartition_),
+            FileMetadataLoadOpts.FORCE_LOAD, "RELOAD event");
+      } catch (CatalogException e) {
+        throw new 
MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
+            + "partition on table {} partition {} failed. Event processing 
cannot "
+            + "continue. Issue an invalidate metadata command to reset the 
event "
+            + "processor state.", getFullyQualifiedTblName(),
+            Joiner.on(',').join(reloadPartition_.getValues())), e);
+      }
+    }
+
+    /**
+     *  Process unpartitioned table reload
+     */
+    private void processTableReload() throws MetastoreNotificationException {
+      // For non-partitioned tables, refresh the whole table.
+      Preconditions.checkState(reloadPartition_ == null);
+      try {
+        // we always treat the table as non-transactional so all the files are 
reloaded
+        reloadTableFromCatalog("RELOAD event", false);
+      } catch (CatalogException e) {
+        throw new MetastoreNotificationNeedsInvalidateException(
+            debugString("Refresh table {} failed. Event processing "
+            + "cannot continue. Issue an invalidate metadata " +
+            "command to reset the event processor state.",
+            getFullyQualifiedTblName()), e);
+      }
+    }
+
+    private void processTableInvalidate() throws 
MetastoreNotificationException {
+      Reference<Boolean> tblWasRemoved = new Reference<>();
+      Reference<Boolean> dbWasAdded = new Reference<>();
+      org.apache.impala.catalog.Table tbl = null;
+      try {
+        tbl = catalog_.getTable(dbName_, tblName_);
+        if (tbl == null) {
+          infoLog("Skipping on table {}.{} since it does not exist in cache", 
dbName_,
+              tblName_);
+          return ;
+        }
+        if (tbl instanceof IncompleteTable) {
+          infoLog("Skipping on an incomplete table {}", tbl.getFullName());
+          return ;
+        }
+      } catch (DatabaseNotFoundException e) {
+        infoLog("Skipping on table {} because db {} not found in cache", 
tblName_,
+            dbName_);
+        return ;
+      }
+      catalog_.invalidateTable(tbl.getTableName().toThrift(),
+          tblWasRemoved, dbWasAdded);
+      LOG.info("Table " + tbl.getFullName() + " is invalidated from catalog 
cache");
+    }
+  }
+
   /**
    * Metastore event handler for ABORT_TXN events. Handles abort event for 
transactional
    * tables.
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 0261ec08e..2346dfa49 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -356,6 +356,15 @@ public class BackendConfig {
     backendCfg_.enable_sync_to_latest_event_on_ddls = flag;
   }
 
+  public boolean enableReloadEvents() {
+    return backendCfg_.enable_reload_events;
+  }
+
+  @VisibleForTesting
+  public void setEnableReloadEvents(boolean flag) {
+    backendCfg_.enable_reload_events = flag;
+  }
+
   public boolean pullTableTypesAndComments() {
     return backendCfg_.pull_table_types_and_comments;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 6f3fa24e9..fb1b82afc 100755
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -6284,11 +6285,11 @@ public class CatalogOpExecutor {
       Reference<Boolean> dbWasAdded = new Reference<Boolean>(false);
       // Thrift representation of the result of the invalidate/refresh 
operation.
       TCatalogObject updatedThriftTable = null;
+      TableName tblName = TableName.fromThrift(req.getTable_name());
+      Table tbl = catalog_.getTable(tblName.getDb(), tblName.getTbl());
       if (req.isIs_refresh()) {
-        TableName tblName = TableName.fromThrift(req.getTable_name());
         // Quick check to see if the table exists in the catalog without 
triggering
         // a table load.
-        Table tbl = catalog_.getTable(tblName.getDb(), tblName.getTbl());
         if (tbl != null) {
           // If the table is not loaded, no need to perform refresh after the 
initial
           // metadata load.
@@ -6347,6 +6348,11 @@ public class CatalogOpExecutor {
             req.getTable_name().getTable_name());
       }
 
+      if (BackendConfig.INSTANCE.enableReloadEvents()) {
+        // fire a reload event for this refresh/invalidate request
+        fireReloadEventHelper(req, updatedThriftTable, tblName, tbl);
+      }
+
       // Return the TCatalogObject in the result to indicate this request can 
be
       // processed as a direct DDL operation.
       if (tblWasRemoved.getRef()) {
@@ -6385,6 +6391,52 @@ public class CatalogOpExecutor {
     return resp;
   }
 
+  /**
+   * Helper method for firing a reload event.
+   * Invokes MetastoreShim.fireReloadEventHelper to fire the event to HMS.
+   * @param req - request object for TResetMetadataRequest.
+   * @param updatedThriftTable - updated thrift table after refresh query
+   * @param tblName
+   * @param tbl
+   */
+  private void fireReloadEventHelper(TResetMetadataRequest req,
+      TCatalogObject updatedThriftTable, TableName tblName, Table tbl) {
+    List<String> partVals = null;
+    if (req.isSetPartition_spec()) {
+      partVals = req.getPartition_spec().stream().
+          map(partSpec -> partSpec.getValue()).collect(Collectors.toList());
+    }
+    try {
+      // Get new catalog version for table refresh/invalidate.
+      long newCatalogVersion = updatedThriftTable.getCatalog_version();
+      Map<String, String> tableParams = new HashMap<>();
+      tableParams.put(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
+          catalog_.getCatalogServiceId());
+      tableParams.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
+          String.valueOf(newCatalogVersion));
+      MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(),
+          req.isIs_refresh(), partVals, tblName.getDb(), tblName.getTbl(),
+          tableParams);
+      if (req.isIs_refresh()) {
+        if (catalog_.tryLock(tbl, true, 600000)) {
+          catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
+        } else {
+          LOG.warn(String.format("Couldn't obtain a version lock for the 
table: %s. " +
+              "Self events may go undetected in that case",
+              tbl.getName()));
+        }
+      }
+    } catch (TException e) {
+      LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR,
+          "fireReloadEvent") + e.getMessage());
+    } finally {
+      if (tbl.isWriteLockedByCurrentThread()) {
+        tbl.releaseWriteLock();
+        catalog_.getLock().writeLock().unlock();
+      }
+    }
+  }
+
   /**
    * Create any new partitions required as a result of an INSERT statement and 
refreshes
    * the table metadata after every INSERT statement. Any new partitions will 
inherit
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index 8fe6ae24a..e8a8f88fc 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -226,6 +226,45 @@ class 
TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     self.__run_self_events_test(unique_database, True)
     self.__run_self_events_test(unique_database, False)
 
+  
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1"
+                                                  " 
--enable_reload_events=true")
+  def test_refresh_invalidate_events(self, unique_database):
+    """Test is to verify Impala-11808, refresh/invalidate commands should 
generate a
+    Reload event in HMS and CatalogD's event processor should process this 
event.
+    """
+    test_reload_table = "test_reload_table"
+    self.client.execute(
+      "create table {}.{} (i int) partitioned by (year int) "
+        .format(unique_database, test_reload_table))
+    self.client.execute(
+      "insert into {}.{} partition (year=2022) values (1),(2),(3)"
+        .format(unique_database, test_reload_table))
+    self.client.execute(
+      "insert into {}.{} partition (year=2023) values (1),(2),(3)"
+        .format(unique_database, test_reload_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+
+    def check_self_events(query):
+      tbls_refreshed_before, partitions_refreshed_before, \
+          events_skipped_before = self.__get_self_event_metrics()
+      last_event_id = 
EventProcessorUtils.get_current_notification_id(self.hive_client)
+      self.client.execute(query)
+      # Check if there is a reload event fired after refresh query.
+      events = EventProcessorUtils.get_next_notification(self.hive_client, 
last_event_id)
+      assert len(events) == 1
+      last_event = events[0]
+      assert last_event.dbName == unique_database
+      assert last_event.tableName == test_reload_table
+      assert last_event.eventType == "RELOAD"
+      EventProcessorUtils.wait_for_event_processing(self)
+      tbls_refreshed_after, partitions_refreshed_after, \
+          events_skipped_after = self.__get_self_event_metrics()
+      assert events_skipped_after > events_skipped_before
+
+    check_self_events("refresh {}.{} partition(year=2022)"
+        .format(unique_database, test_reload_table))
+    check_self_events("refresh {}.{}".format(unique_database, 
test_reload_table))
+
   
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_event_batching(self, unique_database):
     """Runs queries which generate multiple ALTER_PARTITION events which must 
be

Reply via email to