This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a68f71645 IMPALA-14307: Correctly update createEventId and 
DeleteEventLog in AlterTableRename
a68f71645 is described below

commit a68f7164589b5d554d7f9a948badbc414e6b944e
Author: stiga-huang <[email protected]>
AuthorDate: Wed Aug 13 22:06:24 2025 +0800

    IMPALA-14307: Correctly update createEventId and DeleteEventLog in 
AlterTableRename
    
    When EventProcessor is paused, e.g. due to a global INVALIDATE METADATA
    operation, in alterTableOrViewRename() we don't fetch the event id of
    the ALTER_TABLE event. This causes the createEventId of the new table
    to be -1 and the DeleteEventLog entry of the old table to be missing. So
    stale ALTER_TABLE RENAME events could incorrectly remove the new table
    or add the old table.
    
    The other case is in the fallback invalidation added in IMPALA-13989
    that handles rename failure inside catalog (but succeeds in HMS). The
    createEventId is also set as -1.
    
    This patch fixes these by always setting a correct/meaningful
    createEventId. When fetching the ALTER_TABLE event fails, we try to use
    the event id before the HMS operation. It could be a little bit stale
    but much better than -1.
    
    Modified CatalogServiceCatalog#isEventProcessingActive() to just check
    if event processing is enabled and renamed it to
    isEventProcessingEnabled(). Note that this method is only used in DDLs
    that check their self events. We should allow these checks even when
    EventProcessor is not in the ACTIVE state. So when EventProcessor is
    recovered, fields like createEventId in tables are still correct.
    
    Removed the code of tracking in-flight events at the end of rename since
    the new table is in unloaded state and only the createEventId is useful.
    The catalog version used is also incorrect since it's not used in
    CatalogServiceCatalog#renameTable() so it doesn't make sense to use it.
    Removed the InProgressTableModification parameter of
    alterTableOrViewRename() since it's not used anymore.
    
    This patch also fixes a bug in getRenamedTableFromEvents() where it
    always returns the id of the first event in the list. It should return
    the id of the rename event it finds.
    
    Tests
     - Added e2e test and ran it 40 times.
    
    Change-Id: Ie7c305e5aaafc8bbdb85830978182394619fad08
    Reviewed-on: http://gerrit.cloudera.org:8080/23291
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/CatalogServiceCatalog.java      | 14 ++---
 .../org/apache/impala/catalog/TableLoader.java     |  2 +-
 .../impala/catalog/events/MetastoreEvents.java     |  4 ++
 .../apache/impala/service/CatalogOpExecutor.java   | 60 +++++++++++++---------
 tests/metadata/test_event_processing.py            | 45 ++++++++++++++++
 5 files changed, 92 insertions(+), 33 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 5b3f26fa2..02257b4c9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -601,10 +601,8 @@ public class CatalogServiceCatalog extends Catalog {
     return metastoreEventProcessor_;
   }
 
-  public boolean isEventProcessingActive() {
-    return metastoreEventProcessor_ instanceof MetastoreEventsProcessor
-        && EventProcessorStatus.ACTIVE
-        .equals(((MetastoreEventsProcessor) 
metastoreEventProcessor_).getStatus());
+  public boolean isEventProcessingEnabled() {
+    return metastoreEventProcessor_ instanceof MetastoreEventsProcessor;
   }
 
   /**
@@ -1282,7 +1280,7 @@ public class CatalogServiceCatalog extends Catalog {
    * @return true if given event information evaluates to a self-event, false 
otherwise
    */
   public boolean evaluateSelfEvent(SelfEventContext ctx) throws 
CatalogException {
-    Preconditions.checkState(isEventProcessingActive(),
+    Preconditions.checkState(isEventProcessingEnabled(),
         "Event processing should be enabled when calling this method");
     boolean isInsertEvent = ctx.isInsertEventContext();
     long versionNumber =
@@ -1416,7 +1414,7 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public boolean addVersionsForInflightEvents(
       boolean isInsertEvent, Table tbl, long versionNumber) {
-    if (!isEventProcessingActive()) return false;
+    if (!isEventProcessingEnabled()) return false;
     boolean added = tbl.addToVersionsForInflightEvents(isInsertEvent, 
versionNumber);
     if (added) {
       LOG.info("Added {} {} in table's {} in-flight events",
@@ -1435,7 +1433,7 @@ public class CatalogServiceCatalog extends Catalog {
    * @return true if versionNumber is added to in-flight list. Otherwise, 
return false.
    */
   public boolean addVersionsForInflightEvents(Db db, long versionNumber) {
-    if (!isEventProcessingActive()) return false;
+    if (!isEventProcessingEnabled()) return false;
     boolean added = db.addToVersionsForInflightEvents(versionNumber);
     if (added) {
       LOG.info("Added catalog version {} in database's {} in-flight events",
@@ -3268,6 +3266,8 @@ public class CatalogServiceCatalog extends Catalog {
    * removed from the catalog cache.
    * Sets dbWasAdded to true if both a new database and table were added to 
the catalog
    * cache.
+   * 'eventId' is used to update createEventId of the table which avoids the 
table being
+   * dropped in processing older events.
    */
   public TCatalogObject invalidateTable(TTableName tableName,
       Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded,
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index f625ca100..39db68ea1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -93,7 +93,7 @@ public class TableLoader {
         msTbl = msClient.getHiveClient().getTable(db.getName(), tblName);
         catalogTimeline.markEvent(FETCHED_HMS_TABLE);
       }
-      if (eventId != -1 && catalog_.isEventProcessingActive()) {
+      if (eventId != -1 && catalog_.isEventProcessingEnabled()) {
         // If the eventId is not -1 it means this table was likely created by 
Impala.
         // However, since the load operation of the table can happen much 
later, it is
         // possible that the table was recreated outside Impala and hence the 
eventId
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index cb4fe1d3a..b5a8ce139 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1821,9 +1821,13 @@ public class MetastoreEvents {
       } else {
         if (oldTblRemoved.getRef()) {
           
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_REMOVED).inc();
+          infoLog("Removed table {}.{}", tableBefore_.getDbName(),
+              tableBefore_.getTableName());
         }
         if (newTblAdded.getRef()) {
           
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc();
+          infoLog("Added table {}.{}", tableAfter_.getDbName(),
+              tableAfter_.getTableName());
         }
       }
     }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 4814403f4..312101c97 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1162,7 +1162,7 @@ public class CatalogOpExecutor {
      * parameters. No-op if event processing is disabled.
      */
     private void addCatalogServiceIdentifiersToTable() {
-      if (!catalog_.isEventProcessingActive()) return;
+      if (!catalog_.isEventProcessingEnabled()) return;
       org.apache.hadoop.hive.metastore.api.Table msTbl = 
table_.getMetaStoreTable();
       
msTbl.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
           catalog_.getCatalogServiceId());
@@ -1236,7 +1236,7 @@ public class CatalogOpExecutor {
           || params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
         alterTableOrViewRename(tbl,
             
TableName.fromThrift(params.getRename_params().getNew_table_name()),
-            modification, wantMinimalResult, response, catalogTimeline, 
debugAction);
+            wantMinimalResult, response, catalogTimeline, debugAction);
         modification.validateInProgressModificationComplete();
         return;
       }
@@ -1938,7 +1938,7 @@ public class CatalogOpExecutor {
   private void addCatalogServiceIdentifiers(
       org.apache.hadoop.hive.metastore.api.Table msTbl, String 
catalogServiceId,
       long catalogVersion) {
-    if (!catalog_.isEventProcessingActive()) return;
+    if (!catalog_.isEventProcessingEnabled()) return;
     msTbl.putToParameters(
         MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
         catalogServiceId);
@@ -2395,7 +2395,7 @@ public class CatalogOpExecutor {
   private List<NotificationEvent> getNextMetastoreEventsForTableIfEnabled(
       EventSequence catalogTimeline, long eventId, String dbName, String 
tblName,
       String eventType) throws MetastoreNotificationException {
-    if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
+    if (!catalog_.isEventProcessingEnabled()) return Collections.emptyList();
     List<NotificationEvent> events = MetastoreEventsProcessor
         .getNextMetastoreEventsInBatchesForTable(catalog_, eventId, dbName, 
tblName,
             eventType);
@@ -2412,7 +2412,7 @@ public class CatalogOpExecutor {
   private List<NotificationEvent> getNextMetastoreEventsForDbIfEnabled(
       EventSequence catalogTimeline, long eventId, String dbName, String 
eventType)
       throws MetastoreNotificationException {
-    if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
+    if (!catalog_.isEventProcessingEnabled()) return Collections.emptyList();
     List<NotificationEvent> events = MetastoreEventsProcessor
         .getNextMetastoreEventsInBatchesForDb(catalog_, eventId, dbName, 
eventType);
     catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH);
@@ -2426,7 +2426,7 @@ public class CatalogOpExecutor {
   private List<NotificationEvent> getNextMetastoreDropEventsForDbIfEnabled(
       EventSequence catalogTimeline, long eventId, String dbName)
       throws MetastoreNotificationException {
-    if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
+    if (!catalog_.isEventProcessingEnabled()) return Collections.emptyList();
     List<String> eventTypes = Lists.newArrayList(
         DropDatabaseEvent.EVENT_TYPE, DropTableEvent.EVENT_TYPE);
     NotificationFilter filter = e -> dbName.equalsIgnoreCase(e.getDbName())
@@ -2527,8 +2527,10 @@ public class CatalogOpExecutor {
             
.getMetastoreEventProcessor().getEventsFactory().get(notificationEvent, null);
         Preconditions.checkState(event instanceof AlterTableEvent);
         AlterTableEvent alterEvent = (AlterTableEvent) event;
+        // Skip other alter events in case there are concurrent modification 
from other
+        // engines.
         if (!alterEvent.isRename()) continue;
-        return new Pair<>(events.get(0).getEventId(),
+        return new Pair<>(alterEvent.getEventId(),
             new Pair<>(alterEvent.getBeforeTable(), 
alterEvent.getAfterTable()));
       } catch (MetastoreNotificationException e) {
         throw new CatalogException("Unable to create a metastore event", e);
@@ -5754,8 +5756,7 @@ public class CatalogOpExecutor {
    * reloaded on the next access.
    */
   private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
-      InProgressTableModification modification, boolean wantMinimalResult,
-      TDdlExecResponse response, EventSequence catalogTimeline,
+      boolean wantMinimalResult, TDdlExecResponse response, EventSequence 
catalogTimeline,
       @Nullable String debugAction) throws ImpalaException {
     Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread());
     TableName tableName = oldTbl.getTableName();
@@ -5763,10 +5764,14 @@ public class CatalogOpExecutor {
         oldTbl.getMetaStoreTable().deepCopy();
     msTbl.setDbName(newTableName.getDb());
     msTbl.setTableName(newTableName.getTbl());
+    // Gets the latest event id before we trigger the alter_table HMS RPC. We 
then use
+    // this id to find the triggered ALTER_TABLE event for self-event 
detection based on
+    // createEventId and EventDeleteLog.
     long eventId = -1;
-    try (MetaStoreClient msClient = 
catalog_.getMetaStoreClient(catalogTimeline)) {
-      eventId = getCurrentEventId(msClient);
-      catalogTimeline.markEvent(FETCHED_LATEST_HMS_EVENT_ID + eventId);
+    if (catalog_.isEventProcessingEnabled()) {
+      try (MetaStoreClient msClient = 
catalog_.getMetaStoreClient(catalogTimeline)) {
+        eventId = getCurrentEventId(msClient, catalogTimeline);
+      }
     }
     // If oldTbl is a synchronized Kudu table, rename the underlying Kudu 
table.
     boolean isSynchronizedKuduTable = (oldTbl instanceof KuduTable) &&
@@ -5817,11 +5822,19 @@ public class CatalogOpExecutor {
         catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
     Preconditions.checkNotNull(result);
     if (renamedTable != null) {
-      org.apache.hadoop.hive.metastore.api.Table tblBefore = 
renamedTable.second.first;
-      addToDeleteEventLog(renamedTable.first, DeleteEventLog
-          .getTblKey(tblBefore.getDbName(), tblBefore.getTableName()));
+      eventId = renamedTable.first;
+      LOG.info("Got ALTER_TABLE RENAME event id {}.", eventId);
+    } else if (catalog_.isEventProcessingEnabled()) {
+      // Using the eventId at the beginning of the operation is better than 
nothing.
+      LOG.warn("ALTER_TABLE RENAME event not found. Using {} in createEventId 
of {} " +
+              "and DeleteEventLog for {}.",
+          eventId, newTableName, tableName);
+    }
+    if (catalog_.isEventProcessingEnabled()) {
+      addToDeleteEventLog(eventId, DeleteEventLog
+          .getTblKey(tableName.getDb(), tableName.getTbl()));
       if (result.second != null) {
-        result.second.setCreateEventId(renamedTable.first);
+        result.second.setCreateEventId(eventId);
       }
     }
     TCatalogObject oldTblDesc = null, newTblDesc = null;
@@ -5846,18 +5859,15 @@ public class CatalogOpExecutor {
       // The rename succeeded in HMS but failed in the catalog cache. The 
cache is in an
       // inconsistent state, so invalidate the new table to reload it.
       newTblDesc = catalog_.invalidateTable(newTableName.toThrift(),
-          new Reference<>(), new Reference<>(), catalogTimeline);
+          new Reference<>(), new Reference<>(), catalogTimeline, eventId);
       if (newTblDesc == null) {
         throw new ImpalaRuntimeException(String.format(
             "The new table/view %s was concurrently removed during rename.",
             newTableName));
       }
+      LOG.info("Invalidated {} to recover from catalog rename failure", 
newTableName);
     } else {
       Preconditions.checkNotNull(result.first);
-      // TODO: call addVersionsForInflightEvents using 
InProgressTableModification object
-      // that is passed into catalog_.renameTable()
-      catalog_.addVersionsForInflightEvents(
-          false, result.second, modification.newVersionNumber());
       newTblDesc = wantMinimalResult ?
           result.second.toInvalidationObject() : 
result.second.toTCatalogObject();
     }
@@ -5875,7 +5885,7 @@ public class CatalogOpExecutor {
    * collected.
    */
   public void addToDeleteEventLog(long eventId, String objectKey) {
-    if (!catalog_.isEventProcessingActive()) {
+    if (!catalog_.isEventProcessingEnabled()) {
       LOG.trace("Not adding event {}:{} since events processing is not 
active", eventId,
           objectKey);
       return;
@@ -6591,7 +6601,7 @@ public class CatalogOpExecutor {
    */
   private void addCatalogServiceIdentifiers(
       org.apache.hadoop.hive.metastore.api.Table msTbl, Partition partition) {
-    if (!catalog_.isEventProcessingActive()) return;
+    if (!catalog_.isEventProcessingEnabled()) return;
     Preconditions.checkState(msTbl.isSetParameters());
     Preconditions.checkNotNull(partition, "Partition is null");
     Map<String, String> tblParams = msTbl.getParameters();
@@ -6625,7 +6635,7 @@ public class CatalogOpExecutor {
    */
   private void addToInflightVersionsOfPartition(
       Map<String, String> partitionParams, HdfsPartition.Builder partBuilder) {
-    if (!catalog_.isEventProcessingActive()) return;
+    if (!catalog_.isEventProcessingEnabled()) return;
     Preconditions.checkState(partitionParams != null);
     String version = partitionParams
         .get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey());
@@ -8203,7 +8213,7 @@ public class CatalogOpExecutor {
    */
   private void addCatalogServiceIdentifiers(
       Database msDb, String catalogServiceId, long newCatalogVersion) {
-    if (!catalog_.isEventProcessingActive()) return;
+    if (!catalog_.isEventProcessingEnabled()) return;
     Preconditions.checkNotNull(msDb);
     msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
         catalogServiceId);
diff --git a/tests/metadata/test_event_processing.py 
b/tests/metadata/test_event_processing.py
index 767833304..beef68922 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -25,6 +25,7 @@ import threading
 from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     add_mandatory_exec_option)
+from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2
 from tests.common.test_vector import HS2
@@ -744,3 +745,47 @@ class TestEventSyncWaiting(ImpalaTestSuite):
         t.close()
       self.execute_query("drop database if exists {} cascade".format(db))
       self.execute_query("drop database if exists {}_2 cascade".format(db))
+
+
+class TestSelfRenameEvent(ImpalaTestSuite):
+  @pytest.mark.execute_serially
+  def test_self_rename_events(self, unique_database):
+    """Regression test for IMPALA-14307"""
+    try:
+      catalogd = ImpalaCluster.get_e2e_test_cluster().catalogd
+      self.execute_query("create table {}.tbl_a(i 
int)".format(unique_database))
+      # Wait until the CREATE_DATABASE and CREATE_TABLE events are skipped.
+      EventProcessorUtils.wait_for_event_processing(self)
+      self.execute_query(
+          "alter table {0}.tbl_a rename to {0}.tbl_b".format(unique_database))
+      self.execute_query(":event_processor('pause')")
+
+      with self.create_impala_client() as alter_client:
+        version_after_create = catalogd.service.get_catalog_version()
+        alter_client.set_configuration(
+            {"debug_action": "catalogd_table_rename_delay:SLEEP@6000"})
+        alter_handle = alter_client.execute_async(
+            "alter table {0}.tbl_b rename to 
{0}.tbl_a".format(unique_database))
+        alter_client.wait_for_admission_control(alter_handle, timeout_s=10)
+        # Wait for at most 10 second until catalogd increase the version for 
rename
+        # operation. This indicates the rename starts.
+        start_time = time.time()
+        while (time.time() - start_time < 10.0
+               and catalogd.service.get_catalog_version() <= 
version_after_create):
+          time.sleep(0.05)
+        # Sleep to let catalogd sends the alter_table HMS RPC
+        time.sleep(1)
+        # Invalidate tbl_b to remove it in catalog
+        self.execute_query("invalidate metadata 
{}.tbl_b".format(unique_database))
+        alter_client.wait_for_finished_timeout(alter_handle, timeout=10)
+        alter_client.close_query(alter_handle)
+
+      # Resume event processing. The first ALTER_TABLE RENAME events should be 
skipped.
+      events_skipped_before = 
EventProcessorUtils.get_int_metric('events-skipped', 0)
+      self.execute_query(":event_processor('start')")
+      EventProcessorUtils.wait_for_event_processing(self)
+      events_skipped_after = 
EventProcessorUtils.get_int_metric('events-skipped', 0)
+      assert events_skipped_after == events_skipped_before + 2
+    finally:
+      # Recover event processing to avoid impacting other tests
+      self.execute_query(":event_processor('start')")

Reply via email to