This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 6642b75ef IMPALA-13402: Clean up test_tuple_cache dimensions
6642b75ef is described below
commit 6642b75efc3a2535d31e25ec6de8e4673b4ee7c1
Author: Michael Smith <[email protected]>
AuthorDate: Wed Sep 25 15:24:45 2024 -0700
IMPALA-13402: Clean up test_tuple_cache dimensions
Uses exec_option_dimension to specify exec options, and avoids starting
a cluster when the test would just be skipped.
Uses other standard helpers to replace custom methods that were less
flexible.
Change-Id: Ib241f1f1cfaf918dffaddd5aeef3884c70e0a3fb
Reviewed-on: http://gerrit.cloudera.org:8080/21859
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
tests/custom_cluster/test_tuple_cache.py | 154 +++++++++++++++----------------
1 file changed, 77 insertions(+), 77 deletions(-)
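For readers unfamiliar with the test-dimension helpers this change adopts, here is a minimal sketch of the pattern the patch uses. The class name ExampleTupleCacheTest, the test method, and the "SELECT 1" query are illustrative only; the helper calls (add_mandatory_exec_option, add_exec_option_dimension, and set_configuration on the vector's exec_option value) mirror what appears in the diff below.

import pytest

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_dimensions import (
    add_exec_option_dimension, add_mandatory_exec_option)


class ExampleTupleCacheTest(CustomClusterTestSuite):

  @classmethod
  def add_test_dimensions(cls):
    super(ExampleTupleCacheTest, cls).add_test_dimensions()
    # Every test in this class runs with enable_tuple_cache=true ...
    add_mandatory_exec_option(cls, 'enable_tuple_cache', 'true')
    # ... and is instantiated once per mt_dop value, replacing the old
    # ImpalaTestDimension('mt_dop', ...) plus pytest.skip() pattern.
    add_exec_option_dimension(cls, 'mt_dop', [0, 1])

  @CustomClusterTestSuite.with_args(cluster_size=1)
  @pytest.mark.execute_serially
  def test_example(self, vector):
    # Apply the generated exec options to the client so that plain
    # execute_query() calls pick them up, as the rewritten tests do.
    self.client.set_configuration(vector.get_value('exec_option'))
    result = self.execute_query("SELECT 1")
    assert result.success

Declaring exec options as dimensions lets the test framework generate one instance per value, so per-test pytest.skip() calls (and the cluster startup they wasted) and custom wrappers like cached_query() are no longer needed.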
diff --git a/tests/custom_cluster/test_tuple_cache.py b/tests/custom_cluster/test_tuple_cache.py
index df5a3aea3..af30201f7 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -22,7 +22,8 @@ import random
import string
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.test_vector import ImpalaTestDimension
+from tests.common.test_dimensions import (
+ add_exec_option_dimension, add_mandatory_exec_option)
TABLE_LAYOUT = 'name STRING, age INT, address STRING'
CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2"
@@ -53,8 +54,9 @@ def get_cache_keys(profile):
return cache_keys
-def assert_deterministic_scan(profile):
- assert "deterministic scan range assignment: true" in profile
+def assert_deterministic_scan(vector, profile):
+ if vector.get_value('exec_option')['mt_dop'] > 0:
+ assert "deterministic scan range assignment: true" in profile
class TestTupleCacheBase(CustomClusterTestSuite):
@@ -62,26 +64,19 @@ class TestTupleCacheBase(CustomClusterTestSuite):
def get_workload(cls):
return 'functional-query'
- def cached_query(self, query, mt_dop=1):
- return self.execute_query(query,
- {"ENABLE_TUPLE_CACHE": "TRUE", "MT_DOP": str(mt_dop)})
-
- def cached_query_w_debugaction(self, query, debugaction):
- query_opts = {
- "ENABLE_TUPLE_CACHE": "TRUE",
- "MT_DOP": "1",
- "DEBUG_ACTION": debugaction
- }
- return self.execute_query(query, query_opts)
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestTupleCacheBase, cls).add_test_dimensions()
+ add_mandatory_exec_option(cls, 'enable_tuple_cache', 'true')
# Generates a table containing at least <scale> KB of data.
def create_table(self, fq_table, scale=1):
- self.cached_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT))
+ self.execute_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT))
# To make the rows distinct, we keep using a different seed for table_value
global_index = 0
for _ in range(scale):
values = [table_value(i) for i in range(global_index, global_index + 70)]
- self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+ self.execute_query("INSERT INTO {0} VALUES ({1})".format(
fq_table, "), (".join(values)))
global_index += 70
@@ -91,14 +86,19 @@ class TestTupleCacheBase(CustomClusterTestSuite):
class TestTupleCache(TestTupleCacheBase):
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestTupleCache, cls).add_test_dimensions()
+ add_mandatory_exec_option(cls, 'mt_dop', 1)
@CustomClusterTestSuite.with_args(cluster_size=1)
@pytest.mark.execute_serially
def test_cache_disabled(self, vector, unique_database):
+ self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.cache_disabled".format(unique_database)
self.create_table(fq_table)
- result1 = self.cached_query("SELECT * from {0}".format(fq_table))
- result2 = self.cached_query("SELECT * from {0}".format(fq_table))
+ result1 = self.execute_query("SELECT * from {0}".format(fq_table))
+ result2 = self.execute_query("SELECT * from {0}".format(fq_table))
assert result1.success
assert result2.success
@@ -110,10 +110,11 @@ class TestTupleCache(TestTupleCacheBase):
start_args=CACHE_START_ARGS, cluster_size=1)
@pytest.mark.execute_serially
def test_create_and_select(self, vector, unique_database):
+ self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.create_and_select".format(unique_database)
self.create_table(fq_table)
- result1 = self.cached_query("SELECT * from {0}".format(fq_table))
- result2 = self.cached_query("SELECT * from {0}".format(fq_table))
+ result1 = self.execute_query("SELECT * from {0}".format(fq_table))
+ result2 = self.execute_query("SELECT * from {0}".format(fq_table))
assert result1.success
assert result2.success
@@ -125,12 +126,13 @@ class TestTupleCache(TestTupleCacheBase):
start_args=CACHE_START_ARGS + " --tuple_cache_capacity=64MB",
cluster_size=1,
impalad_args="--cache_force_single_shard")
@pytest.mark.execute_serially
- def test_cache_halted_select(self, vector, unique_database):
+ def test_cache_halted_select(self, vector):
# The cache is set to the minimum cache size, so run a SQL that produces enough
# data to exceed the cache size and halt caching.
+ self.client.set_configuration(vector.get_value('exec_option'))
big_enough_query = "SELECT o_comment from tpch.orders"
- result1 = self.cached_query(big_enough_query)
- result2 = self.cached_query(big_enough_query)
+ result1 = self.execute_query(big_enough_query)
+ result2 = self.execute_query(big_enough_query)
assert result1.success
assert result2.success
@@ -142,37 +144,42 @@ class TestTupleCache(TestTupleCacheBase):
start_args=CACHE_START_ARGS, cluster_size=1)
@pytest.mark.execute_serially
def test_failpoints(self, vector, unique_database):
- fq_table = "{0}.create_and_select".format(unique_database)
+ fq_table = "{0}.failpoints".format(unique_database)
# Scale 20 gets us enough rows to force multiple RowBatches (needed for the
# the reader GetNext() cases).
self.create_table(fq_table, scale=20)
query = "SELECT * from {0}".format(fq_table)
+ def execute_debug(query, action):
+ exec_options = dict(vector.get_value('exec_option'))
+ exec_options['debug_action'] = action
+ return self.execute_query(query, exec_options)
+
# Fail when writing cache entry. All of these are handled and will not fail the
# query.
# Case 1: fail during Open()
- result = self.cached_query_w_debugaction(query, "TUPLE_FILE_WRITER_OPEN:[email protected]")
+ result = execute_debug(query, "TUPLE_FILE_WRITER_OPEN:[email protected]")
assert result.success
assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
# Case 2: fail during Write()
- result = self.cached_query_w_debugaction(query, "TUPLE_FILE_WRITER_WRITE:[email protected]")
+ result = execute_debug(query, "TUPLE_FILE_WRITER_WRITE:[email protected]")
assert result.success
assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
# Case 3: fail during Commit()
- result = self.cached_query_w_debugaction(query, "TUPLE_FILE_WRITER_COMMIT:[email protected]")
+ result = execute_debug(query, "TUPLE_FILE_WRITER_COMMIT:[email protected]")
assert result.success
assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
# Now, successfully add a cache entry
- result1 = self.cached_query(query)
+ result1 = self.execute_query(query, vector.get_value('exec_option'))
assert result1.success
assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
# Fail when reading a cache entry
# Case 1: fail during Open()
- result = self.cached_query_w_debugaction(query, "TUPLE_FILE_READER_OPEN:[email protected]")
+ result = execute_debug(query, "TUPLE_FILE_READER_OPEN:[email protected]")
assert result.success
# Do an unordered compare (the rows are unique)
assert set(result.data) == set(result1.data)
@@ -180,8 +187,7 @@ class TestTupleCache(TestTupleCacheBase):
assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
# Case 2: fail during the first GetNext() call
- result = self.cached_query_w_debugaction(query,
- "TUPLE_FILE_READER_FIRST_GETNEXT:[email protected]")
+ result = execute_debug(query, "TUPLE_FILE_READER_FIRST_GETNEXT:[email protected]")
assert result.success
# Do an unordered compare (the rows are unique)
assert set(result.data) == set(result1.data)
@@ -193,20 +199,20 @@ class TestTupleCache(TestTupleCacheBase):
# has already returned cached rows
hit_error = False
try:
- result = self.cached_query_w_debugaction(query,
- "TUPLE_FILE_READER_SECOND_GETNEXT:[email protected]")
+ result = execute_debug(query, "TUPLE_FILE_READER_SECOND_GETNEXT:[email protected]")
except Exception:
hit_error = True
assert hit_error
-class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
+class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
+ """Simpler tests that run on a single node with mt_dop=0 or mt_dop=1."""
@classmethod
def add_test_dimensions(cls):
- super(TestTupleCacheRuntimeKeys, cls).add_test_dimensions()
- cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', *[0, 1, 2]))
+ super(TestTupleCacheRuntimeKeysBasic, cls).add_test_dimensions()
+ add_exec_option_dimension(cls, 'mt_dop', [0, 1])
@CustomClusterTestSuite.with_args(
start_args=CACHE_START_ARGS, cluster_size=1)
@@ -214,14 +220,9 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
def test_scan_range_basics(self, vector, unique_database):
"""
This tests that adding/removing files to a table results in different keys.
- This runs on a single node with mt_dop=0 or mt_dop=1, so it is the simplest
- test.
"""
- mt_dop = vector.get_value('mt_dop')
- # To keep this simple, we skip mt_dop > 1.
- if mt_dop > 1:
- pytest.skip()
- fq_table = "{0}.scan_range_basics_mtdop{1}".format(unique_database, mt_dop)
+ self.client.set_configuration(vector.get_value('exec_option'))
+ fq_table = "{0}.scan_range_basics".format(unique_database)
query = "SELECT * from {0}".format(fq_table)
# Create an empty table
@@ -230,21 +231,20 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
# When there are no scan ranges, then fragment instance key is 0. This is
# somewhat of a toy case and we probably want to avoid caching in this
# case. Nonetheless, it is a good sanity check.
- empty_result = self.cached_query(query, mt_dop=mt_dop)
+ empty_result = self.execute_query(query)
cache_keys = get_cache_keys(empty_result.runtime_profile)
assert len(cache_keys) == 1
empty_table_compile_key, empty_table_finst_key = cache_keys[0].split("_")
assert empty_table_finst_key == "0"
assert len(empty_result.data) == 0
- if mt_dop > 0:
- assert_deterministic_scan(empty_result.runtime_profile)
+ assert_deterministic_scan(vector, empty_result.runtime_profile)
# Insert a row, which creates a file / scan range
- self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+ self.execute_query("INSERT INTO {0} VALUES ({1})".format(
fq_table, table_value(0)))
# Now, there is a scan range, so the fragment instance key should be non-zero.
- one_file_result = self.cached_query(query, mt_dop=mt_dop)
+ one_file_result = self.execute_query(query)
cache_keys = get_cache_keys(one_file_result.runtime_profile)
assert len(cache_keys) == 1
one_file_compile_key, one_file_finst_key = cache_keys[0].split("_")
@@ -252,18 +252,17 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
# This should be a cache miss
assertCounters(one_file_result.runtime_profile, 0, 0, 0)
assert len(one_file_result.data) == 1
- if mt_dop > 0:
- assert_deterministic_scan(one_file_result.runtime_profile)
+ assert_deterministic_scan(vector, one_file_result.runtime_profile)
# The new scan range did not change the compile-time key
assert empty_table_compile_key == one_file_compile_key
# Insert another row, which creates a file / scan range
- self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+ self.execute_query("INSERT INTO {0} VALUES ({1})".format(
fq_table, table_value(1)))
# There is a second scan range, so the fragment instance key should change again
- two_files_result = self.cached_query(query, mt_dop=mt_dop)
+ two_files_result = self.execute_query(query)
cache_keys = get_cache_keys(two_files_result.runtime_profile)
assert len(cache_keys) == 1
two_files_compile_key, two_files_finst_key = cache_keys[0].split("_")
@@ -273,15 +272,14 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
assert one_file_finst_key != two_files_finst_key
overlapping_rows = set(one_file_result.data).intersection(set(two_files_result.data))
assert len(overlapping_rows) == 1
- if mt_dop > 0:
- assert_deterministic_scan(two_files_result.runtime_profile)
+ assert_deterministic_scan(vector, two_files_result.runtime_profile)
# The new scan range did not change the compile-time key
assert one_file_compile_key == two_files_compile_key
# Invalidate metadata and rerun the last query. The keys should stay the same.
- self.cached_query("invalidate metadata")
- rerun_two_files_result = self.cached_query(query, mt_dop=mt_dop)
+ self.execute_query("invalidate metadata")
+ rerun_two_files_result = self.execute_query(query)
# Verify that this is a cache hit
assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0)
cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
@@ -294,24 +292,20 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
@CustomClusterTestSuite.with_args(
start_args=CACHE_START_ARGS, cluster_size=1)
@pytest.mark.execute_serially
- def test_scan_range_partitioned(self, vector, unique_database):
+ def test_scan_range_partitioned(self, vector):
"""
This tests a basic partitioned case where the query is identical except that
- it operates on different partitions (and thus different scan ranges)
- This runs on a single node with mt_dop=0 or mt_dop=1 to keep it simple.
+ it operates on different partitions (and thus different scan ranges).
"""
- mt_dop = vector.get_value('mt_dop')
- # To keep this simple, we skip mt_dop > 1.
- if mt_dop > 1:
- pytest.skip()
- year2009_result = self.cached_query(
- "select * from functional.alltypes where year=2009", mt_dop=mt_dop)
+ self.client.set_configuration(vector.get_value('exec_option'))
+ year2009_result = self.execute_query(
+ "select * from functional.alltypes where year=2009")
cache_keys = get_cache_keys(year2009_result.runtime_profile)
assert len(cache_keys) == 1
year2009_compile_key, year2009_finst_key = cache_keys[0].split("_")
- year2010_result = self.cached_query(
- "select * from functional.alltypes where year=2010", mt_dop=mt_dop)
+ year2010_result = self.execute_query(
+ "select * from functional.alltypes where year=2010")
cache_keys = get_cache_keys(year2010_result.runtime_profile)
assert len(cache_keys) == 1
year2010_compile_key, year2010_finst_key = cache_keys[0].split("_")
@@ -334,6 +328,14 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
assert year2010_result.data[0].find("2010") != -1
assert year2010_result.data[0].find("2009") == -1
+
+class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestTupleCacheRuntimeKeys, cls).add_test_dimensions()
+ add_exec_option_dimension(cls, 'mt_dop', [0, 1, 2])
+
@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
@pytest.mark.execute_serially
def test_scan_range_distributed(self, vector, unique_database):
@@ -343,9 +345,9 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
distinct cache key. When adding a scan range, at least one fragment instance
cache key should change.
"""
-
- mt_dop = vector.get_value('mt_dop')
- fq_table = "{0}.scan_range_basics_mtdop{1}".format(unique_database, mt_dop)
+ self.client.set_configuration(vector.get_value('exec_option'))
+ mt_dop = vector.get_value('exec_option')['mt_dop']
+ fq_table = "{0}.scan_range_distributed".format(unique_database)
query = "SELECT * from {0}".format(fq_table)
# Create a table with several files so that we always have enough work for multiple
@@ -354,7 +356,7 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
# We run a simple select. This is running with multiple impalads, so there are
# always multiple fragment instances
- before_result = self.cached_query(query, mt_dop=mt_dop)
+ before_result = self.execute_query(query)
cache_keys = get_cache_keys(before_result.runtime_profile)
expected_num_keys = 3 * max(mt_dop, 1)
assert len(cache_keys) == expected_num_keys
@@ -370,17 +372,16 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
for impalad in self.cluster.impalads:
entries_in_use = self.get_tuple_cache_metric(impalad.service, "entries-in-use")
assert entries_in_use == max(mt_dop, 1)
- if mt_dop > 0:
- assert_deterministic_scan(before_result.runtime_profile)
+ assert_deterministic_scan(vector, before_result.runtime_profile)
# Insert another row, which creates a file / scan range
# This uses a very large seed for table_value() to get a unique row that isn't
# already in the table.
- self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+ self.execute_query("INSERT INTO {0} VALUES ({1})".format(
fq_table, table_value(1000000)))
# Rerun the query with the extra scan range
- after_insert_result = self.cached_query(query, mt_dop=mt_dop)
+ after_insert_result = self.execute_query(query)
cache_keys = get_cache_keys(after_insert_result.runtime_profile)
expected_num_keys = 3 * max(mt_dop, 1)
assert len(cache_keys) == expected_num_keys
@@ -409,8 +410,7 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
assert entries_in_use <= (2 * max(mt_dop, 1))
total_entries_in_use += entries_in_use
assert total_entries_in_use >= len(all_cache_keys)
- if mt_dop > 0:
- assert_deterministic_scan(after_insert_result.runtime_profile)
+ assert_deterministic_scan(vector, after_insert_result.runtime_profile)
# The extra scan range means that at least one fragment instance key changed
# Since scheduling can change completely with the addition of a single scan range,