This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 78b9285da457c6853e513f3852730867d4dbe632
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Wed Sep 27 21:22:31 2023 +0200

    IMPALA-12461 part1: Avoid taking db/table level locks during db/table self-event 
check
    
    DB / Table level locks can be held for a long time by DDL/DML
    operations in the catalogd. Trying to get the lock during self-event
    check would mean blocking if there is an ongoing operation for
    the given db/table.
    
    This patch tries to solve it for the easy case of db/table level
    events by using self-event specific locking that should only be
    held for short periods while adding/removing events to/from the
    in-flight event list. The InFlightEvents object itself is used as
    the lock, and none of catalogd's other locks may be acquired after
    it, to avoid deadlock.
    
    Postponing solving this for partition level events as that would
    be more complex, as both the partition list and the partitions'
    in-flight event lists would need to be protected from parallel
    operations that add/remove partitions.
    
    Testing:
    - ran event processing related tests
    
    Change-Id: Ife455de09ab2e262bde1e4b5bd54c8c54c75f2cd
    Reviewed-on: http://gerrit.cloudera.org:8080/20516
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/CatalogServiceCatalog.java      | 134 ++++++++++++---------
 fe/src/main/java/org/apache/impala/catalog/Db.java |  17 ++-
 .../java/org/apache/impala/catalog/HdfsTable.java  |   1 +
 .../main/java/org/apache/impala/catalog/Table.java |  42 ++++---
 4 files changed, 114 insertions(+), 80 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index ddb08e75f..27d7a64e4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -1046,83 +1046,99 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.debug("Not a self-event because eventId is {}", versionNumber);
       return false;
     }
-    Db db = getDb(ctx.getDbName());
+    Db db = getDb(ctx.getDbName()); // Uses thread-safe hashmap.
     if (db == null) {
       throw new DatabaseNotFoundException("Database " + ctx.getDbName() + " 
not found");
     }
-    // if the given tblName is null we look db's in-flight events
     if (ctx.getTblName() == null) {
-      //TODO use read/write locks for both table and db
-      if (!tryLockDb(db)) {
-        throw new CatalogException("Could not acquire lock on database object 
" +
-            db.getName());
-      }
-      versionLock_.writeLock().unlock();
-      try {
-        boolean removed = 
db.removeFromVersionsForInflightEvents(versionNumber);
-        if (!removed) {
-          LOG.debug("Could not find version {} in the in-flight event list of 
database "
-              + "{}", versionNumber, db.getName());
-        }
-        return removed;
-      } finally {
-        db.getLock().unlock();
-      }
+      // Not taking lock, rely on Db's internal locking.
+      return evaluateSelfEventForDb(db, versionNumber);
     }
-    Table tbl = db.getTable(ctx.getTblName());
+
+    Table tbl = db.getTable(ctx.getTblName()); // Uses thread-safe hashmap.
     if (tbl == null) {
       throw new TableNotFoundException(
           String.format("Table %s.%s not found", ctx.getDbName(), 
ctx.getTblName()));
     }
-    // we should acquire the table lock so that we wait for any other updates
-    // happening to this table at the same time
+
+    if (ctx.getPartitionKeyValues() == null) {
+      // Not taking lock, rely on Table's internal locking.
+      return evaluateSelfEventForTable(tbl, isInsertEvent, versionNumber);
+    }
+
+    // TODO: Could be a Precondition? If partitionKeyValues != null, this 
should be
+    // a HDFS table.
+    if (!(tbl instanceof HdfsTable)) return false;
+
+    // We should acquire the table lock so that we wait for any other updates
+    // happening to this table at the same time, as they could affect the list 
of
+    // partitions.
+    // TODO: add more fine-grained locking to protect partitions without taking
+    //       long-held locks (IMPALA-12461)
+    // Increasing the timeout would not help if the table lock is held by
+    // a concurrent DDL as we would need the lock during the actual processing 
of the
+    // event.
     if (!tryWriteLock(tbl)) {
       throw new CatalogException(String.format("Error during self-event 
evaluation "
           + "for table %s due to lock contention", tbl.getFullName()));
     }
     versionLock_.writeLock().unlock();
     try {
-      List<List<TPartitionKeyValue>> partitionKeyValues = 
ctx.getPartitionKeyValues();
-      // if the partitionKeyValues is null, we look for tbl's in-flight events
-      if (partitionKeyValues == null) {
-        boolean removed =
-            tbl.removeFromVersionsForInflightEvents(isInsertEvent, 
versionNumber);
-        if (!removed) {
-          LOG.debug("Could not find {} {} in in-flight event list of table {}",
-              isInsertEvent ? "eventId" : "version", versionNumber, 
tbl.getFullName());
-        }
-        return removed;
-      }
-      if (tbl instanceof HdfsTable) {
-        List<String> failingPartitions = new ArrayList<>();
-        int len = partitionKeyValues.size();
-        for (int i=0; i<len; ++i) {
-          List<TPartitionKeyValue> partitionKeyValue = 
partitionKeyValues.get(i);
-          versionNumber = isInsertEvent ? ctx.getInsertEventId(i) : 
versionNumber;
-          HdfsPartition hdfsPartition =
-              ((HdfsTable) 
tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue);
-          if (hdfsPartition == null
-              || 
!hdfsPartition.removeFromVersionsForInflightEvents(isInsertEvent,
-                  versionNumber)) {
-            // even if this is an error condition we should not bail out early 
since we
-            // should clean up the self-event state on the rest of the 
partitions
-            String partName = 
HdfsTable.constructPartitionName(partitionKeyValue);
-            if (hdfsPartition == null) {
-              LOG.debug("Partition {} not found during self-event "
-                  + "evaluation for the table {}", partName, 
tbl.getFullName());
-            } else {
-              LOG.trace("Could not find {} in in-flight event list of the 
partition {} "
-                  + "of table {}", versionNumber, partName, tbl.getFullName());
-            }
-            failingPartitions.add(partName);
-          }
-        }
-        return failingPartitions.isEmpty();
-      }
+      return evaluateSelfEventForPartition(
+          ctx, (HdfsTable)tbl, isInsertEvent, versionNumber);
     } finally {
       tbl.releaseWriteLock();
     }
-    return false;
+  }
+
+  private boolean evaluateSelfEventForDb(Db db, long versionNumber)
+      throws CatalogException {
+    boolean removed = db.removeFromVersionsForInflightEvents(versionNumber);
+    if (!removed) {
+      LOG.debug("Could not find version {} in the in-flight event list of 
database "
+          + "{}", versionNumber, db.getName());
+    }
+    return removed;
+  }
+
+  private boolean evaluateSelfEventForTable(Table tbl,
+      boolean isInsertEvent, long versionNumber) throws CatalogException {
+    boolean removed =
+        tbl.removeFromVersionsForInflightEvents(isInsertEvent, versionNumber);
+    if (!removed) {
+      LOG.debug("Could not find {} {} in in-flight event list of table {}",
+          isInsertEvent ? "eventId" : "version", versionNumber, 
tbl.getFullName());
+    }
+    return removed;
+  }
+
+  private boolean evaluateSelfEventForPartition(SelfEventContext ctx, 
HdfsTable tbl,
+      boolean isInsertEvent, long versionNumber) throws CatalogException {
+    List<List<TPartitionKeyValue>> partitionKeyValues = 
ctx.getPartitionKeyValues();
+    List<String> failingPartitions = new ArrayList<>();
+    int len = partitionKeyValues.size();
+    for (int i=0; i<len; ++i) {
+      List<TPartitionKeyValue> partitionKeyValue = partitionKeyValues.get(i);
+      versionNumber = isInsertEvent ? ctx.getInsertEventId(i) : versionNumber;
+      HdfsPartition hdfsPartition =
+          tbl.getPartitionFromThriftPartitionSpec(partitionKeyValue);
+      if (hdfsPartition == null
+          || !hdfsPartition.removeFromVersionsForInflightEvents(isInsertEvent,
+              versionNumber)) {
+        // even if this is an error condition we should not bail out early 
since we
+        // should clean up the self-event state on the rest of the partitions
+        String partName = HdfsTable.constructPartitionName(partitionKeyValue);
+        if (hdfsPartition == null) {
+          LOG.debug("Partition {} not found during self-event "
+              + "evaluation for the table {}", partName, tbl.getFullName());
+        } else {
+          LOG.trace("Could not find {} in in-flight event list of the 
partition {} "
+              + "of table {}", versionNumber, partName, tbl.getFullName());
+        }
+        failingPartitions.add(partName);
+      }
+    }
+    return failingPartitions.isEmpty();
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java 
b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 0cb800b0c..ad25324fb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -99,6 +99,9 @@ public class Db extends CatalogObjectImpl implements FeDb {
   private boolean isSystemDb_ = false;
 
   // tracks the in-flight metastore events for this db
+  // Also used as a monitor object to synchronize access to it to avoid 
blocking on db
+  // lock during self-event check. If both this and dbLock_ or catalog version 
lock are
+  // taken, inFlightEvents_ must be the last to avoid deadlock.
   private final InFlightEvents inFlightEvents_ = new InFlightEvents();
 
   // lock to make sure modifications to the Db object are atomically done 
along with
@@ -565,10 +568,9 @@ public class Db extends CatalogObjectImpl implements FeDb {
    * @return true if version was successfully removed, false if didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(long versionNumber) {
-    Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
-        "removeFromVersionsForInflightEvents called without getting the db 
lock for "
-            + getName() + " database.");
-    return inFlightEvents_.remove(false, versionNumber);
+    synchronized (inFlightEvents_) {
+      return inFlightEvents_.remove(false, versionNumber);
+    }
   }
 
   /**
@@ -579,10 +581,15 @@ public class Db extends CatalogObjectImpl implements FeDb 
{
    * @param versionNumber version number to add
    */
   public void addToVersionsForInflightEvents(long versionNumber) {
+    // The lock is not needed for thread safety, just verifying existing 
behavior.
     Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
         "addToVersionsForInFlightEvents called without getting the db lock for 
"
             + getName() + " database.");
-    if (!inFlightEvents_.add(false, versionNumber)) {
+    boolean added = false;
+    synchronized (inFlightEvents_) {
+      added = inFlightEvents_.add(false, versionNumber);
+    }
+    if (!added) {
       LOG.warn(String.format("Could not add version %s to the list of 
in-flight "
           + "events. This could cause unnecessary database %s invalidation 
when the "
           + "event is processed", versionNumber, getName()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index c57671a43..8c072844d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2017,6 +2017,7 @@ public class HdfsTable extends Table implements FeFsTable 
{
     return ret;
   }
 
+  // TODO: why is there a single synchronized function?
   @Override
   protected synchronized void loadFromThrift(TTable thriftTable)
       throws TableLoadingException {
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java 
b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 2364f82f9..3f95764c1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -148,8 +148,11 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
   protected volatile long createEventId_ = -1;
 
   // tracks the in-flight metastore events for this table. Used by Events 
processor to
-  // avoid unnecessary refresh when the event is received
-  private final InFlightEvents inFlightEvents = new InFlightEvents();
+  // avoid unnecessary refresh when the event is received.
+  // Also used as a monitor object to synchronize access to it to avoid 
blocking on table
+  // lock during self-event check. If both this and tableLock_ or catalog 
version lock are
+  // taken, inFlightEvents_ must be the last to avoid deadlock.
+  private final InFlightEvents inFlightEvents_ = new InFlightEvents();
 
   // Table metrics. These metrics are applicable to all table types. Each 
subclass of
   // Table can define additional metrics specific to that table type.
@@ -981,12 +984,13 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
    */
   public boolean removeFromVersionsForInflightEvents(
       boolean isInsertEvent, long versionNumber) {
-    Preconditions.checkState(isWriteLockedByCurrentThread(),
-        "removeFromVersionsForInFlightEvents called without taking the table 
lock on "
-            + getFullName());
-    boolean removed = inFlightEvents.remove(isInsertEvent, versionNumber);
-    if (removed) {
-      metrics_.getCounter(NUMBER_OF_INFLIGHT_EVENTS).dec();
+    boolean removed = false;
+    synchronized (inFlightEvents_) {
+      removed = inFlightEvents_.remove(isInsertEvent, versionNumber);
+      // Locked updating of counters is not needed for correctness but tests may 
rely on it.
+      if (removed) {
+        metrics_.getCounter(NUMBER_OF_INFLIGHT_EVENTS).dec();
+      }
     }
     return removed;
   }
@@ -1004,16 +1008,22 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
    * capacity
    */
   public void addToVersionsForInflightEvents(boolean isInsertEvent, long 
versionNumber) {
-    // we generally don't take locks on Incomplete tables since they are 
atomically
-    // replaced during load
+    // We generally don't take locks on Incomplete tables since they are 
atomically
+    // replaced during load.
+    // The lock is not needed for thread safety, just verifying existing 
behavior.
     Preconditions.checkState(
         this instanceof IncompleteTable || isWriteLockedByCurrentThread());
-    if (!inFlightEvents.add(isInsertEvent, versionNumber)) {
-      LOG.warn(String.format("Could not add %s version to the table %s. This 
could "
-          + "cause unnecessary refresh of the table when the event is received 
by the "
-              + "Events processor.", versionNumber, getFullName()));
-    } else {
-      metrics_.getCounter(NUMBER_OF_INFLIGHT_EVENTS).inc();
+    boolean added = false;
+    synchronized (inFlightEvents_) {
+      added = inFlightEvents_.add(isInsertEvent, versionNumber);
+      // Locked updating of counters is not needed for correctness but tests may 
rely on it.
+      if (!added) {
+        LOG.warn(String.format("Could not add %s version to the table %s. This 
could "
+            + "cause unnecessary refresh of the table when the event is 
received by the "
+                + "Events processor.", versionNumber, getFullName()));
+      } else {
+        metrics_.getCounter(NUMBER_OF_INFLIGHT_EVENTS).inc();
+      }
     }
   }
 

Reply via email to