This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 78b9285da457c6853e513f3852730867d4dbe632 Author: Csaba Ringhofer <[email protected]> AuthorDate: Wed Sep 27 21:22:31 2023 +0200 IMPALA-12461 part1: Avoid taking db/table level locks during db/table self-event check DB / Table level locks can be held for a long time by DDL/DML operations in the catalogd. Trying to get the lock during self-event check would mean blocking if there is an ongoing operation for the given db/table. This patch tries to solve it for the easy case of db/table level events by using self-event specific locking that should only be held for a short time while adding events to, or removing them from, the in-flight event list. The InFlightEvents object itself is used as the lock, and no other catalogd lock may be acquired after it, to avoid deadlock. Solving this for partition level events is postponed because it would be more complex: both the partition list and the partitions' in-flight event lists would need to be protected from parallel operations that add/remove partitions. Testing: - ran event processing related tests Change-Id: Ife455de09ab2e262bde1e4b5bd54c8c54c75f2cd Reviewed-on: http://gerrit.cloudera.org:8080/20516 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/CatalogServiceCatalog.java | 134 ++++++++++++--------- fe/src/main/java/org/apache/impala/catalog/Db.java | 17 ++- .../java/org/apache/impala/catalog/HdfsTable.java | 1 + .../main/java/org/apache/impala/catalog/Table.java | 42 ++++--- 4 files changed, 114 insertions(+), 80 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index ddb08e75f..27d7a64e4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -1046,83 +1046,99 @@ public class CatalogServiceCatalog extends Catalog { LOG.debug("Not a 
self-event because eventId is {}", versionNumber); return false; } - Db db = getDb(ctx.getDbName()); + Db db = getDb(ctx.getDbName()); // Uses thread-safe hashmap. if (db == null) { throw new DatabaseNotFoundException("Database " + ctx.getDbName() + " not found"); } - // if the given tblName is null we look db's in-flight events if (ctx.getTblName() == null) { - //TODO use read/write locks for both table and db - if (!tryLockDb(db)) { - throw new CatalogException("Could not acquire lock on database object " + - db.getName()); - } - versionLock_.writeLock().unlock(); - try { - boolean removed = db.removeFromVersionsForInflightEvents(versionNumber); - if (!removed) { - LOG.debug("Could not find version {} in the in-flight event list of database " - + "{}", versionNumber, db.getName()); - } - return removed; - } finally { - db.getLock().unlock(); - } + // Not taking lock, rely on Db's internal locking. + return evaluateSelfEventForDb(db, versionNumber); } - Table tbl = db.getTable(ctx.getTblName()); + + Table tbl = db.getTable(ctx.getTblName()); // Uses thread-safe hashmap. if (tbl == null) { throw new TableNotFoundException( String.format("Table %s.%s not found", ctx.getDbName(), ctx.getTblName())); } - // we should acquire the table lock so that we wait for any other updates - // happening to this table at the same time + + if (ctx.getPartitionKeyValues() == null) { + // Not taking lock, rely on Table's internal locking. + return evaluateSelfEventForTable(tbl, isInsertEvent, versionNumber); + } + + // TODO: Could be a Precondition? If partitionKeyValues != null, this should be + // a HDFS table. + if (!(tbl instanceof HdfsTable)) return false; + + // We should acquire the table lock so that we wait for any other updates + // happening to this table at the same time, as they could affect the list of + // partitions. 
+ // TODO: add more fine-grained locking to protect partitions without taking + // long-held locks (IMPALA-12461) + // Increasing the timeout would not help if the table lock is held by + // a concurrent DDL as we would need the lock during the actual processing of the + // event. if (!tryWriteLock(tbl)) { throw new CatalogException(String.format("Error during self-event evaluation " + "for table %s due to lock contention", tbl.getFullName())); } versionLock_.writeLock().unlock(); try { - List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues(); - // if the partitionKeyValues is null, we look for tbl's in-flight events - if (partitionKeyValues == null) { - boolean removed = - tbl.removeFromVersionsForInflightEvents(isInsertEvent, versionNumber); - if (!removed) { - LOG.debug("Could not find {} {} in in-flight event list of table {}", - isInsertEvent ? "eventId" : "version", versionNumber, tbl.getFullName()); - } - return removed; - } - if (tbl instanceof HdfsTable) { - List<String> failingPartitions = new ArrayList<>(); - int len = partitionKeyValues.size(); - for (int i=0; i<len; ++i) { - List<TPartitionKeyValue> partitionKeyValue = partitionKeyValues.get(i); - versionNumber = isInsertEvent ? 
ctx.getInsertEventId(i) : versionNumber; - HdfsPartition hdfsPartition = - ((HdfsTable) tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue); - if (hdfsPartition == null - || !hdfsPartition.removeFromVersionsForInflightEvents(isInsertEvent, - versionNumber)) { - // even if this is an error condition we should not bail out early since we - // should clean up the self-event state on the rest of the partitions - String partName = HdfsTable.constructPartitionName(partitionKeyValue); - if (hdfsPartition == null) { - LOG.debug("Partition {} not found during self-event " - + "evaluation for the table {}", partName, tbl.getFullName()); - } else { - LOG.trace("Could not find {} in in-flight event list of the partition {} " - + "of table {}", versionNumber, partName, tbl.getFullName()); - } - failingPartitions.add(partName); - } - } - return failingPartitions.isEmpty(); - } + return evaluateSelfEventForPartition( + ctx, (HdfsTable)tbl, isInsertEvent, versionNumber); } finally { tbl.releaseWriteLock(); } - return false; + } + + private boolean evaluateSelfEventForDb(Db db, long versionNumber) + throws CatalogException { + boolean removed = db.removeFromVersionsForInflightEvents(versionNumber); + if (!removed) { + LOG.debug("Could not find version {} in the in-flight event list of database " + + "{}", versionNumber, db.getName()); + } + return removed; + } + + private boolean evaluateSelfEventForTable(Table tbl, + boolean isInsertEvent, long versionNumber) throws CatalogException { + boolean removed = + tbl.removeFromVersionsForInflightEvents(isInsertEvent, versionNumber); + if (!removed) { + LOG.debug("Could not find {} {} in in-flight event list of table {}", + isInsertEvent ? 
"eventId" : "version", versionNumber, tbl.getFullName()); + } + return removed; + } + + private boolean evaluateSelfEventForPartition(SelfEventContext ctx, HdfsTable tbl, + boolean isInsertEvent, long versionNumber) throws CatalogException { + List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues(); + List<String> failingPartitions = new ArrayList<>(); + int len = partitionKeyValues.size(); + for (int i=0; i<len; ++i) { + List<TPartitionKeyValue> partitionKeyValue = partitionKeyValues.get(i); + versionNumber = isInsertEvent ? ctx.getInsertEventId(i) : versionNumber; + HdfsPartition hdfsPartition = + tbl.getPartitionFromThriftPartitionSpec(partitionKeyValue); + if (hdfsPartition == null + || !hdfsPartition.removeFromVersionsForInflightEvents(isInsertEvent, + versionNumber)) { + // even if this is an error condition we should not bail out early since we + // should clean up the self-event state on the rest of the partitions + String partName = HdfsTable.constructPartitionName(partitionKeyValue); + if (hdfsPartition == null) { + LOG.debug("Partition {} not found during self-event " + + "evaluation for the table {}", partName, tbl.getFullName()); + } else { + LOG.trace("Could not find {} in in-flight event list of the partition {} " + + "of table {}", versionNumber, partName, tbl.getFullName()); + } + failingPartitions.add(partName); + } + } + return failingPartitions.isEmpty(); } /** diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java index 0cb800b0c..ad25324fb 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Db.java +++ b/fe/src/main/java/org/apache/impala/catalog/Db.java @@ -99,6 +99,9 @@ public class Db extends CatalogObjectImpl implements FeDb { private boolean isSystemDb_ = false; // tracks the in-flight metastore events for this db + // Also used as a monitor object to synchronize access to it to avoid blocking on table + // lock during self-event check. 
If both this and dbLock_ or catalog version lock are + // taken, inFlightEvents_ must be the last to avoid deadlock. private final InFlightEvents inFlightEvents_ = new InFlightEvents(); // lock to make sure modifications to the Db object are atomically done along with @@ -565,10 +568,9 @@ public class Db extends CatalogObjectImpl implements FeDb { * @return true if version was successfully removed, false if didn't exist */ public boolean removeFromVersionsForInflightEvents(long versionNumber) { - Preconditions.checkState(dbLock_.isHeldByCurrentThread(), - "removeFromVersionsForInflightEvents called without getting the db lock for " - + getName() + " database."); - return inFlightEvents_.remove(false, versionNumber); + synchronized (inFlightEvents_) { + return inFlightEvents_.remove(false, versionNumber); + } } /** @@ -579,10 +581,15 @@ public class Db extends CatalogObjectImpl implements FeDb { * @param versionNumber version number to add */ public void addToVersionsForInflightEvents(long versionNumber) { + // The lock is not needed for thread safety, just verifying existing behavior. Preconditions.checkState(dbLock_.isHeldByCurrentThread(), "addToVersionsForInFlightEvents called without getting the db lock for " + getName() + " database."); - if (!inFlightEvents_.add(false, versionNumber)) { + boolean added = false; + synchronized (inFlightEvents_) { + added = inFlightEvents_.add(false, versionNumber); + } + if (!added) { LOG.warn(String.format("Could not add version %s to the list of in-flight " + "events. 
This could cause unnecessary database %s invalidation when the " + "event is processed", versionNumber, getName())); diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index c57671a43..8c072844d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -2017,6 +2017,7 @@ public class HdfsTable extends Table implements FeFsTable { return ret; } + // TODO: why is there a single synchronized function? @Override protected synchronized void loadFromThrift(TTable thriftTable) throws TableLoadingException { diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 2364f82f9..3f95764c1 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -148,8 +148,11 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { protected volatile long createEventId_ = -1; // tracks the in-flight metastore events for this table. Used by Events processor to - // avoid unnecessary refresh when the event is received - private final InFlightEvents inFlightEvents = new InFlightEvents(); + // avoid unnecessary refresh when the event is received. + // Also used as a monitor object to synchronize access to it to avoid blocking on table + // lock during self-event check. If both this and tableLock_ or catalog version lock are + // taken, inFlightEvents_ must be the last to avoid deadlock. + private final InFlightEvents inFlightEvents_ = new InFlightEvents(); // Table metrics. These metrics are applicable to all table types. Each subclass of // Table can define additional metrics specific to that table type. 
@@ -981,12 +984,13 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { */ public boolean removeFromVersionsForInflightEvents( boolean isInsertEvent, long versionNumber) { - Preconditions.checkState(isWriteLockedByCurrentThread(), - "removeFromVersionsForInFlightEvents called without taking the table lock on " - + getFullName()); - boolean removed = inFlightEvents.remove(isInsertEvent, versionNumber); - if (removed) { - metrics_.getCounter(NUMBER_OF_INFLIGHT_EVENTS).dec(); + boolean removed = false; + synchronized (inFlightEvents_) { + removed = inFlightEvents_.remove(isInsertEvent, versionNumber); + // Locked updating of counters is not needed for correctness but tests may rely on it. + if (removed) { + metrics_.getCounter(NUMBER_OF_INFLIGHT_EVENTS).dec(); + } } return removed; } @@ -1004,16 +1008,22 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { * capacity */ public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) { - // we generally don't take locks on Incomplete tables since they are atomically - // replaced during load + // We generally don't take locks on Incomplete tables since they are atomically + // replaced during load. + // The lock is not needed for thread safety, just verifying existing behavior. Preconditions.checkState( this instanceof IncompleteTable || isWriteLockedByCurrentThread()); - if (!inFlightEvents.add(isInsertEvent, versionNumber)) { - LOG.warn(String.format("Could not add %s version to the table %s. This could " - + "cause unnecessary refresh of the table when the event is received by the " - + "Events processor.", versionNumber, getFullName())); - } else { - metrics_.getCounter(NUMBER_OF_INFLIGHT_EVENTS).inc(); + boolean added = false; + synchronized (inFlightEvents_) { + added = inFlightEvents_.add(isInsertEvent, versionNumber); + // Locked updating of counters is not needed for correctness but tests may rely on it. 
+ if (!added) { + LOG.warn(String.format("Could not add %s version to the table %s. This could " + + "cause unnecessary refresh of the table when the event is received by the " + + "Events processor.", versionNumber, getFullName())); + } else { + metrics_.getCounter(NUMBER_OF_INFLIGHT_EVENTS).inc(); + } } }
