This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 74bd0832e IMPALA-13631: (Addendum) Test slow concurrent alters
74bd0832e is described below
commit 74bd0832ed20aa0c2d1ef35428b2337b973cbcf4
Author: Michael Smith <[email protected]>
AuthorDate: Tue Apr 22 16:35:54 2025 -0700
IMPALA-13631: (Addendum) Test slow concurrent alters
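Adds a test verifying that multiple slow concurrent alters complete in
parallel rather than being executed sequentially. To make the alters
slow, adds a catalogd_table_rename_delay debug action that is executed
right after the HMS alter_table call in alterTableOrViewRename.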
The test creates two tables and runs DESCRIBE on each so that Impala's
catalog has already loaded their metadata and starting ALTER TABLE will
be fast. It then starts two ALTER TABLE ... RENAME statements
asynchronously - with debug_action ensuring each takes at least 5
seconds - and waits for both to finish.
It verifies that the concurrent alters are no longer blocked on "Got
catalog version read lock" (each query waits less than 5 seconds for
that lock) and that both complete in less than 10 seconds.
Change-Id: I87d077aaa295943a16e6da60a2755dd289f3a132
Reviewed-on: http://gerrit.cloudera.org:8080/22804
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../apache/impala/service/CatalogOpExecutor.java | 8 +++--
.../java/org/apache/impala/util/DebugUtils.java | 3 ++
tests/metadata/test_ddl.py | 41 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 22e2b9cf9..f1ada478a 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1222,7 +1222,7 @@ public class CatalogOpExecutor {
|| params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
alterTableOrViewRename(tbl,
TableName.fromThrift(params.getRename_params().getNew_table_name()),
- modification, wantMinimalResult, response, catalogTimeline);
+ modification, wantMinimalResult, response, catalogTimeline,
debugAction);
modification.validateInProgressModificationComplete();
return;
}
@@ -5674,7 +5674,8 @@ public class CatalogOpExecutor {
*/
private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
InProgressTableModification modification, boolean wantMinimalResult,
- TDdlExecResponse response, EventSequence catalogTimeline) throws
ImpalaException {
+ TDdlExecResponse response, EventSequence catalogTimeline,
+ @Nullable String debugAction) throws ImpalaException {
Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread());
TableName tableName = oldTbl.getTableName();
org.apache.hadoop.hive.metastore.api.Table msTbl =
@@ -5713,6 +5714,9 @@ public class CatalogOpExecutor {
Table.updateTimestampProperty(msTbl, Table.TBL_PROP_LAST_DDL_TIME);
msClient.getHiveClient().alter_table(
tableName.getDb(), tableName.getTbl(), msTbl);
+ if (debugAction != null) {
+ DebugUtils.executeDebugAction(debugAction,
DebugUtils.TABLE_RENAME_DELAY);
+ }
catalogTimeline.markEvent("Renamed table in Metastore");
} catch (TException e) {
throw new ImpalaRuntimeException(
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 0693cf775..ffe506589 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -87,6 +87,9 @@ public class DebugUtils {
// debug action label for introducing delay in loading table metadata.
public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
+ // debug action label for introducing delay in HMS alter_table rename RPC.
+ public static final String TABLE_RENAME_DELAY =
"catalogd_table_rename_delay";
+
// debug action to enable eventProcessor
public static final String ENABLE_EVENT_PROCESSOR = "enable_event_processor";
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 63175b55e..f33bb893e 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -55,6 +55,7 @@ from tests.util.filesystem_utils import (
FILESYSTEM_NAME)
from tests.common.impala_cluster import ImpalaCluster
from tests.util.filesystem_utils import FILESYSTEM_PREFIX
+from tests.util.parse_util import parse_duration_string_ms
from tests.util.shell_util import dump_server_stacktraces
@@ -483,6 +484,46 @@ class TestDdlStatements(TestDdlBase):
self.run_test_case('QueryTest/alter-table-set-column-stats', vector,
use_db=unique_database,
multiple_impalad=self._use_multiple_impalad(vector))
+ def test_alter_table_rename_independent(self, vector, unique_database):
+ """Tests that two alter table renames run concurrently do not block each
other."""
+
+ def table_name(i):
+ return "{}.tbl_{}".format(unique_database, i)
+
+ def alter(i, j):
+ return "alter table {} rename to {}".format(table_name(i), table_name(j))
+
+ def get_read_lock_duration_ms(profile):
+ read_lock_durations = re.findall(r"Got catalog version read lock: [^
]*", profile)
+ assert len(read_lock_durations) == 1
+ return parse_duration_string_ms(read_lock_durations[0].split(" ")[-1])
+
+ self.client.execute("create table {} (i int)".format(table_name(1)))
+ self.client.execute("create table {} (i int)".format(table_name(2)))
+ # Ensure loading metadata is not a factor in alter execution time.
+ self.client.execute("describe {}".format(table_name(1)))
+ self.client.execute("describe {}".format(table_name(2)))
+
+ new_vector = deepcopy(vector)
+ new_vector.get_value('exec_option')['debug_action'] = \
+ "catalogd_table_rename_delay:SLEEP@5000"
+ with self.create_impala_client_from_vector(new_vector) as client1, \
+ self.create_impala_client_from_vector(new_vector) as client2:
+ start = time.time()
+ handle1 = client1.execute_async(alter(1, 3))
+ handle2 = client2.execute_async(alter(2, 4))
+ assert client1.wait_for_finished_timeout(handle1, timeout=10)
+ assert client2.wait_for_finished_timeout(handle2, timeout=10)
+ assert time.time() - start < 10
+
+ profile1 = client1.get_runtime_profile(handle1)
+ assert get_read_lock_duration_ms(profile1) < 5000
+ profile2 = client2.get_runtime_profile(handle2)
+ assert get_read_lock_duration_ms(profile2) < 5000
+
+ client1.close_query(handle1)
+ client2.close_query(handle2)
+
@UniqueDatabase.parametrize(num_dbs=2)
def test_concurrent_alter_table_rename(self, vector, unique_database):
test_self = self