This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 36640548e IMPALA-12545: Use 'unique_database' in
test_restart_services.py::TestRestart
36640548e is described below
commit 36640548e6c8ca2638a170cbbb8b10af468003e9
Author: Daniel Becker <[email protected]>
AuthorDate: Mon Nov 6 16:21:32 2023 +0100
IMPALA-12545: Use 'unique_database' in test_restart_services.py::TestRestart
Some tests in 'test_restart_services.py::TestRestart' create tables but
don't use a 'unique_database' for it. The table and column names are
sometimes the same. The tests are run serially but if a table is not
deleted successfully it may interfere with other tests.
This change introduces 'unique_database' in tests that create tables.
After this, explicitly deleting the tables is no longer necessary as the
framework takes care of it.
Testing:
- all affected tests pass
Change-Id: Id4c2cfb669d5ff9e4d8110a0035bae1147e2db5a
Reviewed-on: http://gerrit.cloudera.org:8080/20662
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
tests/custom_cluster/test_restart_services.py | 91 ++++++++++++++-------------
1 file changed, 46 insertions(+), 45 deletions(-)
diff --git a/tests/custom_cluster/test_restart_services.py
b/tests/custom_cluster/test_restart_services.py
index 56533b2c8..cd0f76522 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -169,9 +169,10 @@ class TestRestart(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(
statestored_args="--statestore_update_frequency_ms=5000 "
"--statestore_heartbeat_frequency_ms=10000")
- def test_restart_catalogd(self):
- self.execute_query_expect_success(self.client, "drop table if exists
join_aa")
- self.execute_query_expect_success(self.client, "create table join_aa(id
int)")
+ def test_restart_catalogd(self, unique_database):
+ tbl_name = unique_database + ".join_aa"
+ self.execute_query_expect_success(
+ self.client, "create table {}(id int)".format(tbl_name))
# Make the catalog object version grow large enough
self.execute_query_expect_success(self.client, "invalidate metadata")
@@ -179,7 +180,7 @@ class TestRestart(CustomClusterTestSuite):
# the local catalog cache of impalad out of sync
for i in range(0, 10):
try:
- query = "alter table join_aa add columns (age" + str(i) + " int)"
+ query = "alter table {} add columns (age{} int)".format(tbl_name, i)
self.execute_query_async(query)
except Exception as e:
LOG.info(str(e))
@@ -187,9 +188,8 @@ class TestRestart(CustomClusterTestSuite):
self.cluster.catalogd.restart()
self.execute_query_expect_success(self.client,
- "alter table join_aa add columns (name string)")
- self.execute_query_expect_success(self.client, "select name from join_aa")
- self.execute_query_expect_success(self.client, "drop table join_aa")
+ "alter table {} add columns (name string)".format(tbl_name))
+ self.execute_query_expect_success(self.client, "select name from
{}".format(tbl_name))
WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC = 5
@@ -199,7 +199,8 @@ class TestRestart(CustomClusterTestSuite):
impalad_args=("--wait_for_new_catalog_service_id_timeout_sec={} \
--wait_for_new_catalog_service_id_max_iterations=-1"
.format(WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC)))
- def test_restart_catalogd_while_handling_rpc_response_with_timeout(self):
+ def test_restart_catalogd_while_handling_rpc_response_with_timeout(self,
+ unique_database):
"""Regression test for IMPALA-12267. We'd like to cause a situation where
- The coordinator issues a DDL or DML query
- Catalogd sends a response RPC
@@ -209,8 +210,9 @@ class TestRestart(CustomClusterTestSuite):
Before IMPALA-12267 the coordinator hung infinitely in this situation,
waiting for a
statestore update with a new catalog service ID assuming the service ID it
had was
stale, but it already had the most recent one."""
- self.execute_query_expect_success(self.client, "drop table if exists
join_aa")
- self.execute_query_expect_success(self.client, "create table join_aa(id
int)")
+ tbl_name = unique_database + ".handling_rpc_response_with_timeout"
+ self.execute_query_expect_success(
+ self.client, "create table {}(id int)".format(tbl_name))
# Make the catalog object version grow large enough
self.execute_query_expect_success(self.client, "invalidate metadata")
@@ -218,7 +220,7 @@ class TestRestart(CustomClusterTestSuite):
DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
.format(debug_action_sleep_time_sec * 1000))
- query = "alter table join_aa add columns (age" + " int)"
+ query = "alter table {} add columns (age int)".format(tbl_name)
handle = self.execute_query_async(query, query_options={"debug_action":
DEBUG_ACTION})
# Wait a bit so the RPC from the catalogd arrives to the coordinator.
@@ -236,12 +238,11 @@ class TestRestart(CustomClusterTestSuite):
self.assert_impalad_log_contains("WARNING",
"Ignoring catalog update result of catalog service ID")
- self.execute_query_expect_success(self.client, "select age from join_aa")
+ self.execute_query_expect_success(self.client, "select age from
{}".format(tbl_name))
self.execute_query_expect_success(self.client,
- "alter table join_aa add columns (name string)")
- self.execute_query_expect_success(self.client, "select name from join_aa")
- self.execute_query_expect_success(self.client, "drop table join_aa")
+ "alter table {} add columns (name string)".format(tbl_name))
+ self.execute_query_expect_success(self.client, "select name from
{}".format(tbl_name))
WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS = 3
STATESTORE_UPDATE_FREQ_SEC = 2
@@ -253,14 +254,16 @@ class TestRestart(CustomClusterTestSuite):
impalad_args=("--wait_for_new_catalog_service_id_timeout_sec=-1 \
--wait_for_new_catalog_service_id_max_iterations={}"
.format(WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS)))
- def test_restart_catalogd_while_handling_rpc_response_with_max_iters(self):
+ def test_restart_catalogd_while_handling_rpc_response_with_max_iters(self,
+ unique_database):
"""We create the same situation as described in
'test_restart_catalogd_while_handling_rpc_response_with_timeout()' but we
get out of
it not by timing out but by giving up waiting after receiving
'WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS' updates from the statestore that
don't change
the catalog service ID."""
- self.execute_query_expect_success(self.client, "drop table if exists
join_aa")
- self.execute_query_expect_success(self.client, "create table join_aa(id
int)")
+ tbl_name = unique_database + ".handling_rpc_response_with_max_iters"
+ self.execute_query_expect_success(
+ self.client, "create table {}(id int)".format(tbl_name))
# Make the catalog object version grow large enough
self.execute_query_expect_success(self.client, "invalidate metadata")
@@ -268,7 +271,7 @@ class TestRestart(CustomClusterTestSuite):
DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
.format(debug_action_sleep_time_sec * 1000))
- query = "alter table join_aa add columns (age" + " int)"
+ query = "alter table {} add columns (age int)".format(tbl_name)
handle = self.execute_query_async(query, query_options={"debug_action":
DEBUG_ACTION})
# Wait a bit so the RPC from the catalogd arrives to the coordinator.
@@ -283,7 +286,7 @@ class TestRestart(CustomClusterTestSuite):
# Issue DML queries so that the coordinator receives catalog updates.
for i in range(self.WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS):
try:
- query = "alter table join_aa add columns (age" + str(i) + " int)"
+ query = "alter table {} add columns (age{} int)".format(tbl_name, i)
self.execute_query_async(query)
time.sleep(self.STATESTORE_UPDATE_FREQ_SEC)
except Exception as e:
@@ -302,19 +305,19 @@ class TestRestart(CustomClusterTestSuite):
self.assert_impalad_log_contains("WARNING",
"Ignoring catalog update result of catalog service ID")
- self.execute_query_expect_success(self.client, "select age from join_aa")
+ self.execute_query_expect_success(self.client, "select age from
{}".format(tbl_name))
self.execute_query_expect_success(self.client,
- "alter table join_aa add columns (name string)")
- self.execute_query_expect_success(self.client, "select name from join_aa")
- self.execute_query_expect_success(self.client, "drop table join_aa")
+ "alter table {} add columns (name string)".format(tbl_name))
+ self.execute_query_expect_success(self.client, "select name from
{}".format(tbl_name))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
statestored_args="--statestore_update_frequency_ms=5000")
- def test_restart_catalogd_sync_ddl(self):
- self.execute_query_expect_success(self.client, "drop table if exists
join_aa")
- self.execute_query_expect_success(self.client, "create table join_aa(id
int)")
+ def test_restart_catalogd_sync_ddl(self, unique_database):
+ tbl_name = unique_database + ".join_aa"
+ self.execute_query_expect_success(
+ self.client, "create table {}(id int)".format(tbl_name))
# Make the catalog object version grow large enough
self.execute_query_expect_success(self.client, "invalidate metadata")
query_options = {"sync_ddl": "true"}
@@ -323,7 +326,7 @@ class TestRestart(CustomClusterTestSuite):
# the local catalog catche of impalad out of sync
for i in range(0, 10):
try:
- query = "alter table join_aa add columns (age" + str(i) + " int)"
+ query = "alter table {} add columns (age{} int)".format(tbl_name, i)
self.execute_query_async(query, query_options)
except Exception as e:
LOG.info(str(e))
@@ -331,9 +334,8 @@ class TestRestart(CustomClusterTestSuite):
self.cluster.catalogd.restart()
self.execute_query_expect_success(self.client,
- "alter table join_aa add columns (name string)", query_options)
- self.execute_query_expect_success(self.client, "select name from join_aa")
- self.execute_query_expect_success(self.client, "drop table join_aa")
+ "alter table {} add columns (name string)".format(tbl_name),
query_options)
+ self.execute_query_expect_success(self.client, "select name from
{}".format(tbl_name))
UPDATE_FREQUENCY_S = 10
@@ -341,10 +343,10 @@ class TestRestart(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(
statestored_args="--statestore_update_frequency_ms={frequency_ms}"
.format(frequency_ms=(UPDATE_FREQUENCY_S * 1000)))
- def test_restart_catalogd_twice(self):
- self.execute_query_expect_success(self.client, "drop table if exists
join_aa")
+ def test_restart_catalogd_twice(self, unique_database):
+ tbl_name = unique_database + ".join_aa"
self.cluster.catalogd.restart()
- query = "create table join_aa(id int)"
+ query = "create table {}(id int)".format(tbl_name)
query_handle = []
def execute_query_async():
@@ -356,18 +358,18 @@ class TestRestart(CustomClusterTestSuite):
self.cluster.catalogd.restart()
thread.join()
self.execute_query_expect_success(self.client,
- "alter table join_aa add columns (name string)")
- self.execute_query_expect_success(self.client, "select name from join_aa")
- self.execute_query_expect_success(self.client, "drop table join_aa")
+ "alter table {} add columns (name string)".format(tbl_name))
+ self.execute_query_expect_success(self.client, "select name from
{}".format(tbl_name))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal",
statestored_args="--statestore_update_frequency_ms=5000")
- def test_restart_catalogd_with_local_catalog(self):
- self.execute_query_expect_success(self.client, "drop table if exists
join_aa")
- self.execute_query_expect_success(self.client, "create table join_aa(id
int)")
+ def test_restart_catalogd_with_local_catalog(self, unique_database):
+ tbl_name = unique_database + ".join_aa"
+ self.execute_query_expect_success(
+ self.client, "create table {}(id int)".format(tbl_name))
# Make the catalog object version grow large enough
self.execute_query_expect_success(self.client, "invalidate metadata")
@@ -375,7 +377,7 @@ class TestRestart(CustomClusterTestSuite):
# the local catalog catche of impalad out of sync
for i in range(0, 10):
try:
- query = "alter table join_aa add columns (age" + str(i) + " int)"
+ query = "alter table {} add columns (age{} int)".format(tbl_name, i)
self.execute_query_async(query)
except Exception as e:
LOG.info(str(e))
@@ -383,10 +385,9 @@ class TestRestart(CustomClusterTestSuite):
self.cluster.catalogd.restart()
self.execute_query_expect_success(self.client,
- "alter table join_aa add columns (name string)")
- self.execute_query_expect_success(self.client, "select name from join_aa")
- self.execute_query_expect_success(self.client, "select age0 from join_aa")
- self.execute_query_expect_success(self.client, "drop table join_aa")
+ "alter table {} add columns (name string)".format(tbl_name))
+ self.execute_query_expect_success(self.client, "select name from
{}".format(tbl_name))
+ self.execute_query_expect_success(self.client, "select age0 from
{}".format(tbl_name))
SUBSCRIBER_TIMEOUT_S = 2
CANCELLATION_GRACE_PERIOD_S = 5