This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6f3deabb9d0c0ca98956316bbcc31e14a3363804
Author: stiga-huang <[email protected]>
AuthorDate: Mon Aug 25 18:29:07 2025 +0800

    IMPALA-14330: set a valid createEventId in global INVALIDATE METADATA
    
    In global INVALIDATE METADATA (catalog reset), catalogd creates
    IncompleteTable for all the known table names. However, the
    createEventId is uninitialized so remain as -1. Tables could be dropped
    unintentionally by stale DropTable or AlterTableRename events.
    
    Ideally when catalogd creates an IncompleteTable during reset(), it
    should fetch the latest event on that table and use its event id as the
    createEventId. However, fetching such event ids for all tables is
    impractical to finish in a reasonable time. It also adds a significant
    load on HMS.
    
    As a compromise, this patch uses the current event id when the reset()
    operation starts, and sets it to all IncompleteTable objects created in
    this reset operation. This is enough to handle self CreateTable /
    DropTable / AlterTableRename events since such self-events generated
    before that id will be skipped. Such self-events generated after that id
    are triggered by concurrent DDLs which will wait until the corresponding
    table list is updated in reset(). The DDL will also update createEventId
    to skip stale DropTable / AlterTableRename events.
    
    Concurrent CreateTable DDLs could set a stale createEventId if their HMS
    operation finish before reset() and their catalog operations finish
    after reset() creates the table. To address this, we add a check in
    setCreateEventId() to skip stale event ids.
    
    The current event id of reset() is also used in DeleteEventLog to track
    tables removed by this operation.
    
    Refactored IncompleteTable.createUninitializedTable() to force passing a
    createEventId as a parameter.
    
    To ease debugging, adds logs when a table is added/removed in HMS events
    processing. Also adds logs when the catalog version of a table changes
    and adds logs when start processing a rename event.
    
    This patch also refactors CatalogOpExecutor.alterTableOrViewRename() by
    extracting some codes into methods. A race issue is identified and fixed
    that DeleteEventLog should be updated before renameTable() updates the
    catalog cache so the removed old table won't be added back by
    concurrently processing of a stale CREATE_TABLE event.
    
    _run_ddls_with_invalidation in test_concurrent_ddls.py could still fail
    with timeout when running with sync_ddl=true. The reason is when the DDL
    hits IMPALA-9135 and hangs, it needs catalogd to send new catalog
    updates to reach the max waiting attempts (see waitForSyncDdlVersion()).
    However, if all other concurrent threads already finish, there won't be
    any new catalog updates so the DDL will wait forever and finally result
    in the test timed out. To workaround this, this patch adds another
    concurrent thread that keeps creating new tables until the test finish.
    
    Tests:
     - Ran the following tests in test_concurrent_ddls.py 10 rounds. Each
       round takes 11 mins.
       - test_ddls_with_invalidate_metadata
       - test_ddls_with_invalidate_metadata_sync_ddl
       - test_mixed_catalog_ddls_with_invalidate_metadata
       - test_mixed_catalog_ddls_with_invalidate_metadata_sync_ddl
       - test_local_catalog_ddls_with_invalidate_metadata
       - test_local_catalog_ddls_with_invalidate_metadata_sync_ddl
       - test_local_catalog_ddls_with_invalidate_metadata_unlock_gap
    
    Change-Id: I6506821dedf7701cdfa58d14cae5760ee178c4ec
    Reviewed-on: http://gerrit.cloudera.org:8080/23346
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/CatalogServiceCatalog.java      | 41 +++++++----
 fe/src/main/java/org/apache/impala/catalog/Db.java |  7 ++
 .../java/org/apache/impala/catalog/HdfsTable.java  |  7 +-
 .../org/apache/impala/catalog/IncompleteTable.java |  8 ++-
 .../main/java/org/apache/impala/catalog/Table.java | 36 +++++++---
 .../org/apache/impala/catalog/TableLoader.java     |  2 +-
 .../impala/catalog/events/MetastoreEvents.java     |  3 +
 .../metastore/CatalogMetastoreServiceHandler.java  | 11 ++-
 .../catalog/metastore/MetastoreServiceHandler.java |  2 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 81 +++++++++++++++-------
 .../CatalogHmsSyncToLatestEventIdTest.java         |  4 +-
 tests/custom_cluster/test_concurrent_ddls.py       | 15 ++++
 12 files changed, 156 insertions(+), 61 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 0943212bd..d8a639123 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -77,6 +77,7 @@ import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.CatalogResetManager.PrefetchedDatabaseObjects;
 import org.apache.impala.catalog.FeFsTable.Utils;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.events.DeleteEventLog;
 import org.apache.impala.catalog.events.ExternalEventsProcessor;
 import 
org.apache.impala.catalog.events.MetastoreEvents.EventFactoryForSyncToLatestEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
@@ -2291,9 +2292,12 @@ public class CatalogServiceCatalog extends Catalog {
    * Returns the invalidated 'Db' object along with list of tables to be 
loaded by
    * the TableLoadingMgr. Returns null if the method encounters an exception 
during
    * invalidation.
+   * 'currentEventId' is set as createEventId of all tables under this db. 
Also used to
+   * track removed tables in EventDeleteLog.
    */
   private Pair<Db, List<TTableName>> invalidateDb(String dbName, Db existingDb,
-      PrefetchedDatabaseObjects prefetchedObjects, EventSequence 
catalogTimeline) {
+      PrefetchedDatabaseObjects prefetchedObjects, EventSequence 
catalogTimeline,
+      long currentEventId) {
     try {
       Db newDb = new Db(dbName, prefetchedObjects.getMsDb());
       // existingDb is usually null when the Catalog loads for the first time.
@@ -2323,7 +2327,7 @@ public class CatalogServiceCatalog extends Catalog {
         LOG.trace("Get {}", tblMeta);
         Table incompleteTbl = IncompleteTable.createUninitializedTable(newDb, 
tableName,
             MetastoreShim.mapToInternalTableType(tblMeta.getTableType()),
-            tblMeta.getComments());
+            tblMeta.getComments(), currentEventId);
         incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
         newDb.addTable(incompleteTbl);
         ++numTables;
@@ -2362,6 +2366,11 @@ public class CatalogServiceCatalog extends Catalog {
               existingDb, removedTableName);
           removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
           deleteLog_.addRemovedObject(removedTable.toTCatalogObject());
+          if (isEventProcessingEnabled()) {
+            metastoreEventProcessor_.getDeleteEventLog().addRemovedObject(
+                currentEventId, DeleteEventLog.getTblKey(
+                    existingDb.getName(), removedTableName));
+          }
         }
       }
       return Pair.create(newDb, tblsToBackgroundLoad);
@@ -2458,7 +2467,8 @@ public class CatalogServiceCatalog extends Catalog {
         allHmsDbs = msClient.getHiveClient().getAllDatabases();
         catalogTimeline.markEvent("Got database list");
       }
-      rebuildDbCache(allHmsDbs, unlockedTimer, catalogTimeline, isSyncDdl);
+      rebuildDbCache(allHmsDbs, unlockedTimer, catalogTimeline, isSyncDdl,
+          currentEventId);
       scheduleWarmupTables();
       catalogTimeline.markEvent("Updated catalog cache");
     } catch (Exception e) {
@@ -2494,7 +2504,7 @@ public class CatalogServiceCatalog extends Catalog {
    * Entering and exiting this method should hold versionLock_.writeLock().
    */
   private void rebuildDbCache(List<String> allHmsDbs, Stopwatch unlockedTimer,
-      EventSequence catalogTimeline, boolean isSyncDdl) {
+      EventSequence catalogTimeline, boolean isSyncDdl, long currentEventId) {
     Preconditions.checkState(versionLock_.writeLock().isHeldByCurrentThread());
 
     // Not all Java UDFs are persisted to the metastore. The ones which aren't
@@ -2539,7 +2549,8 @@ public class CatalogServiceCatalog extends Catalog {
           // null.
           PrefetchedDatabaseObjects hmsObjects = resettingDbPair.second.get();
           Db oldDb = dbCache_.get(dbName);
-          invalidatedDb = invalidateDb(dbName, oldDb, hmsObjects, 
catalogTimeline);
+          invalidatedDb = invalidateDb(dbName, oldDb, hmsObjects, 
catalogTimeline,
+              currentEventId);
         } catch (Exception e) {
           LOG.warn("Error fetching HMS objects for database " + dbName, e);
         }
@@ -2711,9 +2722,8 @@ public class CatalogServiceCatalog extends Catalog {
         deleteLog_.addRemovedObject(existingTbl.toMinimalTCatalogObject());
       }
       Table incompleteTable = IncompleteTable.createUninitializedTable(
-          db, tblName, tblType, tblComment);
+          db, tblName, tblType, tblComment, createEventId);
       incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
-      incompleteTable.setCreateEventId(createEventId);
       db.addTable(incompleteTable);
       return db.getTable(tblName);
     } finally {
@@ -3045,9 +3055,10 @@ public class CatalogServiceCatalog extends Catalog {
    * 2. null, T_new: Invalid configuration
    * 3. T_old, null: Old table was removed but new table was not added.
    * 4. T_old, T_new: Old table was removed and new table was added.
+   * Updates 'createEventId' of the new table using 'alterEventId'.
    */
   public Pair<Table, Table> renameTable(
-      TTableName oldTableName, TTableName newTableName) {
+      TTableName oldTableName, TTableName newTableName, long alterEventId) {
     // Remove the old table name from the cache and add the new table.
     Db db = getDb(oldTableName.getDb_name());
     if (db == null) return Pair.create(null, null);
@@ -3059,10 +3070,16 @@ public class CatalogServiceCatalog extends Catalog {
       Table oldTable =
           removeTable(oldTableName.getDb_name(), oldTableName.getTable_name());
       if (oldTable == null) return Pair.create(null, null);
+      if (alterEventId < oldTable.getCreateEventId()) {
+        // This is usually due to alterEventId = -1, e.g. failed to fetch and 
check
+        // HMS events in the callers. Fallback to use the original 
createEventId.
+        alterEventId = oldTable.getCreateEventId();
+        LOG.warn("Reusing original createEventId {} for table {}.{}", 
alterEventId,
+            newTableName.getDb_name(), newTableName.getTable_name());
+      }
       return Pair.create(oldTable,
           addIncompleteTable(newTableName.getDb_name(), 
newTableName.getTable_name(),
-              oldTable.getTableType(), oldTable.getTableComment(),
-              oldTable.getCreateEventId()));
+              oldTable.getTableType(), oldTable.getTableComment(), 
alterEventId));
     } finally {
       versionLock_.writeLock().unlock();
     }
@@ -3377,9 +3394,9 @@ public class CatalogServiceCatalog extends Catalog {
       Table existingTbl = db.getTable(tblName);
       if (existingTbl == null) return null;
       incompleteTable = IncompleteTable.createUninitializedTable(db, tblName,
-          existingTbl.getTableType(), existingTbl.getTableComment());
+          existingTbl.getTableType(), existingTbl.getTableComment(),
+          existingTbl.getCreateEventId());
       incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
-      incompleteTable.setCreateEventId(existingTbl.getCreateEventId());
       db.addTable(incompleteTable);
     } finally {
       versionLock_.writeLock().unlock();
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java 
b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 16c8ffc9d..efd7efb51 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -530,6 +530,13 @@ public class Db extends CatalogObjectImpl implements FeDb {
     }
   }
 
+  @Override
+  public void setCatalogVersion(long newVersion) {
+    LOG.info("Setting the catalog version of Db@{} {} to {}",
+        Integer.toHexString(hashCode()), getName(), newVersion);
+    super.setCatalogVersion(newVersion);
+  }
+
   @Override
   protected void setTCatalogObject(TCatalogObject catalogObject) {
     catalogObject.setDb(toThrift());
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 6c8e93e35..ef0b50cb9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -3089,7 +3089,12 @@ public class HdfsTable extends Table implements 
FeFsTable {
             pendingVersionNumber_, version);
         versionToBeSet = pendingVersionNumber_;
       }
-      LOG.trace("Setting the hdfs table {} version {}", getFullName(), 
versionToBeSet);
+      // Log the catalog version with table name and object id to ease 
debugging
+      // race issues. Object id (ClassName@HexHashCode) is used to identified 
if we have
+      // replaced the table object.
+      LOG.info("Setting the catalog version of {}@{} {} to {}",
+          getClass().getSimpleName(), Integer.toHexString(hashCode()),
+          getFullName(), versionToBeSet);
       super.setCatalogVersion(versionToBeSet);
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
index d0ee06a06..9dac3a20f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
@@ -190,8 +190,12 @@ public class IncompleteTable extends Table implements 
FeIncompleteTable {
   }
 
   public static IncompleteTable createUninitializedTable(Db db, String name,
-      TImpalaTableType tableType, String tableComment) {
-    return new IncompleteTable(db, name, tableType, tableComment, null);
+      TImpalaTableType tableType, String tableComment, long createEventId) {
+    IncompleteTable tbl = new IncompleteTable(db, name, tableType, 
tableComment, null);
+    // Use suppressLogging=true to avoid excessive logs for all tables during 
catalogd
+    // startup.
+    tbl.setCreateEventId(createEventId, true);
+    return tbl;
   }
 
   public static IncompleteTable createFailedMetadataLoadTable(Db db, String 
name,
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java 
b/fe/src/main/java/org/apache/impala/catalog/Table.java
index f8e85a86f..d513c982b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -240,15 +240,21 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
   // last synced id in full table reload
   public long getCreateEventId() { return createEventId_; }
 
-  public void setCreateEventId(long eventId) {
-    // TODO: Add a preconditions check for eventId < lastSycnedEventId
+  public void setCreateEventId(long eventId, boolean suppressLogging) {
+    if (eventId < createEventId_) {
+      if (!suppressLogging) {
+        LOG.warn("Ignored stale createEventId: {}. Current id: {}",
+            eventId, createEventId_);
+      }
+      return;
+    }
     createEventId_ = eventId;
-    LOG.debug("createEventId_ for table: {} set to: {}", getFullName(), 
createEventId_);
-    // TODO: Should we reset lastSyncedEvent Id if it is less than event Id?
-    // If we don't reset it - we may start syncing table from an event id which
-    // is less than create event id
+    if (!suppressLogging) {
+      LOG.debug("createEventId_ for table: {} set to: {}", getFullName(), 
createEventId_);
+    }
+    // Don't need to sync table from events older than create event id.
     if (lastSyncedEventId_ < eventId) {
-      setLastSyncedEventId(eventId);
+      setLastSyncedEventId(eventId, suppressLogging);
     }
   }
 
@@ -257,9 +263,15 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
   }
 
   public void setLastSyncedEventId(long eventId) {
+    setLastSyncedEventId(eventId, false);
+  }
+
+  public void setLastSyncedEventId(long eventId, boolean suppressLogging) {
     // TODO: Add a preconditions check for eventId >= createEventId_
-    LOG.debug("lastSyncedEventId_ for table: {} set from {} to {}", 
getFullName(),
-        lastSyncedEventId_, eventId);
+    if (!suppressLogging) {
+      LOG.debug("lastSyncedEventId_ for table: {} set from {} to {}", 
getFullName(),
+          lastSyncedEventId_, eventId);
+    }
     lastSyncedEventId_ = eventId;
   }
 
@@ -581,9 +593,13 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
         // keep the legacy behavior as showing the table type as TABLE.
         tblType = TImpalaTableType.TABLE;
       }
+      // Use -1 as 'createEventId' since this is either in coordinator where 
no HMS events
+      // are being processed or imported from a COPY TESTCASE statement which 
shouldn't be
+      // used in production (it's used to examine metadata in dev env).
       newTable =
           IncompleteTable.createUninitializedTable(parentDb, 
thriftTable.getTbl_name(),
-              tblType, 
MetadataOp.getTableComment(thriftTable.getMetastore_table()));
+              tblType, 
MetadataOp.getTableComment(thriftTable.getMetastore_table()),
+              /*createEventId*/-1);
     }
     newTable.storedInImpaladCatalogCache_ = loadedInImpalad;
     try {
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 39db68ea1..a120b2ebf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -134,7 +134,7 @@ public class TableLoader {
             "Unrecognized table type for table: " + fullTblName);
       }
       table.updateHMSLoadTableSchemaTime(hmsLoadTime);
-      table.setCreateEventId(eventId);
+      table.setCreateEventId(eventId, /*suppressLogging*/false);
       long latestEventId = -1;
       if (syncToLatestEventId) {
         // acquire write lock on table since 
MetastoreEventProcessor.syncToLatestEventId
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 1055b212c..834adcee6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1855,6 +1855,9 @@ public class MetastoreEvents {
 
     private void processRename() throws CatalogException {
       if (!isRename_) return;
+      infoLog("Processing rename from {}.{} to {}.{}",
+          tableBefore_.getDbName(), tableBefore_.getTableName(),
+          tableAfter_.getDbName(), tableAfter_.getTableName());
       Reference<Boolean> oldTblRemoved = new Reference<>();
       Reference<Boolean> newTblAdded = new Reference<>();
       catalogOpExecutor_
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
 
b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
index 3ee08483f..cc2b141f5 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
@@ -1352,19 +1352,18 @@ public class CatalogMetastoreServiceHandler extends 
MetastoreServiceHandler {
             oldMsTable.getTableName());
         TTableName newTTable = new TTableName(newMsTable.getDbName(),
             newMsTable.getTableName());
+        // Update DeleteEventLog before we modify the catalog cache to avoid 
the table
+        // being added concurrently (by other events) during renameTable().
+        catalogOpExecutor_.addToDeleteEventLog(alterEvent.getEventId(),
+            DeleteEventLog.getTblKey(oldTTable.getDb_name(), 
oldTTable.getTable_name()));
         // Rename the table in the Catalog and get the resulting catalog 
object.
         // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
         Pair<org.apache.impala.catalog.Table, org.apache.impala.catalog.Table> 
result =
-            catalog_.renameTable(oldTTable, newTTable);
+            catalog_.renameTable(oldTTable, newTTable, 
alterEvent.getEventId());
         if (result == null || result.first == null || result.second == null) {
           throw new CatalogException("failed to rename table " + oldTTable + " 
to " +
               newTTable + " for " + apiName);
         }
-        // first set the last synced event id to the alter table's event id
-        result.second.setLastSyncedEventId(alterEvent.getEventId());
-        result.second.setCreateEventId(alterEvent.getEventId());
-        catalogOpExecutor_.addToDeleteEventLog(alterEvent.getEventId(),
-            DeleteEventLog.getTblKey(oldTTable.getDb_name(), 
oldTTable.getTable_name()));
       } finally {
         catalog_.getLock().writeLock().unlock();
       }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
 
b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
index 2076457d3..e8a484583 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -3263,7 +3263,7 @@ public abstract class MetastoreServiceHandler extends 
AbstractThriftHiveMetastor
         " to new table " + newDbName + "." + newTableName;
     LOG.debug("Renaming " + tableInfo);
     Pair<org.apache.impala.catalog.Table, org.apache.impala.catalog.Table> 
result =
-        catalog_.renameTable(oldTable, newTable);
+        catalog_.renameTable(oldTable, newTable, -1);
     if (result == null || result.first == null || result.second == null) {
       LOG.debug("Couldn't rename " + tableInfo);
     } else {
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 312101c97..d2a1be40f 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -842,6 +842,8 @@ public class CatalogOpExecutor {
         tblAddedLater.setRef(true);
         return false;
       }
+      LOG.debug("EventId: {} Removing table {}.{} since its create event id is 
{}",
+          eventId, dbName, tblName, tblToBeRemoved.getCreateEventId());
       Table removedTbl = db.removeTable(tblToBeRemoved.getName());
       removedTbl.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
       
catalog_.getDeleteLog().addRemovedObject(removedTbl.toMinimalTCatalogObject());
@@ -896,14 +898,15 @@ public class CatalogOpExecutor {
             "EventId: {} Table was not added since it was removed later", 
eventId);
         return false;
       }
+      // set the createEventId of the table to eventId since we are adding 
table
+      // due to the given eventId.
       Table incompleteTable = IncompleteTable.createUninitializedTable(db, 
tblName,
           MetastoreShim.mapToInternalTableType(msTbl.getTableType()),
-          MetadataOp.getTableComment(msTbl));
+          MetadataOp.getTableComment(msTbl), eventId);
       
incompleteTable.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
-      // set the createEventId of the table to eventId since we are adding 
table
-      // due to the given eventId.
-      incompleteTable.setCreateEventId(eventId);
       db.addTable(incompleteTable);
+      LOG.debug("EventId: {} Added table {}. Catalog version: {}",
+          eventId, incompleteTable.getFullName(), 
incompleteTable.getCatalogVersion());
       return true;
     } finally {
       getMetastoreDdlLock().unlock();
@@ -5809,18 +5812,36 @@ public class CatalogOpExecutor {
             String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
       }
     }
-    List<NotificationEvent> events = null;
-    // the alter table event is generated on the renamed table
-    events = getNextMetastoreEventsForTableIfEnabled(catalogTimeline, eventId,
-        msTbl.getDbName(), msTbl.getTableName(), AlterTableEvent.EVENT_TYPE);
-    Pair<Long, Pair<org.apache.hadoop.hive.metastore.api.Table,
-        org.apache.hadoop.hive.metastore.api.Table>> renamedTable =
-        getRenamedTableFromEvents(events);
+    eventId = trackAlterTableRenameEvent(tableName, newTableName, eventId,
+        catalogTimeline);
     // Rename the table in the Catalog and get the resulting catalog object.
     // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
     Pair<Table, Table> result =
-        catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
+        catalog_.renameTable(tableName.toThrift(), newTableName.toThrift(), 
eventId);
     Preconditions.checkNotNull(result);
+    Pair<TCatalogObject, TCatalogObject> objs = handleCatalogRenameResult(
+        oldTbl, newTableName, wantMinimalResult, result, eventId, 
catalogTimeline);
+    response.result.addToRemoved_catalog_objects(objs.first);
+    response.result.addToUpdated_catalog_objects(objs.second);
+    response.result.setVersion(objs.second.getCatalog_version());
+    addSummary(response, "Renaming was successful.");
+  }
+
+  /**
+   * Finds the ALTER_TABLE HMS event triggered by this rename operation and 
updates
+   * DeleteEventLog for it. Returns the event id.
+   */
+  private long trackAlterTableRenameEvent(TableName oldTableName, TableName 
newTableName,
+      long startEventId, EventSequence catalogTimeline)
+      throws MetastoreNotificationException, CatalogException {
+    // the alter table event is generated on the renamed table
+    List<NotificationEvent> events = getNextMetastoreEventsForTableIfEnabled(
+        catalogTimeline, startEventId, newTableName.getDb(), 
newTableName.getTbl(),
+        AlterTableEvent.EVENT_TYPE);
+    Pair<Long, Pair<org.apache.hadoop.hive.metastore.api.Table,
+        org.apache.hadoop.hive.metastore.api.Table>> renamedTable =
+        getRenamedTableFromEvents(events);
+    long eventId = startEventId;
     if (renamedTable != null) {
       eventId = renamedTable.first;
       LOG.info("Got ALTER_TABLE RENAME event id {}.", eventId);
@@ -5828,16 +5849,27 @@ public class CatalogOpExecutor {
       // Using the eventId at the beginning of the operation is better than 
nothing.
       LOG.warn("ALTER_TABLE RENAME event not found. Using {} in createEventId 
of {} " +
               "and DeleteEventLog for {}.",
-          eventId, newTableName, tableName);
+          eventId, newTableName, oldTableName);
     }
+    // Update DeleteEventLog before we modify the catalog cache to avoid the 
table being
+    // added concurrently (by other events) during renameTable().
     if (catalog_.isEventProcessingEnabled()) {
       addToDeleteEventLog(eventId, DeleteEventLog
-          .getTblKey(tableName.getDb(), tableName.getTbl()));
-      if (result.second != null) {
-        result.second.setCreateEventId(eventId);
-      }
+          .getTblKey(oldTableName.getDb(), oldTableName.getTbl()));
     }
-    TCatalogObject oldTblDesc = null, newTblDesc = null;
+    return eventId;
+  }
+
+  /**
+   * Handles the rename results in catalog, including error handling for 
failures in
+   * bringing up the new table by implicitly invalidate the table.
+   * Returns TCatalogObject of the old and new tables.
+   */
+  private Pair<TCatalogObject, TCatalogObject> handleCatalogRenameResult(
+      Table oldTbl, TableName newTableName, boolean wantMinimalResult,
+      Pair<Table, Table> result, long alterTableEventId, EventSequence 
catalogTimeline)
+      throws ImpalaRuntimeException {
+    TCatalogObject oldTblDesc, newTblDesc;
     if (result.first == null) {
       // The old table object has been removed by a concurrent operation, e.g. 
INVALIDATE
       // METADATA <table>. Fetch the latest delete from deleteLog.
@@ -5848,8 +5880,8 @@ public class CatalogOpExecutor {
         oldTblDesc.setCatalog_version(version);
       } else {
         LOG.warn("Deletion update on the old table {} not found. Impalad might 
still "
-            + "have its metadata until the deletion update arrives from 
statestore.",
-            tableName);
+                + "have its metadata until the deletion update arrives from 
statestore.",
+            oldTbl.getFullName());
       }
     } else {
       oldTblDesc = wantMinimalResult ?
@@ -5859,7 +5891,7 @@ public class CatalogOpExecutor {
       // The rename succeeded in HMS but failed in the catalog cache. The 
cache is in an
       // inconsistent state, so invalidate the new table to reload it.
       newTblDesc = catalog_.invalidateTable(newTableName.toThrift(),
-          new Reference<>(), new Reference<>(), catalogTimeline, eventId);
+          new Reference<>(), new Reference<>(), catalogTimeline, 
alterTableEventId);
       if (newTblDesc == null) {
         throw new ImpalaRuntimeException(String.format(
             "The new table/view %s was concurrently removed during rename.",
@@ -5871,10 +5903,7 @@ public class CatalogOpExecutor {
       newTblDesc = wantMinimalResult ?
           result.second.toInvalidationObject() : 
result.second.toTCatalogObject();
     }
-    response.result.addToRemoved_catalog_objects(oldTblDesc);
-    response.result.addToUpdated_catalog_objects(newTblDesc);
-    response.result.setVersion(newTblDesc.getCatalog_version());
-    addSummary(response, "Renaming was successful.");
+    return new Pair<>(oldTblDesc, newTblDesc);
   }
 
   /**
@@ -5886,7 +5915,7 @@ public class CatalogOpExecutor {
    */
   public void addToDeleteEventLog(long eventId, String objectKey) {
     if (!catalog_.isEventProcessingEnabled()) {
-      LOG.trace("Not adding event {}:{} since events processing is not 
active", eventId,
+      LOG.trace("Not adding event {}:{} since events processing is not 
enabled", eventId,
           objectKey);
       return;
     }
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
index 5a5cfbd34..5f985dd1a 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
@@ -537,8 +537,8 @@ public class CatalogHmsSyncToLatestEventIdTest extends 
AbstractCatalogMetastoreT
             createTableInHms(TEST_DB_NAME, tblName, true);
             IncompleteTable tbl =
                 
IncompleteTable.createUninitializedTable(catalog_.getDb(TEST_DB_NAME),
-                    tblName, MetadataOp.getImpalaTableType(tableType_), null);
-            tbl.setCreateEventId(getLatestEventIdFromHMS());
+                    tblName, MetadataOp.getImpalaTableType(tableType_), null,
+                    getLatestEventIdFromHMS());
             catalog_.addTable(catalog_.getDb(TEST_DB_NAME), tbl);
             long prevLastSyncedEventId =
                 catalog_.getTable(TEST_DB_NAME, 
tblName).getLastSyncedEventId();
diff --git a/tests/custom_cluster/test_concurrent_ddls.py 
b/tests/custom_cluster/test_concurrent_ddls.py
index 9349048df..8051d6e6a 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -174,12 +174,27 @@ class TestConcurrentDdls(CustomClusterTestSuite):
     worker = [None] * (NUM_ITERS + 1)
     for i in range(1, NUM_ITERS + 1):
       worker[i] = pool.apply_async(run_ddls, (i,))
+    # INSERT with sync_ddl=true could hit IMPALA-9135 and hanging infinitely 
if there are
+    # no more catalog updates, e.g. all other threads have finished. This 
leads to
+    # timeout in this test. As a workaround, run a thread to keep creating new 
tables
+    # to trigger new catalog updates.
+    stop = False
+    if sync_ddl:
+      def create_tbls():
+        i = 0
+        while not stop:
+          tls.client.execute("create table {}.tmp_tbl{} (i int)".format(db, i))
+          time.sleep(10)
+          i += 1
+      pool.apply_async(create_tbls)
     for i in range(1, NUM_ITERS + 1):
       try:
         worker[i].get(timeout=100)
       except TimeoutError:
+        stop = True
         dump_server_stacktraces()
         assert False, "Timeout in thread run_ddls(%d)" % i
+    stop = True
 
   @classmethod
   def is_transient_error(cls, err):

Reply via email to