This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2085edbe1 IMPALA-13503: CustomClusterTestSuite for whole class
2085edbe1 is described below

commit 2085edbe1cdeb6e01b14b83113b5bc5b9e70f86f
Author: Michael Smith <[email protected]>
AuthorDate: Wed Oct 30 16:47:44 2024 -0700

    IMPALA-13503: CustomClusterTestSuite for whole class
    
    Allow using CustomClusterTestSuite with a single cluster for the whole
    class. This speeds up tests by letting us group together multiple test
    cases on the same cluster configuration and only starting the cluster
    once.
    
    Updates tuple cache tests as an example of how this can be used. Reduces
    test_tuple_cache execution time from 100s to 60s.
    
    Change-Id: I7a08694edcf8cc340d89a0fb33beb8229163b356
    Reviewed-on: http://gerrit.cloudera.org:8080/22006
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Jason Fehr <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/common/custom_cluster_test_suite.py | 266 +++++++++++++++------------
 tests/custom_cluster/test_tuple_cache.py  | 289 +++++++++++++++---------------
 2 files changed, 291 insertions(+), 264 deletions(-)

diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index c3ebb19c2..aa9f0d4e4 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -19,6 +19,7 @@
 # TODO: Configure cluster size and other parameters.
 
 from __future__ import absolute_import, division, print_function
+import inspect
 import logging
 import os
 import os.path
@@ -41,6 +42,8 @@ LOG = logging.getLogger(__name__)
 IMPALA_HOME = os.environ['IMPALA_HOME']
 DEFAULT_CLUSTER_SIZE = 3
 NUM_COORDINATORS = DEFAULT_CLUSTER_SIZE
+# Marks that with_args was specified on a method, as opposed to the whole 
class.
+WITH_ARGS_METHOD = 'with_args_method'
 
 # Additional args passed to respective daemon command line.
 IMPALAD_ARGS = 'impalad_args'
@@ -87,14 +90,20 @@ DEFAULT_STATESTORE_ARGS = 
('--statestore_update_frequency_ms=50 '
 
 
 class CustomClusterTestSuite(ImpalaTestSuite):
-  """Every test in a test suite deriving from this class gets its own Impala 
cluster.
-  Custom arguments may be passed to the cluster by using the @with_args 
decorator."""
+  """Runs tests with a custom Impala cluster. There are two modes:
+  - If a @with_args decorator is specified on the class, all tests in the 
class share a
+    single Impala cluster. @with_args decorators on methods are ignored.
+  - Otherwise every test runs starts a new cluster, and @with_args decorators 
on a test
+    method can be used to customize that cluster."""
 
   # Central place to keep all temporary dirs referred by a custom cluster test 
method.
   # setup_method() will populate this using make_tmp_dir(), and then 
teardown_method()
   # will clean up using clear_tmp_dirs().
   TMP_DIRS = dict()
 
+  # Args for cluster startup/teardown when sharing a single cluster for the 
entire class.
+  SHARED_CLUSTER_ARGS = None
+
   @classmethod
   def get_workload(cls):
     return 'tpch'
@@ -119,22 +128,22 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   @classmethod
   def setup_class(cls):
-    # Explicit override of ImpalaTestSuite.setup_class(). For custom cluster, 
the
-    # ImpalaTestSuite.setup_class() procedure needs to happen on a per-method 
basis.
     # IMPALA-3614: @SkipIfLocal.multiple_impalad workaround
     # IMPALA-2943 TODO: When pytest is upgraded, see if this explicit skip can 
be
     # removed in favor of the class-level SkipifLocal.multiple_impalad 
decorator.
     if IS_LOCAL:
       pytest.skip("multiple impalads needed")
 
+    if cls.SHARED_CLUSTER_ARGS:
+      cls.cluster_setup(cls.SHARED_CLUSTER_ARGS)
+
   @classmethod
   def teardown_class(cls):
-    # Explicit override of ImpalaTestSuite.teardown_class(). For custom 
cluster, the
-    # ImpalaTestSuite.teardown_class() procedure needs to happen on a 
per-method basis.
-    pass
+    if cls.SHARED_CLUSTER_ARGS:
+      cls.cluster_teardown(cls.__name__, cls.SHARED_CLUSTER_ARGS)
 
-  @staticmethod
-  def with_args(impalad_args=None, statestored_args=None, catalogd_args=None,
+  @classmethod
+  def with_args(cls, impalad_args=None, statestored_args=None, 
catalogd_args=None,
       start_args=None, default_query_options=None, jvm_args=None,
       impala_log_dir=None, hive_conf_dir=None, cluster_size=None,
       num_exclusive_coordinators=None, kudu_args=None, 
statestored_timeout_s=None,
@@ -143,65 +152,76 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       expect_startup_fail=False, disable_log_buffering=False):
     """Records arguments to be passed to a cluster by adding them to the 
decorated
     method's func_dict"""
-    def decorate(func):
-      if impalad_args is not None:
-        func.__dict__[IMPALAD_ARGS] = impalad_args
-      if statestored_args is not None:
-        func.__dict__[STATESTORED_ARGS] = statestored_args
-      if catalogd_args is not None:
-        func.__dict__[CATALOGD_ARGS] = catalogd_args
-      if start_args is not None:
-        func.__dict__[START_ARGS] = start_args
-      if jvm_args is not None:
-        func.__dict__[JVM_ARGS] = jvm_args
-      if hive_conf_dir is not None:
-        func.__dict__[HIVE_CONF_DIR] = hive_conf_dir
-      if kudu_args is not None:
-        func.__dict__[KUDU_ARGS] = kudu_args
-      if default_query_options is not None:
-        func.__dict__[DEFAULT_QUERY_OPTIONS] = default_query_options
-      if impala_log_dir is not None:
-        func.__dict__[IMPALA_LOG_DIR] = impala_log_dir
-      if cluster_size is not None:
-        func.__dict__[CLUSTER_SIZE] = cluster_size
-      if num_exclusive_coordinators is not None:
-        func.__dict__[NUM_EXCLUSIVE_COORDINATORS] = num_exclusive_coordinators
-      if statestored_timeout_s is not None:
-        func.__dict__[STATESTORED_TIMEOUT_S] = statestored_timeout_s
-      if impalad_timeout_s is not None:
-        func.__dict__[IMPALAD_TIMEOUT_S] = impalad_timeout_s
-      if expect_cores is not None:
-        func.__dict__[EXPECT_CORES] = expect_cores
-      if reset_ranger:
-        func.__dict__[RESET_RANGER] = True
-      if impalad_graceful_shutdown:
-        func.__dict__[IMPALAD_GRACEFUL_SHUTDOWN] = True
-      if tmp_dir_placeholders:
-        func.__dict__[TMP_DIR_PLACEHOLDERS] = tmp_dir_placeholders
-      if expect_startup_fail:
-        func.__dict__[EXPECT_STARTUP_FAIL] = True
-      if disable_log_buffering:
-        func.__dict__[DISABLE_LOG_BUFFERING] = True
-      return func
+    args = dict()
+    if impalad_args is not None:
+      args[IMPALAD_ARGS] = impalad_args
+    if statestored_args is not None:
+      args[STATESTORED_ARGS] = statestored_args
+    if catalogd_args is not None:
+      args[CATALOGD_ARGS] = catalogd_args
+    if start_args is not None:
+      args[START_ARGS] = start_args
+    if jvm_args is not None:
+      args[JVM_ARGS] = jvm_args
+    if hive_conf_dir is not None:
+      args[HIVE_CONF_DIR] = hive_conf_dir
+    if kudu_args is not None:
+      args[KUDU_ARGS] = kudu_args
+    if default_query_options is not None:
+      args[DEFAULT_QUERY_OPTIONS] = default_query_options
+    if impala_log_dir is not None:
+      args[IMPALA_LOG_DIR] = impala_log_dir
+    if cluster_size is not None:
+      args[CLUSTER_SIZE] = cluster_size
+    if num_exclusive_coordinators is not None:
+      args[NUM_EXCLUSIVE_COORDINATORS] = num_exclusive_coordinators
+    if statestored_timeout_s is not None:
+      args[STATESTORED_TIMEOUT_S] = statestored_timeout_s
+    if impalad_timeout_s is not None:
+      args[IMPALAD_TIMEOUT_S] = impalad_timeout_s
+    if expect_cores is not None:
+      args[EXPECT_CORES] = expect_cores
+    if reset_ranger:
+      args[RESET_RANGER] = True
+    if impalad_graceful_shutdown:
+      args[IMPALAD_GRACEFUL_SHUTDOWN] = True
+    if tmp_dir_placeholders:
+      args[TMP_DIR_PLACEHOLDERS] = tmp_dir_placeholders
+    if expect_startup_fail:
+      args[EXPECT_STARTUP_FAIL] = True
+    if disable_log_buffering:
+      args[DISABLE_LOG_BUFFERING] = True
+
+    def decorate(obj):
+      """If obj is a class, set SHARED_CLUSTER_ARGS for setup/teardown_class. 
Otherwise
+      add to the function __dict__ for setup/teardown_method."""
+      if inspect.isclass(obj):
+        obj.SHARED_CLUSTER_ARGS = args
+      else:
+        obj.__dict__[WITH_ARGS_METHOD] = True
+        obj.__dict__.update(args)
+      return obj
     return decorate
 
-  def make_tmp_dir(self, name):
+  @classmethod
+  def make_tmp_dir(cls, name):
     """Create a temporary directory and register it."""
-    assert name not in self.TMP_DIRS
-    self.TMP_DIRS[name] = make_tmp_test_dir(name)
-    LOG.info("Created temporary dir {}".format(self.TMP_DIRS[name]))
-    return self.TMP_DIRS[name]
+    assert name not in cls.TMP_DIRS
+    cls.TMP_DIRS[name] = make_tmp_test_dir(name)
+    LOG.info("Created temporary dir {}".format(cls.TMP_DIRS[name]))
+    return cls.TMP_DIRS[name]
 
   def get_tmp_dir(self, name):
     """Get the path of temporary directory that was registered with given 
'name'."""
     return self.TMP_DIRS[name]
 
-  def clear_tmp_dirs(self):
+  @classmethod
+  def clear_tmp_dirs(cls):
     """Clear all temporary dirs."""
-    for tmp_dir in self.TMP_DIRS.values():
+    for tmp_dir in cls.TMP_DIRS.values():
       LOG.info("Removing temporary dir {}".format(tmp_dir))
       cleanup_tmp_test_dir(tmp_dir)
-    self.TMP_DIRS.clear()
+    cls.TMP_DIRS.clear()
 
   def clear_tmp_dir(self, name):
     """Clear temporary dir 'name'."""
@@ -210,17 +230,18 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     cleanup_tmp_test_dir(self.TMP_DIRS[name])
     del self.TMP_DIRS[name]
 
-  def setup_method(self, method):
+  @classmethod
+  def cluster_setup(cls, args):
     cluster_args = list()
-    disable_log_buffering = method.__dict__.get(DISABLE_LOG_BUFFERING, False)
-    self._warn_assert_log = not disable_log_buffering
+    disable_log_buffering = args.get(DISABLE_LOG_BUFFERING, False)
+    cls._warn_assert_log = not disable_log_buffering
 
-    if TMP_DIR_PLACEHOLDERS in method.__dict__:
+    if TMP_DIR_PLACEHOLDERS in args:
       # Create all requested temporary dirs.
-      for name in method.__dict__[TMP_DIR_PLACEHOLDERS]:
-        self.make_tmp_dir(name)
+      for name in args[TMP_DIR_PLACEHOLDERS]:
+        cls.make_tmp_dir(name)
 
-    if method.__dict__.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
+    if args.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
       # IMPALA-13051: Add faster default graceful shutdown options before 
processing
       # explicit args. Impala doesn't start graceful shutdown until the grace 
period has
       # passed, and most tests that use graceful shutdown are testing flushing 
the query
@@ -232,36 +253,36 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       val = ''
       if arg in impala_daemons and disable_log_buffering:
         val += '--logbuflevel=-1 '
-      if arg in method.__dict__:
-        val += (method.__dict__[arg] if arg not in ACCEPT_FORMATTING
-               else method.__dict__[arg].format(**self.TMP_DIRS))
+      if arg in args:
+        val += (args[arg] if arg not in ACCEPT_FORMATTING
+               else args[arg].format(**cls.TMP_DIRS))
       if val:
         cluster_args.append("--%s=%s " % (arg, val))
-    if START_ARGS in method.__dict__:
-      cluster_args.extend(method.__dict__[START_ARGS].split())
+    if START_ARGS in args:
+      cluster_args.extend(args[START_ARGS].split())
 
-    if HIVE_CONF_DIR in method.__dict__:
-      self._start_hive_service(method.__dict__[HIVE_CONF_DIR])
+    if HIVE_CONF_DIR in args:
+      cls._start_hive_service(args[HIVE_CONF_DIR])
       # Should let Impala adopt the same hive-site.xml. The only way is to add 
it in the
       # beginning of the CLASSPATH. Because there's already a hive-site.xml in 
the
       # default CLASSPATH (see bin/set-classpath.sh).
       cluster_args.append(
-        '--env_vars=CUSTOM_CLASSPATH=%s ' % method.__dict__[HIVE_CONF_DIR])
+        '--env_vars=CUSTOM_CLASSPATH=%s ' % args[HIVE_CONF_DIR])
 
-    if KUDU_ARGS in method.__dict__:
-      self._restart_kudu_service(method.__dict__[KUDU_ARGS])
+    if KUDU_ARGS in args:
+      cls._restart_kudu_service(args[KUDU_ARGS])
 
-    if RESET_RANGER in method.__dict__:
-      self._reset_ranger_policy_repository()
+    if RESET_RANGER in args:
+      cls._reset_ranger_policy_repository()
 
     cluster_size = DEFAULT_CLUSTER_SIZE
-    if CLUSTER_SIZE in method.__dict__:
-      cluster_size = method.__dict__[CLUSTER_SIZE]
+    if CLUSTER_SIZE in args:
+      cluster_size = args[CLUSTER_SIZE]
 
     use_exclusive_coordinators = False
     num_coordinators = cluster_size
-    if NUM_EXCLUSIVE_COORDINATORS in method.__dict__:
-      num_coordinators = method.__dict__[NUM_EXCLUSIVE_COORDINATORS]
+    if NUM_EXCLUSIVE_COORDINATORS in args:
+      num_coordinators = args[NUM_EXCLUSIVE_COORDINATORS]
       use_exclusive_coordinators = True
 
     # Start a clean new cluster before each test
@@ -269,75 +290,86 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       "cluster_size": cluster_size,
       "num_coordinators": num_coordinators,
       "expected_num_impalads": cluster_size,
-      DEFAULT_QUERY_OPTIONS: method.__dict__.get(DEFAULT_QUERY_OPTIONS),
+      DEFAULT_QUERY_OPTIONS: args.get(DEFAULT_QUERY_OPTIONS),
       "use_exclusive_coordinators": use_exclusive_coordinators
     }
-    if IMPALA_LOG_DIR in method.__dict__:
-      kwargs[IMPALA_LOG_DIR] = 
method.__dict__[IMPALA_LOG_DIR].format(**self.TMP_DIRS)
-    if STATESTORED_TIMEOUT_S in method.__dict__:
-      kwargs[STATESTORED_TIMEOUT_S] = method.__dict__[STATESTORED_TIMEOUT_S]
-    if IMPALAD_TIMEOUT_S in method.__dict__:
-      kwargs[IMPALAD_TIMEOUT_S] = method.__dict__[IMPALAD_TIMEOUT_S]
-
-    if method.__dict__.get(EXPECT_CORES, False):
+    if IMPALA_LOG_DIR in args:
+      kwargs[IMPALA_LOG_DIR] = args[IMPALA_LOG_DIR].format(**cls.TMP_DIRS)
+    if STATESTORED_TIMEOUT_S in args:
+      kwargs[STATESTORED_TIMEOUT_S] = args[STATESTORED_TIMEOUT_S]
+    if IMPALAD_TIMEOUT_S in args:
+      kwargs[IMPALAD_TIMEOUT_S] = args[IMPALAD_TIMEOUT_S]
+
+    if args.get(EXPECT_CORES, False):
       # Make a note of any core files that already exist
       possible_cores = find_all_files('*core*')
-      self.pre_test_cores = set([f for f in possible_cores if is_core_dump(f)])
+      cls.pre_test_cores = set([f for f in possible_cores if is_core_dump(f)])
 
       # Explicitly allow startup to exception, since startup is expected to 
fail
       try:
-        self._start_impala_cluster(cluster_args, **kwargs)
+        cls._start_impala_cluster(cluster_args, **kwargs)
         pytest.fail("cluster startup should have failed")
       except Exception:
-        self._stop_impala_cluster()
+        cls._stop_impala_cluster()
     else:
       try:
-        self._start_impala_cluster(cluster_args, **kwargs)
+        cls._start_impala_cluster(cluster_args, **kwargs)
 
         # Fail test if cluster startup succeeded when it was supposed to fail.
-        assert not method.__dict__.get(EXPECT_STARTUP_FAIL, False), \
+        assert not args.get(EXPECT_STARTUP_FAIL, False), \
             "Expected cluster startup to fail, but startup succeeded."
 
-        super(CustomClusterTestSuite, self).setup_class()
+        super(CustomClusterTestSuite, cls).setup_class()
       except AssertionError as e:
-        if method.__dict__.get(EXPECT_STARTUP_FAIL, False):
+        if args.get(EXPECT_STARTUP_FAIL, False):
           assert e.msg == "num_known_live_backends did not reach expected 
value " \
               "in time", "Unexpected exception: {}".format(e)
         else:
           raise e
       except subprocess.CalledProcessError as e:
-        if method.__dict__.get(EXPECT_STARTUP_FAIL, False):
+        if args.get(EXPECT_STARTUP_FAIL, False):
           assert search(r"returned non-zero exit status", str(e)), \
               "Unexpected exception: {}".format(e)
         else:
           raise e
 
-  def teardown_method(self, method):
-    if method.__dict__.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
-      for impalad in self.cluster.impalads:
+  def setup_method(self, method):
+    if not self.SHARED_CLUSTER_ARGS:
+      self.cluster_setup(method.__dict__)
+    elif method.__dict__.get(WITH_ARGS_METHOD):
+      pytest.fail("Cannot specify with_args on both class and methods")
+
+  @classmethod
+  def cluster_teardown(cls, name, args):
+    if args.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
+      for impalad in cls.cluster.impalads:
         impalad.kill(SIGRTMIN)
-      for impalad in self.cluster.impalads:
+      for impalad in cls.cluster.impalads:
         impalad.wait_for_exit()
 
-    self.clear_tmp_dirs()
+    cls.clear_tmp_dirs()
 
-    if HIVE_CONF_DIR in method.__dict__:
-      self._start_hive_service(None)  # Restart Hive Service using default 
configs
+    if HIVE_CONF_DIR in args:
+      cls._start_hive_service(None)  # Restart Hive Service using default 
configs
 
-    if method.__dict__.get(EXPECT_CORES, False):
+    if args.get(EXPECT_CORES, False):
       # The core dumps expected to be generated by this test should be cleaned 
up
       possible_cores = find_all_files('*core*')
       post_test_cores = set([f for f in possible_cores if is_core_dump(f)])
 
-      for f in (post_test_cores - self.pre_test_cores):
+      for f in (post_test_cores - cls.pre_test_cores):
         LOG.info("Cleaned up {core} created by {name}".format(
-          core=f, name=method.__name__))
+          core=f, name=name))
         os.remove(f)
       # Skip teardown_class as setup was skipped.
-    elif not method.__dict__.get(EXPECT_STARTUP_FAIL, False):
+    elif not args.get(EXPECT_STARTUP_FAIL, False):
       # Skip teardown (which closes all open clients) if a startup failure is 
expected
       # since no clients will have been created.
-      super(CustomClusterTestSuite, self).teardown_class()
+      super(CustomClusterTestSuite, cls).teardown_class()
+
+  def teardown_method(self, method):
+    if not self.SHARED_CLUSTER_ARGS:
+      self.cluster_teardown(method.__name__, method.__dict__)
 
   def wait_for_wm_init_complete(self, timeout_s=120):
     """Waits for the catalog to report the workload management initialization 
process
@@ -353,8 +385,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     check_call([os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'), 
'--kill_only'])
     sleep(2)
 
-  @classmethod
-  def _restart_kudu_service(cls, kudu_args=None):
+  @staticmethod
+  def _restart_kudu_service(kudu_args=None):
     kudu_env = dict(os.environ)
     if kudu_args is not None:
       kudu_env["IMPALA_KUDU_STARTUP_FLAGS"] = kudu_args
@@ -366,8 +398,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if call.returncode != 0:
       raise RuntimeError("Unable to restart Kudu")
 
-  @classmethod
-  def _start_hive_service(cls, hive_conf_dir):
+  @staticmethod
+  def _start_hive_service(hive_conf_dir):
     hive_env = dict(os.environ)
     if hive_conf_dir is not None:
       hive_env['HIVE_CONF_DIR'] = hive_conf_dir
@@ -378,14 +410,14 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if call.returncode != 0:
       raise RuntimeError("Unable to start Hive")
 
-  @classmethod
-  def _stop_hive_service(cls):
+  @staticmethod
+  def _stop_hive_service():
     subprocess.check_call([os.path.join(IMPALA_HOME,
                                         "testdata/bin/kill-hive-server.sh")],
                           close_fds=True)
 
-  @classmethod
-  def _reset_ranger_policy_repository(cls):
+  @staticmethod
+  def _reset_ranger_policy_repository():
     script_kill_ranger = os.path.join(os.environ['IMPALA_HOME'],
                                       'testdata/bin/kill-ranger-server.sh')
     script_run_ranger = os.path.join(os.environ['IMPALA_HOME'],
diff --git a/tests/custom_cluster/test_tuple_cache.py 
b/tests/custom_cluster/test_tuple_cache.py
index eccbe5d1a..76ad6794a 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -140,14 +140,15 @@ class TestTupleCacheBase(CustomClusterTestSuite):
     return impalaservice.get_metric_value('impala.tuple-cache.' + suffix)
 
 
-class TestTupleCache(TestTupleCacheBase):
+class TestTupleCacheOptions(TestTupleCacheBase):
+  """Tests Impala with different tuple cache startup options."""
+
   @classmethod
   def add_test_dimensions(cls):
-    super(TestTupleCache, cls).add_test_dimensions()
+    super(TestTupleCacheOptions, cls).add_test_dimensions()
     add_mandatory_exec_option(cls, 'mt_dop', 1)
 
   @CustomClusterTestSuite.with_args(cluster_size=1)
-  @pytest.mark.execute_serially
   def test_cache_disabled(self, vector, unique_database):
     self.client.set_configuration(vector.get_value('exec_option'))
     fq_table = "{0}.cache_disabled".format(unique_database)
@@ -161,31 +162,9 @@ class TestTupleCache(TestTupleCacheBase):
     assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=1)
     assertCounters(result2.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=1)
 
-  @CustomClusterTestSuite.with_args(
-      start_args=CACHE_START_ARGS, cluster_size=1)
-  @pytest.mark.execute_serially
-  def test_create_and_select(self, vector, unique_database):
-    self.client.set_configuration(vector.get_value('exec_option'))
-    fq_table = "{0}.create_and_select".format(unique_database)
-    self.create_table(fq_table)
-    result1 = self.execute_query("SELECT * from {0}".format(fq_table))
-    result2 = self.execute_query("SELECT * from {0}".format(fq_table))
-
-    assert result1.success
-    assert result2.success
-    assert result1.data == result2.data
-    assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
-    assertCounters(result2.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
-    # Verify that the bytes written by the first profile are the same as the 
bytes
-    # read by the second profile.
-    bytes_written = getCounterValues(result1.runtime_profile, 
"TupleCacheBytesWritten")
-    bytes_read = getCounterValues(result2.runtime_profile, 
"TupleCacheBytesRead")
-    assert sorted(bytes_written) == sorted(bytes_read)
-
   @CustomClusterTestSuite.with_args(
       start_args=CACHE_START_ARGS + " --tuple_cache_capacity=64MB", 
cluster_size=1,
       impalad_args="--cache_force_single_shard")
-  @pytest.mark.execute_serially
   def test_cache_halted_select(self, vector):
     # The cache is set to the minimum cache size, so run a SQL that produces 
enough
     # data to exceed the cache size and halt caching.
@@ -207,7 +186,6 @@ class TestTupleCache(TestTupleCacheBase):
   @CustomClusterTestSuite.with_args(
     start_args=CACHE_START_ARGS, cluster_size=1,
     impalad_args="--tuple_cache_ignore_query_options=true")
-  @pytest.mark.execute_serially
   def test_failpoints(self, vector, unique_database):
     fq_table = "{0}.failpoints".format(unique_database)
     # Scale 20 gets us enough rows to force multiple RowBatches (needed for the
@@ -270,8 +248,123 @@ class TestTupleCache(TestTupleCacheBase):
 
     assert hit_error
 
-  @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
-  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      start_args=CACHE_START_ARGS, cluster_size=1,
+      
impalad_args='--tuple_cache_exempt_query_options=max_errors,exec_time_limit_s')
+  def test_custom_exempt_query_options(self, vector, unique_database):
+    """Custom list of exempt query options share cache entry"""
+    fq_table = "{0}.query_options".format(unique_database)
+    self.create_table(fq_table)
+    query = "SELECT * from {0}".format(fq_table)
+
+    errors_10 = dict(vector.get_value('exec_option'))
+    errors_10['max_errors'] = '10'
+    exec_time_limit = dict(vector.get_value('exec_option'))
+    exec_time_limit['exec_time_limit_s'] = '30'
+
+    exempt1 = self.execute_query(query, query_options=errors_10)
+    exempt2 = self.execute_query(query, query_options=exec_time_limit)
+    exempt3 = self.execute_query(query, 
query_options=vector.get_value('exec_option'))
+    assert exempt1.success
+    assert exempt2.success
+    assert exempt1.data == exempt2.data
+    assert exempt1.data == exempt3.data
+    assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
+    assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
+    assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
+
+
[email protected]_args(start_args=CACHE_START_ARGS, cluster_size=1)
+class TestTupleCacheSingle(TestTupleCacheBase):
+  """Tests Impala with a single executor and mt_dop=1."""
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTupleCacheSingle, cls).add_test_dimensions()
+    add_mandatory_exec_option(cls, 'mt_dop', 1)
+
+  def test_create_and_select(self, vector, unique_database):
+    self.client.set_configuration(vector.get_value('exec_option'))
+    fq_table = "{0}.create_and_select".format(unique_database)
+    self.create_table(fq_table)
+    result1 = self.execute_query("SELECT * from {0}".format(fq_table))
+    result2 = self.execute_query("SELECT * from {0}".format(fq_table))
+
+    assert result1.success
+    assert result2.success
+    assert result1.data == result2.data
+    assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
+    assertCounters(result2.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
+    # Verify that the bytes written by the first profile are the same as the 
bytes
+    # read by the second profile.
+    bytes_written = getCounterValues(result1.runtime_profile, 
"TupleCacheBytesWritten")
+    bytes_read = getCounterValues(result2.runtime_profile, 
"TupleCacheBytesRead")
+    assert sorted(bytes_written) == sorted(bytes_read)
+
+  def test_non_exempt_query_options(self, vector, unique_database):
+    """Non-exempt query options result in different cache entries"""
+    fq_table = "{0}.query_options".format(unique_database)
+    self.create_table(fq_table)
+    query = "SELECT * from {0}".format(fq_table)
+
+    strict_true = dict(vector.get_value('exec_option'))
+    strict_true['strict_mode'] = 'true'
+    strict_false = dict(vector.get_value('exec_option'))
+    strict_false['strict_mode'] = 'false'
+
+    noexempt1 = self.execute_query(query, query_options=strict_false)
+    noexempt2 = self.execute_query(query, query_options=strict_true)
+    noexempt3 = self.execute_query(query, query_options=strict_false)
+    noexempt4 = self.execute_query(query, query_options=strict_true)
+    noexempt5 = self.execute_query(query, 
query_options=vector.get_value('exec_option'))
+
+    assert noexempt1.success
+    assert noexempt2.success
+    assert noexempt3.success
+    assert noexempt4.success
+    assert noexempt5.success
+    assert noexempt1.data == noexempt2.data
+    assert noexempt1.data == noexempt3.data
+    assert noexempt1.data == noexempt4.data
+    assert noexempt1.data == noexempt5.data
+    assertCounters(noexempt1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
+    assertCounters(noexempt2.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
+    assertCounters(noexempt3.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
+    assertCounters(noexempt4.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
+    assertCounters(noexempt5.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
+
+  def test_exempt_query_options(self, vector, unique_database):
+    """Exempt query options share cache entry"""
+    fq_table = "{0}.query_options".format(unique_database)
+    self.create_table(fq_table)
+    query = "SELECT * from {0}".format(fq_table)
+
+    codegen_false = dict(vector.get_value('exec_option'))
+    codegen_false['disable_codegen'] = 'true'
+    codegen_true = dict(vector.get_value('exec_option'))
+    codegen_true['disable_codegen'] = 'false'
+
+    exempt1 = self.execute_query(query, query_options=codegen_true)
+    exempt2 = self.execute_query(query, query_options=codegen_false)
+    exempt3 = self.execute_query(query, 
query_options=vector.get_value('exec_option'))
+    assert exempt1.success
+    assert exempt2.success
+    assert exempt1.data == exempt2.data
+    assert exempt1.data == exempt3.data
+    assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
+    assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
+    assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
+
+
[email protected]_args(start_args=CACHE_START_ARGS)
+class TestTupleCacheCluster(TestTupleCacheBase):
+  """Tests Impala with 3 executors and mt_dop=1."""
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTupleCacheCluster, cls).add_test_dimensions()
+    add_mandatory_exec_option(cls, 'mt_dop', 1)
+
   def test_runtime_filters(self, vector, unique_database):
     """
     This tests that adding files to a table results in different runtime 
filter keys.
@@ -366,8 +459,6 @@ class TestTupleCache(TestTupleCacheBase):
     assert rerun_cache_keys == two_cache_keys
     assert rerun_two_files_result.data == two_files_result.data
 
-  @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
-  @pytest.mark.execute_serially
   def test_runtime_filter_reload(self, vector, unique_database):
     """
     This tests that reloading files to a table results in matching runtime 
filter keys.
@@ -406,91 +497,8 @@ class TestTupleCache(TestTupleCacheBase):
     assert base_cache_keys == reload_cache_keys
     # Skips verifying cache hits as fragments may not be assigned to the same 
nodes.
 
-  @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, 
cluster_size=1)
-  @pytest.mark.execute_serially
-  def test_non_exempt_query_options(self, vector, unique_database):
-    """Non-exempt query options result in different cache entries"""
-    fq_table = "{0}.query_options".format(unique_database)
-    self.create_table(fq_table)
-    query = "SELECT * from {0}".format(fq_table)
-
-    strict_true = dict(vector.get_value('exec_option'))
-    strict_true['strict_mode'] = 'true'
-    strict_false = dict(vector.get_value('exec_option'))
-    strict_false['strict_mode'] = 'false'
-
-    noexempt1 = self.execute_query(query, query_options=strict_false)
-    noexempt2 = self.execute_query(query, query_options=strict_true)
-    noexempt3 = self.execute_query(query, query_options=strict_false)
-    noexempt4 = self.execute_query(query, query_options=strict_true)
-    noexempt5 = self.execute_query(query, 
query_options=vector.get_value('exec_option'))
-
-    assert noexempt1.success
-    assert noexempt2.success
-    assert noexempt3.success
-    assert noexempt4.success
-    assert noexempt5.success
-    assert noexempt1.data == noexempt2.data
-    assert noexempt1.data == noexempt3.data
-    assert noexempt1.data == noexempt4.data
-    assert noexempt1.data == noexempt5.data
-    assertCounters(noexempt1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
-    assertCounters(noexempt2.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
-    assertCounters(noexempt3.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
-    assertCounters(noexempt4.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
-    assertCounters(noexempt5.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
-
-  @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, 
cluster_size=1)
-  @pytest.mark.execute_serially
-  def test_exempt_query_options(self, vector, unique_database):
-    """Exempt query options share cache entry"""
-    fq_table = "{0}.query_options".format(unique_database)
-    self.create_table(fq_table)
-    query = "SELECT * from {0}".format(fq_table)
-
-    codegen_false = dict(vector.get_value('exec_option'))
-    codegen_false['disable_codegen'] = 'true'
-    codegen_true = dict(vector.get_value('exec_option'))
-    codegen_true['disable_codegen'] = 'false'
-
-    exempt1 = self.execute_query(query, query_options=codegen_true)
-    exempt2 = self.execute_query(query, query_options=codegen_false)
-    exempt3 = self.execute_query(query, 
query_options=vector.get_value('exec_option'))
-    assert exempt1.success
-    assert exempt2.success
-    assert exempt1.data == exempt2.data
-    assert exempt1.data == exempt3.data
-    assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
-    assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
-    assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
-
-  @CustomClusterTestSuite.with_args(
-      start_args=CACHE_START_ARGS, cluster_size=1,
-      
impalad_args='--tuple_cache_exempt_query_options=max_errors,exec_time_limit_s')
-  @pytest.mark.execute_serially
-  def test_custom_exempt_query_options(self, vector, unique_database):
-    """Custom list of exempt query options share cache entry"""
-    fq_table = "{0}.query_options".format(unique_database)
-    self.create_table(fq_table)
-    query = "SELECT * from {0}".format(fq_table)
-
-    errors_10 = dict(vector.get_value('exec_option'))
-    errors_10['max_errors'] = '10'
-    exec_time_limit = dict(vector.get_value('exec_option'))
-    exec_time_limit['exec_time_limit_s'] = '30'
-
-    exempt1 = self.execute_query(query, query_options=errors_10)
-    exempt2 = self.execute_query(query, query_options=exec_time_limit)
-    exempt3 = self.execute_query(query, 
query_options=vector.get_value('exec_option'))
-    assert exempt1.success
-    assert exempt2.success
-    assert exempt1.data == exempt2.data
-    assert exempt1.data == exempt3.data
-    assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
-    assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
-    assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
-
 
[email protected]_args(start_args=CACHE_START_ARGS, cluster_size=1)
 class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
   """Simpler tests that run on a single node with mt_dop=0 or mt_dop=1."""
 
@@ -499,9 +507,6 @@ class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
     super(TestTupleCacheRuntimeKeysBasic, cls).add_test_dimensions()
     add_exec_option_dimension(cls, 'mt_dop', [0, 1])
 
-  @CustomClusterTestSuite.with_args(
-    start_args=CACHE_START_ARGS, cluster_size=1)
-  @pytest.mark.execute_serially
   def test_scan_range_basics(self, vector, unique_database):
     """
     This tests that adding/removing files to a table results in different keys.
@@ -574,9 +579,6 @@ class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
     assert rerun_two_files_compile_key == two_files_compile_key
     assert rerun_two_files_result.data == two_files_result.data
 
-  @CustomClusterTestSuite.with_args(
-    start_args=CACHE_START_ARGS, cluster_size=1)
-  @pytest.mark.execute_serially
   def test_scan_range_partitioned(self, vector):
     """
     This tests a basic partitioned case where the query is identical except 
that
@@ -614,15 +616,15 @@ class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
     assert year2010_result.data[0].find("2009") == -1
 
 
-class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
[email protected]_args(start_args=CACHE_START_ARGS)
+class TestTupleCacheFullCluster(TestTupleCacheBase):
+  """Test with 3 executors and a range of mt_dop values."""
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestTupleCacheRuntimeKeys, cls).add_test_dimensions()
+    super(TestTupleCacheFullCluster, cls).add_test_dimensions()
     add_exec_option_dimension(cls, 'mt_dop', [0, 1, 2])
 
-  @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
-  @pytest.mark.execute_serially
   def test_scan_range_distributed(self, vector, unique_database):
     """
     This tests the distributed case where there are multiple fragment instances
@@ -635,6 +637,10 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     fq_table = "{0}.scan_range_distributed".format(unique_database)
     query = "SELECT * from {0}".format(fq_table)
 
+    entries_baseline = {
+      impalad: self.get_tuple_cache_metric(impalad.service, "entries-in-use")
+      for impalad in self.cluster.impalads}
+
     # Create a table with several files so that we always have enough work for 
multiple
     # fragment instances
     self.create_table(fq_table, scale=20)
@@ -652,11 +658,11 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     # Every cache key has the same compile key
     unique_compile_keys = set([key.split("_")[0] for key in unique_cache_keys])
     assert len(unique_compile_keys) == 1
-    # Verify the cache metrics for each impalad. Since we started from 
scratch, the
-    # number of entries in the cache should be the same as the number of cache 
keys.
+    # Verify the cache metrics for each impalad. Determine number of new cache 
entries,
+    # which should be the same as the number of cache keys.
     for impalad in self.cluster.impalads:
       entries_in_use = self.get_tuple_cache_metric(impalad.service, 
"entries-in-use")
-      assert entries_in_use == max(mt_dop, 1)
+      assert entries_in_use - entries_baseline[impalad] == max(mt_dop, 1)
     assert_deterministic_scan(vector, before_result.runtime_profile)
 
     # Insert another row, which creates a file / scan range
@@ -690,10 +696,11 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     all_cache_keys = unique_cache_keys.union(after_insert_unique_cache_keys)
     total_entries_in_use = 0
     for impalad in self.cluster.impalads:
-      entries_in_use = self.get_tuple_cache_metric(impalad.service, 
"entries-in-use")
-      assert entries_in_use >= max(mt_dop, 1)
-      assert entries_in_use <= (2 * max(mt_dop, 1))
-      total_entries_in_use += entries_in_use
+      new_entries_in_use = self.get_tuple_cache_metric(impalad.service, 
"entries-in-use")
+      new_entries_in_use -= entries_baseline[impalad]
+      assert new_entries_in_use >= max(mt_dop, 1)
+      assert new_entries_in_use <= (2 * max(mt_dop, 1))
+      total_entries_in_use += new_entries_in_use
     assert total_entries_in_use >= len(all_cache_keys)
     assert_deterministic_scan(vector, after_insert_result.runtime_profile)
 
@@ -714,16 +721,15 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     assert len(different_rows) == 1
 
 
-class TestTupleCacheCountStar(TestTupleCacheBase):
[email protected]_args(start_args=CACHE_START_ARGS, cluster_size=1)
+class TestTupleCacheMtdop(TestTupleCacheBase):
+  """Test with single executor and mt_dop=0 or 2."""
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestTupleCacheCountStar, cls).add_test_dimensions()
+    super(TestTupleCacheMtdop, cls).add_test_dimensions()
     add_exec_option_dimension(cls, 'mt_dop', [0, 2])
 
-  @CustomClusterTestSuite.with_args(
-    start_args=CACHE_START_ARGS, cluster_size=1)
-  @pytest.mark.execute_serially
   def test_tuple_cache_count_star(self, vector, unique_database):
     """
     This test is a regression test for IMPALA-13411 to see whether it hits
@@ -741,17 +747,6 @@ class TestTupleCacheCountStar(TestTupleCacheBase):
     result2 = self.execute_query(query)
     assert result1.success and result2.success
 
-
-class TestTupleCacheComputeStats(TestTupleCacheBase):
-
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestTupleCacheComputeStats, cls).add_test_dimensions()
-    add_exec_option_dimension(cls, 'mt_dop', [0, 2])
-
-  @CustomClusterTestSuite.with_args(
-    start_args=CACHE_START_ARGS, cluster_size=1)
-  @pytest.mark.execute_serially
   def test_tuple_cache_key_with_stats(self, vector, unique_database):
     """
     This test verifies if compute stats affect the tuple cache key.


Reply via email to