This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 547f96ec04ce2736fbf24d2a7253f549fcbbd167
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Tue Aug 13 10:02:35 2024 -0700

    IMPALA-12865: Fix wrong lastRefreshEventId set by firing RELOAD events
    
    When enable_reload_events is true, catalogd fires RELOAD events after
    REFRESH finishes reloading the table/partition. The RELOAD event id is
    also used to update lastRefreshEventId of the table/partition. This is
    problematic when enable_skipping_older_events is true: HMS events
    generated after the reload starts but before the RELOAD event is fired
    are wrongly skipped, even though their changes may not be reflected in
    the reloaded metadata.
    
    Solution:
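    Fetch the current HMS notification event id before the table/partition
    is refreshed, and set it as the lastRefreshEventId of the table/partition.
    Also attach the catalog service id and catalog version to the fired
    RELOAD events so that the event processor recognizes them as self-events.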
    
    Testing:
    - Manually verified that the issue is fixed.
    - Added an end-to-end test that closely reproduces the real-world issue.
    
    Change-Id: I90039da77ec561c5aede44456f88c6650582815b
    Reviewed-on: http://gerrit.cloudera.org:8080/21665
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/catalog/HdfsPartition.java   |  4 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |  2 +
 .../impala/catalog/events/MetastoreEvents.java     | 19 +++++++--
 .../apache/impala/service/CatalogOpExecutor.java   | 39 +++++++++++++++---
 .../java/org/apache/impala/util/DebugUtils.java    |  6 +++
 tests/custom_cluster/test_events_custom_configs.py | 46 ++++++++++++++++++++++
 6 files changed, 106 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 6128e27f2..c50a94fbf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -971,7 +971,9 @@ public class HdfsPartition extends CatalogObjectImpl implements FeFsPartition {
     }
 
     public Builder setLastRefreshEventId(long eventId) {
-      lastRefreshEventId_ = eventId;
+      if (eventId > lastRefreshEventId_) {
+        lastRefreshEventId_ = eventId;
+      }
       return this;
     }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 9c0e0d03f..ba9e66ec2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1045,6 +1045,8 @@ public class HdfsTable extends Table implements FeFsTable {
     boolean partitionNotChanged = partBuilder.equalsToOriginal(oldPartition);
     LOG.trace("Partition {} {}", oldPartition.getName(),
         partitionNotChanged ? "changed" : "unchanged");
+    // for partitioned refresh, partition should be updated whether the 
partition is
+    // changed or not.
     if (partitionNotChanged) return false;
     HdfsPartition newPartition = partBuilder.build();
     // Partition is reloaded and hence cache directives are not dropped.
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 23b78500b..2a938c251 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1534,6 +1534,8 @@ public class MetastoreEvents {
                   .getCount());
           return true;
         }
+        DebugUtils.executeDebugAction(BackendConfig.INSTANCE.debugActions(),
+            DebugUtils.IS_OLDER_EVENT_CHECK_DELAY);
         // Always check the lastRefreshEventId on the table first for table 
level refresh
         boolean canSkip = tbl.getLastRefreshEventId() >= getEventId();
         try {
@@ -3280,6 +3282,7 @@ public class MetastoreEvents {
       List<Long> eventIds = new ArrayList<>();
       // We treat insert event as a special case since the self-event context 
for an
       // insert event is generated differently using the eventIds.
+      boolean isReloadEvent = baseEvent_ instanceof ReloadEvent;
       boolean isInsertEvent = baseEvent_ instanceof InsertEvent;
       for (T event : batchedEvents_) {
         partitionKeyValues.add(
@@ -3288,7 +3291,8 @@ public class MetastoreEvents {
         eventIds.add(event.getEventId());
       }
       return new SelfEventContext(dbName_, tblName_, partitionKeyValues,
-          baseEvent_.getPartitionForBatching().getParameters(),
+          isReloadEvent ? msTbl_.getParameters() :
+              baseEvent_.getPartitionForBatching().getParameters(),
           isInsertEvent ? eventIds : null);
     }
   }
@@ -3483,12 +3487,21 @@ public class MetastoreEvents {
 
     @Override
     public SelfEventContext getSelfEventContext() {
-      throw new UnsupportedOperationException("Self-event evaluation is 
unnecessary for"
-          + " this event type");
+      if (reloadPartition_ == null) {
+        return new SelfEventContext(msTbl_.getDbName(), msTbl_.getTableName(),
+            msTbl_.getParameters());
+      }
+      return new SelfEventContext(msTbl_.getDbName(), msTbl_.getTableName(),
+          Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_, 
reloadPartition_)),
+          msTbl_.getParameters());
     }
 
     @Override
     public void processTableEvent() throws MetastoreNotificationException {
+      if (isSelfEvent()) {
+        infoLog("Not processing the event as it is a self-event");
+        return;
+      }
       if (isOlderEvent()) {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
             .inc(getNumberOfEvents());
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 7c72531a7..6c4319136 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7452,7 +7452,7 @@ public class CatalogOpExecutor {
         }
         Preconditions.checkNotNull(tbl, "tbl is null in " + cmdString);
         // fire event for refresh event and update the last refresh event id
-        fireReloadEventAndUpdateRefreshEventId(req, tblName, tbl);
+        fireReloadEventAndUpdateRefreshEventId(req, tblName, tbl, eventId);
         catalogTimeline.markEvent("Fired reload events in Metastore");
       }
 
@@ -7515,7 +7515,7 @@ public class CatalogOpExecutor {
    * and update the last refresh event id in the cache
    */
   private void fireReloadEventAndUpdateRefreshEventId(
-      TResetMetadataRequest req, TableName tblName, Table tbl) {
+      TResetMetadataRequest req, TableName tblName, Table tbl, long 
currentHMSEventId) {
     // Partition spec (List<TPartitionKeyValue>) for each partition
     List<List<TPartitionKeyValue>> partSpecList = null;
     // Partition values (List<String>) for each partition
@@ -7532,10 +7532,18 @@ public class CatalogOpExecutor {
               .collect(Collectors.toList()))
           .collect(Collectors.toList());
     }
+    DebugUtils.executeDebugAction(
+        BackendConfig.INSTANCE.debugActions(), 
DebugUtils.FIRE_RELOAD_EVENT_DELAY);
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
     try {
+      Map<String, String> selfEventProps = new HashMap<>();
+      selfEventProps.put(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
+          catalog_.getCatalogServiceId());
+      selfEventProps.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
+          String.valueOf(newCatalogVersion));
       List<Long> eventIds = MetastoreShim.fireReloadEventHelper(
           catalog_.getMetaStoreClient(), req.isIs_refresh(), partValsList,
-          tblName.getDb(), tblName.getTbl(), Collections.emptyMap());
+          tblName.getDb(), tblName.getTbl(), selfEventProps);
       LOG.info("Fired {} RELOAD events for table {}: {}", eventIds.size(),
           tbl.getFullName(), StringUtils.join(",", eventIds));
       // Update the lastRefreshEventId accordingly
@@ -7548,6 +7556,10 @@ public class CatalogOpExecutor {
       }
 
       // tbl lock is held at this point.
+      // It is possible that some operations might have modified the metadata 
externally
+      // while refresh operation is still in-progress, so it is safe to set 
the latest
+      // HMS notification event id before refresh operation, on the metadata 
object as
+      // lastRefreshEventId
       if (partSpecList != null) {
         Preconditions.checkNotNull(partValsList);
         boolean partitionChanged = false;
@@ -7555,11 +7567,20 @@ public class CatalogOpExecutor {
           HdfsTable hdfsTbl = (HdfsTable) tbl;
           HdfsPartition partition = hdfsTbl
               .getPartitionFromThriftPartitionSpec(partSpecList.get(i));
+          if (currentHMSEventId + 1 == eventIds.get(i)) {
+            currentHMSEventId = eventIds.get(i);
+          }
           if (partition != null) {
             HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
             // use last event id, so that batch partition events will not 
reloaded again
-            partBuilder.setLastRefreshEventId(eventIds.get(eventIds.size() - 
1));
-            partitionChanged |= hdfsTbl.updatePartition(partBuilder);
+            partBuilder.setLastRefreshEventId(currentHMSEventId);
+            if (hdfsTbl.updatePartition(partBuilder)) {
+              partitionChanged = true;
+              partition = hdfsTbl.getPartitionFromThriftPartitionSpec(
+                  partSpecList.get(i));
+              Preconditions.checkNotNull(partition, "Partition is null after 
update");
+            }
+            partition.addToVersionsForInflightEvents(false, newCatalogVersion);
           } else {
             LOG.warn("Partition {} no longer exists in table {}. It might be " 
+
                     "dropped by a concurrent operation.",
@@ -7572,8 +7593,14 @@ public class CatalogOpExecutor {
           tbl.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
         }
       } else {
-        tbl.setLastRefreshEventId(eventIds.get(0));
+        if (currentHMSEventId + 1 == eventIds.get(0)) {
+          currentHMSEventId = eventIds.get(0);
+        }
+        tbl.setLastRefreshEventId(currentHMSEventId);
+        // Add inflight event at table level
+        catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
       }
+
     } catch (TException | CatalogException e) {
       LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR,
           "fireReloadEvent") + e.getMessage());
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 5754263d9..b5d402dba 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -131,6 +131,12 @@ public class DebugUtils {
   public static final String RESET_METADATA_LOOP_UNLOCKED =
       "reset_metadata_loop_unlocked";
 
+  // debug action label to inject a delay when firing reload events
+  public static final String FIRE_RELOAD_EVENT_DELAY = 
"fire_reload_event_delay";
+
+  // debug action label to inject a delay when checking for older event
+  public static final String IS_OLDER_EVENT_CHECK_DELAY = 
"older_event_check_delay";
+
   /**
    * Returns true if the label of action is set in the debugActions
    */
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index d8ced8838..1c2209209 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1298,6 +1298,52 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
       unique_database, hive_tbl)))
     assert data == 0
 
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--hms_event_polling_interval_s=1"
+                  " --enable_reload_events=true"
+                  " --debug_actions=fire_reload_event_delay:SLEEP@3000|"
+                  "older_event_check_delay:SLEEP@3000"
+                  " --enable_skipping_older_events=true")
+  def test_verify_last_refresh_event_id(self, unique_database):
+    """Test to verify IMPALA-12865 to not skip the events older but not 
processed by
+       event processor. Also, the test verifies self-events of reload event."""
+    tbl = unique_database + ".partitioned_tbl"
+    self.client.execute(
+      "create external table {} (i int) partitioned by (year int)".format(tbl))
+    self.client.execute(
+      "alter table {} add partition(year=2024)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+
+    def __verify_refresh(verify_self_event=False):
+      prev_events_skipped = 
EventProcessorUtils.get_int_metric('events-skipped', 0)
+      if not verify_self_event:
+        handle = self.client.execute_async(
+          "refresh {} partition(year=2024)".format(tbl))
+        self.run_stmt_in_hive(
+          "alter table {} partition(year=2024) set fileformat ORC".format(tbl))
+      else:
+        handle = self.client.execute_async(
+          "refresh {} partition(year=2024) partition(year=2025)".format(tbl))
+        self.run_stmt_in_hive(
+          "create table {} (i int)".format(unique_database + ".tbl2"))
+      parts_refreshed_before = 
EventProcessorUtils.get_int_metric("partitions-refreshed")
+      self.client.wait_for_impala_state(handle, FINISHED, timeout=10)
+      assert self.client.is_finished(handle)
+      EventProcessorUtils.wait_for_event_processing(self, timeout=10)  # avoid 
flakiness
+      current_events_skipped = 
EventProcessorUtils.get_int_metric('events-skipped', 0)
+      parts_refreshed_after = 
EventProcessorUtils.get_int_metric("partitions-refreshed")
+      if verify_self_event:
+        assert parts_refreshed_after == parts_refreshed_before
+      else:
+        assert parts_refreshed_after > parts_refreshed_before
+      assert current_events_skipped > prev_events_skipped
+
+    __verify_refresh(False)
+    self.client.execute(
+      "alter table {} add partition(year=2025)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self, timeout=10)
+    __verify_refresh(True)
+
   @SkipIf.is_test_jdk
   @CustomClusterTestSuite.with_args(
       catalogd_args="--hms_event_polling_interval_s=100",

Reply via email to