This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ab6c9467f IMPALA-12831: Fix HdfsTable.toMinimalTCatalogObject() failed
by concurrent modification
ab6c9467f is described below
commit ab6c9467f6347671b971dbce4c640bea032b6ed9
Author: stiga-huang <[email protected]>
AuthorDate: Mon Feb 26 15:59:31 2024 +0800
IMPALA-12831: Fix HdfsTable.toMinimalTCatalogObject() failed by concurrent
modification
HdfsTable.toMinimalTCatalogObject() is not always invoked with holding
the table lock, e.g. in invalidating a table, we could replace an
HdfsTable instance with an IncompleteTable instance. We then invoke
HdfsTable.toMinimalTCatalogObject() to get the removed catalog object.
However, the HdfsTable instance could be modified in the meantime by a
concurrent DDL/DML that would reload it, e.g. a REFRESH statement. This
causes HdfsTable.toMinimalTCatalogObject() to fail with a
ConcurrentModificationException on the column/partition list.
This patch fixes the issue by trying to acquire the table read lock in
HdfsTable.toMinimalTCatalogObject(). If it fails, the partition ids and
names won't be returned. Also updates the method to not collect the
column list since it's unused.
Tests
- Added e2e test
- Ran CORE tests
Change-Id: Ie9f8e65c0bd24000241eedf8ca765c1e4e14fdb3
Reviewed-on: http://gerrit.cloudera.org:8080/21072
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../java/org/apache/impala/catalog/HdfsTable.java | 58 +++++++++++++++++-----
.../main/java/org/apache/impala/catalog/Table.java | 4 +-
tests/custom_cluster/test_concurrent_ddls.py | 23 +++++++++
3 files changed, 70 insertions(+), 15 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index f3c3e3132..0468bbd9e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -207,6 +207,8 @@ public class HdfsTable extends Table implements FeFsTable {
public static final String LOAD_DURATION_FILE_METADATA_ALL_PARTITIONS =
"load-duration.all-partitions.file-metadata";
+ private static final THdfsPartition FAKE_THDFS_PARTITION = new
THdfsPartition();
+
// string to indicate NULL. set in load() from table properties
private String nullColumnValue_;
@@ -2129,7 +2131,7 @@ public class HdfsTable extends Table implements FeFsTable
{
}
/**
- * Just likes super.toMinimalTCatalogObject() but also contains the minimal
catalog
+ * Just likes super.toMinimalTCatalogObject() but try to add the minimal
catalog
* objects of partitions in the returned result.
*/
@Override
@@ -2138,19 +2140,49 @@ public class HdfsTable extends Table implements
FeFsTable {
if (!BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()) {
return catalogObject;
}
- catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE);
- THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
- nullPartitionKeyValue_, nullColumnValue_,
- /*idToPartition=*/ new HashMap<>(),
- /*prototypePartition=*/ new THdfsPartition());
- for (HdfsPartition part : partitionMap_.values()) {
- hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition());
+ // Try adding the partition ids and names if we can acquire the read lock.
+ // Use tryReadLock() to avoid being blocked by concurrent DDLs. It won't
result in
+ // correctness issues if we return the result without the partition ids
and names.
+ // The results are mainly used in two scenarios:
+ // 1. The result is added to catalog deleteLog for sending delete updates
for
+ // partitions. The delete update for the table is always sent so
coordinators are
+ // able to invalidate the HdfsTable object. This enforces the
correctness.
+ // However, not sending the deletes causes a leak of the topic values in
+ // statestore if the topic keys (tableName+partName) are not reused
anymore.
+ // Note that topic keys are never deleted in the TopicEntryMap of
statestore even
+ // if we send the delete updates. So we already have a leak on catalog
topic keys.
+ // We can revisit this if we found statestore has memory issues.
+ // 2. The result is used in DDL/DML response for a removed/invalidated
table.
+ // Coordinators can still invalidate its cache since the table is sent
(in the
+ // parent implementation).
+ // However, LocalCatalog coordinators can't immediately invalidate
partitions of
+ // a removed table. They will be cleared by the cache eviction policy
since the
+ // partitions of deleted tables won't be used anymore.
+ // TODO: synchronize the access on the partition map by using a
finer-grained lock
+ if (!tryReadLock()) {
+ LOG.warn("Not returning the partition ids and names of table {} since
not " +
+ "holding the table read lock", getFullName());
+ return catalogObject;
+ }
+ try {
+ catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE);
+ // Column names are unused by the consumers so use an empty list here.
+ THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_,
+ /*colNames=*/ Collections.emptyList(),
+ nullPartitionKeyValue_, nullColumnValue_,
+ /*idToPartition=*/ new HashMap<>(),
+ /*prototypePartition=*/ FAKE_THDFS_PARTITION);
+ for (HdfsPartition part : partitionMap_.values()) {
+ hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition());
+ }
+ hdfsTable.setHas_full_partitions(false);
+ // The minimal catalog object of partitions contain the partition names.
+ hdfsTable.setHas_partition_names(true);
+ catalogObject.getTable().setHdfs_table(hdfsTable);
+ return catalogObject;
+ } finally {
+ releaseReadLock();
}
- hdfsTable.setHas_full_partitions(false);
- // The minimal catalog object of partitions contain the partition names.
- hdfsTable.setHas_partition_names(true);
- catalogObject.getTable().setHdfs_table(hdfsTable);
- return catalogObject;
}
/**
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java
b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 4dbd4fcfe..6299dfc89 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -86,10 +86,10 @@ public abstract class Table extends CatalogObjectImpl
implements FeTable {
protected final String full_name_;
protected final String owner_;
protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
- // Lock protecting this table. A read lock must be table when we are
serializing
+ // Lock protecting this table. A read lock must be held when we are
serializing
// the table contents over thrift (e.g when returning the table to clients
over thrift
// or when topic-update thread serializes the table in the topic update)
- // A write lock must be table when the table is being modified (e.g. DDLs or
refresh)
+ // A write lock must be held when the table is being modified (e.g. DDLs or
refresh)
private final ReentrantReadWriteLock tableLock_ = new ReentrantReadWriteLock(
true /*fair ordering*/);
private final ReadLock readLock_ = tableLock_.readLock();
diff --git a/tests/custom_cluster/test_concurrent_ddls.py
b/tests/custom_cluster/test_concurrent_ddls.py
index 4052de682..3630db369 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -197,3 +197,26 @@ class TestConcurrentDdls(CustomClusterTestSuite):
dump_server_stacktraces()
assert False, "INVALIDATE METADATA timeout in 60s!"
pool.terminate()
+
+ @CustomClusterTestSuite.with_args(
+ catalogd_args="--enable_incremental_metadata_updates=true")
+ def test_concurrent_invalidate_metadata_with_refresh(self, unique_database):
+ # Create a wide table with some partitions
+ tbl = unique_database + ".wide_tbl"
+ create_stmt = "create table {} (".format(tbl)
+ for i in range(600):
+ create_stmt += "col{} int, ".format(i)
+ create_stmt += "col600 int) partitioned by (p int) stored as textfile"
+ self.execute_query(create_stmt)
+ for i in range(10):
+ self.execute_query("alter table {} add partition (p={})".format(tbl, i))
+
+ refresh_stmt = "refresh " + tbl
+ refresh_handle = self.client.execute_async(refresh_stmt)
+ for i in range(10):
+ self.execute_query("invalidate metadata " + tbl)
+ # Always keep a concurrent REFRESH statement running
+ refresh_state = self.client.get_state(refresh_handle)
+ if refresh_state == self.client.QUERY_STATES['FINISHED']\
+ or refresh_state == self.client.QUERY_STATES['EXCEPTION']:
+ refresh_handle = self.client.execute_async(refresh_stmt)