This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9c87cf41bf326f6031605713c7eabf4c12255cf4
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Sep 18 20:49:38 2024 -0700

    IMPALA-13396: Unify tmp dir management in CustomClusterTestSuite
    
    There are many custom cluster tests that require creating temporary
    directory. The temporary directory typically live within a scope of test
    method and cleaned afterwards. However, some test do create temporary
    directory directly and forgot to clean them afterwards, leaving junk
    dirs under /tmp/ or $LOG_DIR.
    
    This patch unify the temporary directory management inside
    CustomClusterTestSuite. It introduce new 'tmp_dir_placeholders' arg in
    CustomClusterTestSuite.with_args() that list tmp dirs to create.
    'impalad_args', 'catalogd_args', and 'impala_log_dir' now accept
    formatting pattern that is replaceable by a temporary dir path, defined
    through 'tmp_dir_placeholders'.
    
    There are few occurrences where mkdtemp is called and not replaceable by
    this work, such as tests/comparison/cluster.py. In that case, this patch
    change them to supply prefix arg so that developer knows that it comes
    from Impala test script.
    
    This patch also addressed several flake8 errors in modified files.
    
    Testing:
    - Pass custom cluster tests in exhaustive mode.
    - Manually run few modified tests and observe that the temporary dirs
      are created and removed under logs/custom_cluster_tests/ as the tests
      go.
    
    Change-Id: I8dd665e8028b3f03e5e33d572c5e188f85c3bdf5
    Reviewed-on: http://gerrit.cloudera.org:8080/21836
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/authorization/test_authorization.py          | 20 ++---
 tests/authorization/test_authorized_proxy.py       | 57 ++++++-------
 tests/authorization/test_provider.py               | 23 +++--
 tests/common/custom_cluster_test_suite.py          | 95 ++++++++++++++++-----
 tests/common/file_utils.py                         | 18 ++++
 tests/comparison/cluster.py                        | 20 +++--
 tests/custom_cluster/test_admission_controller.py  | 29 ++++---
 tests/custom_cluster/test_blacklist.py             | 28 +++----
 tests/custom_cluster/test_breakpad.py              | 10 +--
 tests/custom_cluster/test_kudu.py                  | 33 ++++----
 tests/custom_cluster/test_lineage.py               | 76 +++++++++--------
 tests/custom_cluster/test_permanent_udfs.py        | 97 +++++++++++-----------
 tests/custom_cluster/test_query_event_hooks.py     | 39 ++++-----
 tests/custom_cluster/test_query_log.py             | 27 +++---
 tests/custom_cluster/test_query_retries.py         | 42 +++++-----
 tests/custom_cluster/test_redaction.py             | 31 ++-----
 tests/custom_cluster/test_scratch_disk.py          | 70 ++++++++--------
 tests/custom_cluster/test_shell_commandline.py     | 21 +++--
 tests/custom_cluster/test_shell_jwt_auth.py        | 30 +++----
 .../test_startup_filesystem_checks.py              | 61 ++++++++------
 20 files changed, 451 insertions(+), 376 deletions(-)

diff --git a/tests/authorization/test_authorization.py 
b/tests/authorization/test_authorization.py
index a0147e6f9..969562968 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -18,17 +18,10 @@
 # Client tests for SQL statement authorization
 
 from __future__ import absolute_import, division, print_function
-import os
 import pytest
-import tempfile
-import grp
-import re
 import random
-import sys
-import subprocess
 import threading
 import time
-import urllib
 
 from getpass import getuser
 from ImpalaService import ImpalaHiveServer2Service
@@ -37,15 +30,13 @@ from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport
 from thrift.protocol import TBinaryProtocol
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.file_utils import assert_file_in_dir_contains,\
-    assert_no_files_in_dir_contain
+from tests.common.file_utils import assert_file_in_dir_contains
 from tests.common.test_result_verifier import error_msg_equal
-from tests.common.skip import SkipIf
-
 
 PRIVILEGES = ['all', 'alter', 'drop', 'insert', 'refresh', 'select']
 ADMIN = "admin"
 
+
 class TestAuthorization(CustomClusterTestSuite):
   def setup(self):
     host, port = (self.cluster.impalads[0].service.hostname,
@@ -96,10 +87,9 @@ class TestAuthorization(CustomClusterTestSuite):
     return resp
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--server_name=server1\
-      --authorization_policy_file=ignored_file",
-      impala_log_dir=tempfile.mkdtemp(prefix="test_deprecated_",
-      dir=os.getenv("LOG_DIR")))
+  @CustomClusterTestSuite.with_args(
+      "--server_name=server1 --authorization_policy_file=ignored_file",
+      impala_log_dir="{deprecated_flags}", 
tmp_dir_placeholders=['deprecated_flags'])
   def test_deprecated_flags(self):
     assert_file_in_dir_contains(self.impala_log_dir, "Ignoring removed flag "
                                                      
"authorization_policy_file")
diff --git a/tests/authorization/test_authorized_proxy.py 
b/tests/authorization/test_authorized_proxy.py
index 68053a840..20005139b 100644
--- a/tests/authorization/test_authorized_proxy.py
+++ b/tests/authorization/test_authorized_proxy.py
@@ -18,39 +18,35 @@
 from __future__ import absolute_import, division, print_function
 import pytest
 import os
-import grp
 import time
 import json
-import tempfile
-import shutil
 
-from getpass import getuser
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
 from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport
 from thrift.protocol import TBinaryProtocol
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIf
 from tests.hs2.hs2_test_suite import operation_id_to_query_id
 
-AUDIT_LOG_DIR = tempfile.mkdtemp(dir=os.getenv("LOG_DIR"))
-
-RANGER_IMPALAD_ARGS = "--server-name=server1 " \
-                      "--ranger_service_type=hive " \
-                      "--ranger_app_id=impala " \
-                      "--authorization_provider=ranger " \
-                      "--abort_on_failed_audit_event=false " \
-                      "--audit_event_log_dir={0}".format(AUDIT_LOG_DIR)
-RANGER_CATALOGD_ARGS = "--server-name=server1 " \
-                       "--ranger_service_type=hive " \
-                       "--ranger_app_id=impala " \
-                       "--authorization_provider=ranger"
+AUDIT_LOG_DIR = 'audit_log_dir'
+
+RANGER_IMPALAD_ARGS = ("--server-name=server1 "
+                       "--ranger_service_type=hive "
+                       "--ranger_app_id=impala "
+                       "--authorization_provider=ranger "
+                       "--abort_on_failed_audit_event=false "
+                       "--audit_event_log_dir={" + AUDIT_LOG_DIR + "}")
+RANGER_CATALOGD_ARGS = ("--server-name=server1 "
+                        "--ranger_service_type=hive "
+                        "--ranger_app_id=impala "
+                        "--authorization_provider=ranger")
 RANGER_ADMIN_USER = "admin"
 
 
 class TestAuthorizedProxy(CustomClusterTestSuite):
-  def setup(self):
+  def setup_method(self, method):
+    super(TestAuthorizedProxy, self).setup_method(method)
     host, port = (self.cluster.impalads[0].service.hostname,
                   self.cluster.impalads[0].service.hs2_port)
     self.socket = TSocket(host, port)
@@ -59,10 +55,10 @@ class TestAuthorizedProxy(CustomClusterTestSuite):
     self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
     self.hs2_client = ImpalaHiveServer2Service.Client(self.protocol)
 
-  def teardown(self):
+  def teardown_method(self, method):
+    super(TestAuthorizedProxy, self).teardown_method(method)
     if self.socket:
       self.socket.close()
-    shutil.rmtree(AUDIT_LOG_DIR, ignore_errors=True)
 
   def _execute_hs2_stmt(self, statement, verify=True):
     """
@@ -103,7 +99,8 @@ class TestAuthorizedProxy(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args="{0} --authorized_proxy_user_config=foo=bar;hue=non_owner "
                  .format(RANGER_IMPALAD_ARGS),
-    catalogd_args=RANGER_CATALOGD_ARGS)
+    catalogd_args=RANGER_CATALOGD_ARGS,
+    tmp_dir_placeholders=[AUDIT_LOG_DIR])
   def test_authorized_proxy_user_with_ranger(self):
     """Tests authorized proxy user with Ranger using HS2."""
     self._test_authorized_proxy_with_ranger(self._test_authorized_proxy, 
"non_owner",
@@ -115,7 +112,8 @@ class TestAuthorizedProxy(CustomClusterTestSuite):
                  "--authorized_proxy_group_config=foo=bar;hue=non_owner "
                  "--use_customized_user_groups_mapper_for_ranger"
                  .format(RANGER_IMPALAD_ARGS),
-    catalogd_args=RANGER_CATALOGD_ARGS)
+    catalogd_args=RANGER_CATALOGD_ARGS,
+    tmp_dir_placeholders=[AUDIT_LOG_DIR])
   def test_authorized_proxy_group_with_ranger(self):
     """Tests authorized proxy group with Ranger using HS2."""
     self._test_authorized_proxy_with_ranger(self._test_authorized_proxy, 
"non_owner",
@@ -125,7 +123,8 @@ class TestAuthorizedProxy(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args="{0} --authorized_proxy_user_config=foo=bar "
                  
"--authorized_proxy_group_config=foo=bar".format(RANGER_IMPALAD_ARGS),
-    catalogd_args=RANGER_CATALOGD_ARGS)
+    catalogd_args=RANGER_CATALOGD_ARGS,
+    tmp_dir_placeholders=[AUDIT_LOG_DIR])
   def test_no_matching_user_and_group_authorized_proxy_with_ranger(self):
     self._test_no_matching_user_and_group_authorized_proxy()
 
@@ -137,8 +136,8 @@ class TestAuthorizedProxy(CustomClusterTestSuite):
     resp = self.hs2_client.OpenSession(open_session_req)
     assert "User 'hue' is not authorized to delegate to 'abc'" in str(resp)
 
-  def _test_authorized_proxy_with_ranger(self, test_func, delegated_user,
-                                         delegated_to_group):
+  def _test_authorized_proxy_with_ranger(
+      self, test_func, delegated_user, delegated_to_group):
     try:
       self.session_handle = self._open_hs2(RANGER_ADMIN_USER, 
dict()).sessionHandle
       if not delegated_to_group:
@@ -217,14 +216,16 @@ class TestAuthorizedProxy(CustomClusterTestSuite):
     # found.
     start_time = time.time()
     while time.time() - start_time < timeout_secs:
-      for audit_file_name in os.listdir(AUDIT_LOG_DIR):
-        if self._find_matching_audit_record(audit_file_name, user, 
impersonator):
+      for audit_file_name in os.listdir(self.get_tmp_dir(AUDIT_LOG_DIR)):
+        if self._find_matching_audit_record(
+          audit_file_name, user, impersonator):
           return True
       time.sleep(1)
     return False
 
   def _find_matching_audit_record(self, audit_file_name, user, impersonator):
-    with open(os.path.join(AUDIT_LOG_DIR, audit_file_name)) as audit_log_file:
+    with open(
+      os.path.join(self.get_tmp_dir(AUDIT_LOG_DIR), audit_file_name)) as 
audit_log_file:
       for line in audit_log_file.readlines():
         json_dict = json.loads(line)
         if len(json_dict) == 0: continue
diff --git a/tests/authorization/test_provider.py 
b/tests/authorization/test_provider.py
index 4d07e7082..5800259cc 100644
--- a/tests/authorization/test_provider.py
+++ b/tests/authorization/test_provider.py
@@ -19,8 +19,6 @@
 
 from __future__ import absolute_import, division, print_function
 import pytest
-import os
-import tempfile
 
 from tests.common.file_utils import assert_file_in_dir_contains
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -36,28 +34,29 @@ class TestAuthorizationProvider(CustomClusterTestSuite):
   """
 
   BAD_FLAG = "foobar"
-  LOG_DIR = tempfile.mkdtemp(prefix="test_provider_", dir=os.getenv("LOG_DIR"))
-  MINIDUMP_PATH = tempfile.mkdtemp()
+  LOG_DIR = "invalid_provider_flag"
+  MINIDUMP_PATH = "invalid_provider_flag_minidump"
 
   pre_test_cores = None
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       expect_cores=True,
-      impala_log_dir=LOG_DIR,
-      impalad_args="--minidump_path={0} "
+      impala_log_dir="{" + LOG_DIR + "}",
+      impalad_args="--minidump_path={" + MINIDUMP_PATH + "} "
                    "--server-name=server1 "
                    "--ranger_service_type=hive "
                    "--ranger_app_id=impala "
-                   "--authorization_provider={1}".format(MINIDUMP_PATH, 
BAD_FLAG),
-      catalogd_args="--minidump_path={0} "
+                   "--authorization_provider=" + BAD_FLAG,
+      catalogd_args="--minidump_path={" + MINIDUMP_PATH + "} "
                     "--server-name=server1 "
                     "--ranger_service_type=hive "
                     "--ranger_app_id=impala "
-                    "--authorization_provider={1}".format(MINIDUMP_PATH, 
BAD_FLAG))
-  def test_invalid_provider_flag(self, unique_name):
+                    "--authorization_provider=" + BAD_FLAG,
+      tmp_dir_placeholders=[LOG_DIR, MINIDUMP_PATH])
+  def test_invalid_provider_flag(self):
     # parse log file for expected exception
-    assert_file_in_dir_contains(TestAuthorizationProvider.LOG_DIR,
+    assert_file_in_dir_contains(self.get_tmp_dir(self.LOG_DIR),
                                 "InternalException: Could not parse "
                                 "authorization_provider flag: {0}"
-                                .format(TestAuthorizationProvider.BAD_FLAG))
+                                .format(self.BAD_FLAG))
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index 50005a417..40bccbd23 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -25,14 +25,18 @@ import os.path
 import pipes
 import pytest
 import subprocess
+
 from impala_py_lib.helpers import find_all_files, is_core_dump
 from signal import SIGRTMIN
 from subprocess import check_call
+from tests.common.file_utils import cleanup_tmp_test_dir, make_tmp_test_dir
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.impala_cluster import ImpalaCluster
 from tests.util.filesystem_utils import IS_LOCAL
 from time import sleep
 
+LOG = logging.getLogger(__name__)
+
 IMPALA_HOME = os.environ['IMPALA_HOME']
 DEFAULT_CLUSTER_SIZE = 3
 NUM_COORDINATORS = DEFAULT_CLUSTER_SIZE
@@ -63,16 +67,27 @@ RESET_RANGER = 'reset_ranger'
 # this to `True` causes Impalad processes to be sent the SIGRTMIN signal at 
the end of the
 # test which runs the Impalad shutdown steps instead of abruptly ending the 
process.
 IMPALAD_GRACEFUL_SHUTDOWN = 'impalad_graceful_shutdown'
+# Decorator key to support temporary dir creation.
+TMP_DIR_PLACEHOLDERS = 'tmp_dir_placeholders'
+
+# Args that accept additional formatting to supply temporary dir path.
+ACCEPT_FORMATTING = set([IMPALAD_ARGS, CATALOGD_ARGS, IMPALA_LOG_DIR])
 
 # Run with fast topic updates by default to reduce time to first query running.
-DEFAULT_STATESTORE_ARGS = '--statestore_update_frequency_ms=50 \
-    --statestore_priority_update_frequency_ms=50 \
-    --statestore_heartbeat_frequency_ms=50'
+DEFAULT_STATESTORE_ARGS = ('--statestore_update_frequency_ms=50 '
+                           '--statestore_priority_update_frequency_ms=50 '
+                           '--statestore_heartbeat_frequency_ms=50')
 
 
 class CustomClusterTestSuite(ImpalaTestSuite):
   """Every test in a test suite deriving from this class gets its own Impala 
cluster.
   Custom arguments may be passed to the cluster by using the @with_args 
decorator."""
+
+  # Central place to keep all temporary dirs referred by a custom cluster test 
method.
+  # setup_method() will populate this using make_tmp_dir(), and then 
teardown_method()
+  # will clean up using clear_tmp_dirs().
+  TMP_DIRS = dict()
+
   @classmethod
   def get_workload(cls):
     return 'tpch'
@@ -88,12 +103,12 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     # By default, custom cluster tests only run on text/none and with a 
limited set of
     # exec options. Subclasses may override this to relax these default 
constraints.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'text' and
-        v.get_value('table_format').compression_codec == 'none')
+        v.get_value('table_format').file_format == 'text'
+        and v.get_value('table_format').compression_codec == 'none')
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('exec_option')['batch_size'] == 0 and
-        v.get_value('exec_option')['disable_codegen'] == False and
-        v.get_value('exec_option')['num_nodes'] == 0)
+        v.get_value('exec_option')['batch_size'] == 0
+        and v.get_value('exec_option')['disable_codegen'] is False
+        and v.get_value('exec_option')['num_nodes'] == 0)
 
   @classmethod
   def setup_class(cls):
@@ -117,17 +132,18 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       impala_log_dir=None, hive_conf_dir=None, cluster_size=None,
       num_exclusive_coordinators=None, kudu_args=None, 
statestored_timeout_s=None,
       impalad_timeout_s=None, expect_cores=None, reset_ranger=False,
-      impalad_graceful_shutdown=False):
+      impalad_graceful_shutdown=False, tmp_dir_placeholders=[]):
     """Records arguments to be passed to a cluster by adding them to the 
decorated
     method's func_dict"""
     def decorate(func):
       if impalad_args is not None:
         func.__dict__[IMPALAD_ARGS] = impalad_args
-      func.__dict__[STATESTORED_ARGS] = statestored_args
+      if statestored_args is not None:
+        func.__dict__[STATESTORED_ARGS] = statestored_args
       if catalogd_args is not None:
         func.__dict__[CATALOGD_ARGS] = catalogd_args
       if start_args is not None:
-        func.__dict__[START_ARGS] = start_args.split()
+        func.__dict__[START_ARGS] = start_args
       if jvm_args is not None:
         func.__dict__[JVM_ARGS] = jvm_args
       if hive_conf_dir is not None:
@@ -152,11 +168,44 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         func.__dict__[RESET_RANGER] = True
       if impalad_graceful_shutdown is not False:
         func.__dict__[IMPALAD_GRACEFUL_SHUTDOWN] = True
+      if tmp_dir_placeholders:
+        func.__dict__[TMP_DIR_PLACEHOLDERS] = tmp_dir_placeholders
       return func
     return decorate
 
+  def make_tmp_dir(self, name):
+    """Create a temporary directory and register it."""
+    assert name not in self.TMP_DIRS
+    self.TMP_DIRS[name] = make_tmp_test_dir(name)
+    LOG.info("Created temporary dir {}".format(self.TMP_DIRS[name]))
+    return self.TMP_DIRS[name]
+
+  def get_tmp_dir(self, name):
+    """Get the path of temporary directory that was registered with given 
'name'."""
+    return self.TMP_DIRS[name]
+
+  def clear_tmp_dirs(self):
+    """Clear all temporary dirs."""
+    for tmp_dir in self.TMP_DIRS.values():
+      LOG.info("Removing temporary dir {}".format(tmp_dir))
+      cleanup_tmp_test_dir(tmp_dir)
+    self.TMP_DIRS.clear()
+
+  def clear_tmp_dir(self, name):
+    """Clear temporary dir 'name'."""
+    assert name in self.TMP_DIRS
+    LOG.info("Removing temporary dir {}".format(self.TMP_DIRS[name]))
+    cleanup_tmp_test_dir(self.TMP_DIRS[name])
+    del self.TMP_DIRS[name]
+
   def setup_method(self, method):
     cluster_args = list()
+
+    if TMP_DIR_PLACEHOLDERS in method.__dict__:
+      # Create all requested temporary dirs.
+      for name in method.__dict__[TMP_DIR_PLACEHOLDERS]:
+        self.make_tmp_dir(name)
+
     if method.__dict__.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
       # IMPALA-13051: Add faster default graceful shutdown options before 
processing
       # explicit args. Impala doesn't start graceful shutdown until the grace 
period has
@@ -166,9 +215,11 @@ class CustomClusterTestSuite(ImpalaTestSuite):
           "--impalad=--shutdown_grace_period_s=0 --shutdown_deadline_s=15")
     for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS, 
ADMISSIOND_ARGS, JVM_ARGS]:
       if arg in method.__dict__:
-        cluster_args.append("--%s=%s " % (arg, method.__dict__[arg]))
+        val = (method.__dict__[arg] if arg not in ACCEPT_FORMATTING
+               else method.__dict__[arg].format(**self.TMP_DIRS))
+        cluster_args.append("--%s=%s " % (arg, val))
     if START_ARGS in method.__dict__:
-      cluster_args.extend(method.__dict__[START_ARGS])
+      cluster_args.extend(method.__dict__[START_ARGS].split())
 
     if HIVE_CONF_DIR in method.__dict__:
       self._start_hive_service(method.__dict__[HIVE_CONF_DIR])
@@ -199,15 +250,15 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       "cluster_size": cluster_size,
       "num_coordinators": num_coordinators,
       "expected_num_impalads": cluster_size,
-      "default_query_options": method.__dict__.get(DEFAULT_QUERY_OPTIONS),
+      DEFAULT_QUERY_OPTIONS: method.__dict__.get(DEFAULT_QUERY_OPTIONS),
       "use_exclusive_coordinators": use_exclusive_coordinators
     }
     if IMPALA_LOG_DIR in method.__dict__:
-      kwargs["impala_log_dir"] = method.__dict__[IMPALA_LOG_DIR]
+      kwargs[IMPALA_LOG_DIR] = 
method.__dict__[IMPALA_LOG_DIR].format(**self.TMP_DIRS)
     if STATESTORED_TIMEOUT_S in method.__dict__:
-      kwargs["statestored_timeout_s"] = method.__dict__[STATESTORED_TIMEOUT_S]
+      kwargs[STATESTORED_TIMEOUT_S] = method.__dict__[STATESTORED_TIMEOUT_S]
     if IMPALAD_TIMEOUT_S in method.__dict__:
-      kwargs["impalad_timeout_s"] = method.__dict__[IMPALAD_TIMEOUT_S]
+      kwargs[IMPALAD_TIMEOUT_S] = method.__dict__[IMPALAD_TIMEOUT_S]
 
     if method.__dict__.get(EXPECT_CORES, False):
       # Make a note of any core files that already exist
@@ -231,6 +282,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       for impalad in self.cluster.impalads:
         impalad.wait_for_exit()
 
+    self.clear_tmp_dirs()
+
     if HIVE_CONF_DIR in method.__dict__:
       self._start_hive_service(None)  # Restart Hive Service using default 
configs
 
@@ -240,8 +293,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       post_test_cores = set([f for f in possible_cores if is_core_dump(f)])
 
       for f in (post_test_cores - self.pre_test_cores):
-        logging.info(
-            "Cleaned up {core} created by {name}".format(core=f, 
name=method.__name__))
+        LOG.info("Cleaned up {core} created by {name}".format(
+          core=f, name=method.__name__))
         os.remove(f)
       # Skip teardown_class as setup was skipped.
     else:
@@ -356,8 +409,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     options.append("--impalad_args=--default_query_options={0}".format(
         ','.join(["{0}={1}".format(k, v) for k, v in 
default_query_option_kvs])))
 
-    logging.info("Starting cluster with command: %s" %
-                 " ".join(pipes.quote(arg) for arg in cmd + options))
+    LOG.info("Starting cluster with command: %s" %
+        " ".join(pipes.quote(arg) for arg in cmd + options))
     try:
       check_call(cmd + options, close_fds=True)
     finally:
diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py
index 9850c3270..f3c4e478f 100644
--- a/tests/common/file_utils.py
+++ b/tests/common/file_utils.py
@@ -22,6 +22,7 @@
 from __future__ import absolute_import, division, print_function
 import os
 import re
+import shutil
 import tempfile
 from subprocess import check_call
 
@@ -71,6 +72,7 @@ def create_iceberg_table_from_directory(impala_client, 
unique_database, table_na
   impala_client.execute("""alter table {0} set tblproperties 
('external.table.purge'=
                         'True');""".format(qualified_table_name))
 
+
 def create_table_from_parquet(impala_client, unique_database, table_name):
   """Utility function to create a database table from a Parquet file. A 
Parquet file must
   exist in $IMPALA_HOME/testdata/data with the name 'table_name'.parquet"""
@@ -176,3 +178,19 @@ def assert_no_files_in_dir_contain(dir, search):
   assert not results, \
       "%s should not have any file containing '%s' but a file was found" \
       % (dir, search)
+
+
+def make_tmp_test_dir(name):
+  """Create temporary directory with prefix 'impala_test_<name>_'.
+  Return the path of temporary directory as string. Caller is responsible to
+  clean them. If LOG_DIR env var exist, the temporary dir will be placed inside
+  LOG_DIR."""
+  # TODO: Consider using tempfile.TemporaryDirectory from python3 in the 
future.
+  parent_dir = os.getenv("LOG_DIR", None)
+  return tempfile.mkdtemp(prefix='impala_test_{}_'.format(name), 
dir=parent_dir)
+
+
+def cleanup_tmp_test_dir(dir_path):
+  """Remove temporary 'dir_path' and its content.
+  Ignore errors upon deletion."""
+  shutil.rmtree(dir_path, ignore_errors=True)
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index dc1daf8d6..8a7c623cb 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -201,7 +201,7 @@ class MiniCluster(Cluster):
     return local_shell(cmd, timeout_secs=timeout_secs)
 
   def _init_local_hadoop_conf_dir(self):
-    self._local_hadoop_conf_dir = mkdtemp()
+    self._local_hadoop_conf_dir = mkdtemp(prefix='impala_mini_cluster_')
 
     node_conf_dir = self._get_node_conf_dir()
     for file_name in os.listdir(node_conf_dir):
@@ -232,6 +232,7 @@ class MiniCluster(Cluster):
                 for p in range(self.num_impalads)]
     self._impala = Impala(self, impalads)
 
+
 class MiniHiveCluster(MiniCluster):
   """
   A MiniCluster useful for running against Hive. It allows Hadoop 
configuration files
@@ -324,7 +325,7 @@ class CmCluster(Cluster):
           self._ssh_clients_by_host_name[host_name].append(client)
 
   def _init_local_hadoop_conf_dir(self):
-    self._local_hadoop_conf_dir = mkdtemp()
+    self._local_hadoop_conf_dir = mkdtemp(prefix='impala_mini_hive_cluster_')
     data = BytesIO(self.cm.get("/clusters/%s/services/%s/clientConfig"
       % (self.cm_cluster.name, self._find_service("HIVE").name)))
     zip_file = ZipFile(data)
@@ -430,7 +431,7 @@ class HdfsClient(object):
     s.verify = False
     if use_kerberos:
       try:
-        from hdfs.ext.kerberos import KerberosClient
+        self.init_kerberos_client(url, s)
       except ImportError as e:
         if "No module named requests_kerberos" not in str(e):
           raise e
@@ -444,15 +445,18 @@ class HdfsClient(object):
                       stdout=subprocess.PIPE,
                       stderr=subprocess.STDOUT)
           LOG.info("kerberos installation complete.")
+          self.init_kerberos_client(url, s)
         except Exception as e:
           LOG.error("kerberos installation failed. Try installing libkrb5-dev 
and"
               " then try again.")
           raise e
-      from hdfs.ext.kerberos import KerberosClient
-      self._client = KerberosClient(url, session=s)
     else:
       self._client = hdfs.client.InsecureClient(url, user=user_name, session=s)
 
+  def init_kerberos_client(self, url, session):
+    from hdfs.ext.kerberos import KerberosClient
+    self._client = KerberosClient(url, session=session)
+
   def __getattr__(self, name):
     return getattr(self._client, name)
 
@@ -487,8 +491,8 @@ class Yarn(Service):
     """
     env = dict(os.environ)
     env['HADOOP_CONF_DIR'] = self.cluster.local_hadoop_conf_dir
-    env['CDH_MR2_HOME'] =  os.environ['HADOOP_HOME']
-    env['HADOOP_USER_NAME'] =  self.cluster.hadoop_user_name
+    env['CDH_MR2_HOME'] = os.environ['HADOOP_HOME']
+    env['HADOOP_USER_NAME'] = self.cluster.hadoop_user_name
     local_shell('hadoop jar %s %s' % (jar_path, job_args), 
stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT, env=env)
 
@@ -835,7 +839,7 @@ class Impalad(with_metaclass(ABCMeta, object)):
     for metric in self.get_metrics():
       if metric["name"] == name:
         return metric
-    raise Exception("Metric '%s' not found" % name);
+    raise Exception("Metric '%s' not found" % name)
 
   def __repr__(self):
     return "<%s host: %s>" % (type(self).__name__, self.label)
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index cd0d1fcd2..dd891c998 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -33,7 +33,11 @@ from time import sleep, time
 from beeswaxd.BeeswaxService import QueryState
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.custom_cluster_test_suite import (
+    ADMISSIOND_ARGS,
+    IMPALAD_ARGS,
+    START_ARGS,
+    CustomClusterTestSuite)
 from tests.common.environ import build_flavor_timeout, 
ImpalaTestClusterProperties
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.resource_pool_config import ResourcePoolConfig
@@ -1632,11 +1636,12 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip('runs only in exhaustive')
 
-    if 'start_args' not in method.__dict__:
-      method.__dict__['start_args'] = list()
-    method.__dict__["start_args"].append("--enable_admission_service")
-    if "impalad_args" in method.__dict__:
-      method.__dict__["admissiond_args"] = method.__dict__["impalad_args"]
+    start_args = "--enable_admission_service"
+    if START_ARGS in method.__dict__:
+      start_args = method.__dict__[START_ARGS] + " " + start_args
+    method.__dict__[START_ARGS] = start_args
+    if IMPALAD_ARGS in method.__dict__:
+      method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
     super(TestAdmissionController, self).setup_method(method)
 
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
@@ -2396,9 +2401,11 @@ class 
TestAdmissionControllerStressWithACService(TestAdmissionControllerStress):
   def setup_method(self, method):
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip('runs only in exhaustive')
-    if 'start_args' not in method.__dict__:
-      method.__dict__['start_args'] = list()
-    method.__dict__["start_args"].append("--enable_admission_service")
-    if "impalad_args" in method.__dict__:
-      method.__dict__["admissiond_args"] = method.__dict__["impalad_args"]
+
+    start_args = "--enable_admission_service"
+    if START_ARGS in method.__dict__:
+      start_args = method.__dict__[START_ARGS] + " " + start_args
+    method.__dict__[START_ARGS] = start_args
+    if IMPALAD_ARGS in method.__dict__:
+      method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
     super(TestAdmissionControllerStress, self).setup_method(method)
diff --git a/tests/custom_cluster/test_blacklist.py 
b/tests/custom_cluster/test_blacklist.py
index be8d6999f..e237d6e8c 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -21,12 +21,10 @@ from tests.common.custom_cluster_test_suite import 
CustomClusterTestSuite
 
 import pytest
 import re
-import shutil
-import tempfile
 
 from beeswaxd.BeeswaxService import QueryState
-from tests.common.skip import SkipIfNotHdfsMinicluster
-from tests.common.skip import SkipIfBuildType
+from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfBuildType
+from tests.common.test_dimensions import add_mandatory_exec_option
 from time import sleep
 
 # The BE krpc port of the impalad to simulate disk errors in tests.
@@ -36,6 +34,7 @@ FAILED_KRPC_PORT = 27001
 def _get_disk_write_fail_action(port):
   return "IMPALA_TMP_FILE_WRITE:127.0.0.1:{port}:FAIL".format(port=port)
 
+
 # Tests that verify the behavior of the executor blacklist caused by RPC 
failure.
 # Coordinator adds an executor node to its blacklist if the RPC to that node 
failed.
 # Note: query-retry is not enabled by default.
@@ -201,6 +200,13 @@ class TestBlacklistFaultyDisk(CustomClusterTestSuite):
       pytest.skip('runs only in exhaustive')
     super(TestBlacklistFaultyDisk, cls).setup_class()
 
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestBlacklistFaultyDisk, cls).add_test_dimensions()
+    # Buffer pool limit that is low enough to force Impala to spill to disk 
when
+    # executing spill_query.
+    add_mandatory_exec_option(cls, 'buffer_pool_limit', '45m')
+
   # Query with order by requires spill to disk if intermediate results don't 
fit in mem
   spill_query = """
       select o_orderdate, o_custkey, o_comment
@@ -218,28 +224,23 @@ class TestBlacklistFaultyDisk(CustomClusterTestSuite):
   in_mem_query = """
       select o_orderdate, o_custkey, o_comment from tpch.orders
       """
-  # Buffer pool limit that is low enough to force Impala to spill to disk when 
executing
-  # spill_query.
-  buffer_pool_limit = "45m"
 
   def __generate_scratch_dir(self, num):
     result = []
     for i in range(num):
-      dir_path = tempfile.mkdtemp()
-      self.created_dirs.append(dir_path)
+      dir_path = self.make_tmp_dir('scratch_dir_{}'.format(i))
       result.append(dir_path)
-      print("Generated dir" + dir_path)
+      print("Generated dir " + dir_path)
     return result
 
   def setup_method(self, method):
     # Don't call the superclass method to prevent starting Impala before each 
test. In
     # this class, each test is responsible for doing that because we want to 
generate
     # the parameter string to start-impala-cluster in each test method.
-    self.created_dirs = []
+    pass
 
   def teardown_method(self, method):
-    for dir_path in self.created_dirs:
-      shutil.rmtree(dir_path, ignore_errors=True)
+    self.clear_tmp_dirs()
 
   @SkipIfBuildType.not_dev_build
   @pytest.mark.execute_serially
@@ -261,7 +262,6 @@ class TestBlacklistFaultyDisk(CustomClusterTestSuite):
         expected_count=2)
 
     # First set debug_action for query as empty.
-    vector.get_value('exec_option')['buffer_pool_limit'] = 
self.buffer_pool_limit
     vector.get_value('exec_option')['debug_action'] = ''
     coord_impalad = self.cluster.get_first_impalad()
     client = coord_impalad.service.create_beeswax_client()
diff --git a/tests/custom_cluster/test_breakpad.py 
b/tests/custom_cluster/test_breakpad.py
index 81173e6a8..c8348828e 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -22,18 +22,17 @@ import os
 import psutil
 import pytest
 import shutil
-import tempfile
 import time
 from resource import setrlimit, RLIMIT_CORE, RLIM_INFINITY
 from signal import SIGSEGV, SIGKILL, SIGUSR1, SIGTERM
 from subprocess import CalledProcessError
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfBuildType
 
 DAEMONS = ['impalad', 'statestored', 'catalogd']
 DAEMON_ARGS = ['impalad_args', 'state_store_args', 'catalogd_args']
 
+
 class TestBreakpadBase(CustomClusterTestSuite):
   """Base class with utility methods for all breakpad tests."""
   @classmethod
@@ -43,14 +42,14 @@ class TestBreakpadBase(CustomClusterTestSuite):
   def setup_method(self, method):
     # Override parent
     # The temporary directory gets removed in teardown_method() after each 
test.
-    self.tmp_dir = tempfile.mkdtemp()
+    self.tmp_dir = self.make_tmp_dir('breakpad')
 
   def teardown_method(self, method):
     # Override parent
     # Stop the cluster to prevent future accesses to self.tmp_dir.
     self.kill_cluster(SIGKILL)
-    assert self.tmp_dir
-    shutil.rmtree(self.tmp_dir)
+    assert len(self.TMP_DIRS) > 0
+    self.clear_tmp_dirs()
 
   @classmethod
   def setup_class(cls):
@@ -259,7 +258,6 @@ class TestBreakpadExhaustive(TestBreakpadBase):
     assert self.get_num_processes('catalogd') == 0
     assert self.get_num_processes('statestored') == 0
     assert self.count_all_minidumps() == 0
-    uid = os.getuid()
     # There should be a SIGTERM message in the log now
     # since we raised one above.
     log_str = 'Caught signal: SIGTERM. Daemon will exit.'
diff --git a/tests/custom_cluster/test_kudu.py 
b/tests/custom_cluster/test_kudu.py
index 9c785de58..c8154c8af 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -19,9 +19,8 @@ from __future__ import absolute_import, division, 
print_function
 import logging
 import os
 import pytest
-import tempfile
+from copy import deepcopy
 from kudu.schema import INT32
-from time import sleep
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -48,9 +47,9 @@ class CustomKuduTest(CustomClusterTestSuite, KuduTestSuite):
     # 'file_format' and 'compression_codec' being "kudu" and "none" 
respectively will not
     # be skipped.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('exec_option')['batch_size'] == 0 and
-        v.get_value('exec_option')['disable_codegen'] is False and
-        v.get_value('exec_option')['num_nodes'] == 0)
+        v.get_value('exec_option')['batch_size'] == 0
+        and v.get_value('exec_option')['disable_codegen'] is False
+        and v.get_value('exec_option')['num_nodes'] == 0)
 
 
 class TestKuduOperations(CustomKuduTest):
@@ -67,8 +66,8 @@ class TestKuduOperations(CustomKuduTest):
     add_mandatory_exec_option(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(impalad_args=\
-      "--use_local_tz_for_unix_timestamp_conversions=true")
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_tz_for_unix_timestamp_conversions=true")
   @SkipIfKudu.no_hybrid_clock()
   @SkipIfKudu.hms_integration_enabled()
   def test_local_tz_conversion_ops(self, vector, unique_database):
@@ -142,8 +141,6 @@ class TestKuduClientTimeout(CustomKuduTest):
 
 @SkipIf.is_test_jdk
 class TestKuduHMSIntegration(CustomKuduTest):
-  START_END_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="start_end_time")
-
   # TODO(IMPALA-8614): parameterize the common tests in query_test/test_kudu.py
   # to run with HMS integration enabled. Also avoid restarting Impala to reduce
   # tests time.
@@ -175,18 +172,21 @@ class TestKuduHMSIntegration(CustomKuduTest):
        with the Hive Metastore for managed tables. Increase timeout of 
individual Kudu
        client rpcs to avoid requests fail due to operation delay in the Hive 
Metastore
        for managed tables (IMPALA-8856)."""
-    vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
-    self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database)
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
+    self.run_test_case('QueryTest/kudu_create', new_vector, 
use_db=unique_database)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
-  
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000
 "
-                                    "--lineage_event_log_dir={0}"
-                                    .format(START_END_TIME_LINEAGE_LOG_DIR))
+  @CustomClusterTestSuite.with_args(
+      impalad_args=("-kudu_client_rpc_timeout_ms=30000 "
+        "--lineage_event_log_dir={start_end_time}"),
+      tmp_dir_placeholders=['start_end_time'])
   def test_create_kudu_tables_with_lineage_enabled(self, vector, 
unique_database):
     """Same as above test_create_managed_kudu_tables, but with lineage 
enabled."""
-    vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
-    self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database)
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
+    self.run_test_case('QueryTest/kudu_create', new_vector, 
use_db=unique_database)
 
   @pytest.mark.execute_serially
   def test_implicit_external_table_props(self, cursor, kudu_client):
@@ -359,6 +359,7 @@ class TestKuduHMSIntegration(CustomKuduTest):
       vector,
       use_db=unique_database)
 
+
 class TestKuduTransactionBase(CustomClusterTestSuite):
   """
   This is a base class of other TestKuduTransaction classes.
diff --git a/tests/custom_cluster/test_lineage.py 
b/tests/custom_cluster/test_lineage.py
index 42e4cbee8..34486f998 100644
--- a/tests/custom_cluster/test_lineage.py
+++ b/tests/custom_cluster/test_lineage.py
@@ -23,7 +23,6 @@ import logging
 import os
 import pytest
 import re
-import tempfile
 import time
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -33,10 +32,11 @@ LOG = logging.getLogger(__name__)
 
 
 class TestLineage(CustomClusterTestSuite):
-  START_END_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="start_end_time")
-  CREATE_TABLE_TIME_LINEAGE_LOG_DIR = 
tempfile.mkdtemp(prefix="create_table_time")
-  DDL_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="ddl_lineage")
-  LINEAGE_TESTS_DIR = tempfile.mkdtemp(prefix="test_lineage")
+  START_END_TIME = "start_end_time"
+  CREATE_TABLE_TIME = "create_table_time"
+  CREATE_TABLE_TIME_NO_HMS = "create_table_time_no_hms"
+  DDL_LINEAGE = "ddl_lineage"
+  LINEAGE = "lineage"
 
   @classmethod
   def setup_class(cls):
@@ -46,25 +46,30 @@ class TestLineage(CustomClusterTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
+  @classmethod
+  def teardown_class(cls):
+    super(TestLineage, cls).teardown_class()
+
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
-                                    .format(START_END_TIME_LINEAGE_LOG_DIR))
-  def test_start_end_timestamp(self, vector):
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--lineage_event_log_dir={" + START_END_TIME + "}",
+      tmp_dir_placeholders=[START_END_TIME])
+  def test_start_end_timestamp(self):
     """Test that 'timestamp' and 'endTime' in the lineage graph are populated 
with valid
        UNIX times."""
-    LOG.info("lineage_event_log_dir is 
{0}".format(self.START_END_TIME_LINEAGE_LOG_DIR))
+    LOG.info("lineage_event_log_dir is 
{0}".format(self.get_tmp_dir(self.START_END_TIME)))
     before_time = int(time.time())
     query = "select count(*) from functional.alltypes"
     result = self.execute_query_expect_success(self.client, query)
-    profile_query_id = re.search("Query \(id=(.*)\):", 
result.runtime_profile).group(1)
+    profile_query_id = re.search(r"Query \(id=(.*)\):", 
result.runtime_profile).group(1)
     after_time = int(time.time())
     LOG.info("before_time " + str(before_time) + " after_time " + 
str(after_time))
 
     # Stop the cluster in order to flush the lineage log files.
     self._stop_impala_cluster()
 
-    for log_filename in os.listdir(self.START_END_TIME_LINEAGE_LOG_DIR):
-      log_path = os.path.join(self.START_END_TIME_LINEAGE_LOG_DIR, 
log_filename)
+    for log_filename in os.listdir(self.get_tmp_dir(self.START_END_TIME)):
+      log_path = os.path.join(self.get_tmp_dir(self.START_END_TIME), 
log_filename)
       # Only the coordinator's log file will be populated.
       if os.path.getsize(log_path) > 0:
         LOG.info("examining file: " + log_path)
@@ -80,22 +85,25 @@ class TestLineage(CustomClusterTestSuite):
         LOG.info("empty file: " + log_path)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
-                                    .format(CREATE_TABLE_TIME_LINEAGE_LOG_DIR))
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--lineage_event_log_dir={" + CREATE_TABLE_TIME + "}",
+      tmp_dir_placeholders=[CREATE_TABLE_TIME])
   def test_create_table_timestamp(self, unique_database):
     for table_format in ['textfile', 'kudu', 'iceberg']:
-      self.run_test_create_table_timestamp(unique_database, table_format)
+      self.run_test_create_table_timestamp(
+        unique_database, table_format, self.CREATE_TABLE_TIME)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      "--lineage_event_log_dir={0}"
-      .format(CREATE_TABLE_TIME_LINEAGE_LOG_DIR),
-      catalogd_args="--hms_event_polling_interval_s=0")
+      impalad_args="--lineage_event_log_dir={" + CREATE_TABLE_TIME_NO_HMS + 
"}",
+      catalogd_args="--hms_event_polling_interval_s=0",
+      tmp_dir_placeholders=[CREATE_TABLE_TIME_NO_HMS])
   def test_create_table_timestamp_without_hms_events(self, unique_database):
     for table_format in ['textfile', 'kudu', 'iceberg']:
-      self.run_test_create_table_timestamp(unique_database, table_format)
+      self.run_test_create_table_timestamp(
+        unique_database, table_format, self.CREATE_TABLE_TIME_NO_HMS)
 
-  def run_test_create_table_timestamp(self, unique_database, table_format):
+  def run_test_create_table_timestamp(self, unique_database, table_format, 
tmp_dir_id):
     """Test that 'createTableTime' in the lineage graph are populated with 
valid value
        from HMS."""
     not_enforced = ""
@@ -105,13 +113,13 @@ class TestLineage(CustomClusterTestSuite):
         "stored as {1} as select int_col, bigint_col from 
functional.alltypes".format(
             unique_database, table_format, not_enforced)
     result = self.execute_query_expect_success(self.client, query)
-    profile_query_id = re.search("Query \(id=(.*)\):", 
result.runtime_profile).group(1)
+    profile_query_id = re.search(r"Query \(id=(.*)\):", 
result.runtime_profile).group(1)
 
     # Wait to flush the lineage log files.
     time.sleep(3)
 
-    for log_filename in os.listdir(self.CREATE_TABLE_TIME_LINEAGE_LOG_DIR):
-      log_path = os.path.join(self.CREATE_TABLE_TIME_LINEAGE_LOG_DIR, 
log_filename)
+    for log_filename in os.listdir(self.get_tmp_dir(tmp_dir_id)):
+      log_path = os.path.join(self.get_tmp_dir(tmp_dir_id), log_filename)
       # Only the coordinator's log file will be populated.
       if os.path.getsize(log_path) > 0:
         with open(log_path) as log_file:
@@ -134,20 +142,21 @@ class TestLineage(CustomClusterTestSuite):
                 assert table_create_time != -1
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
-                                    .format(DDL_LINEAGE_LOG_DIR))
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--lineage_event_log_dir={" + DDL_LINEAGE + "}",
+      tmp_dir_placeholders=[DDL_LINEAGE])
   def test_ddl_lineage(self, unique_database):
     """ Test that DDLs like 'create table' have query text populated in the 
lineage
     graph."""
     query = "create external table {0}.ddl_lineage_tbl (id 
int)".format(unique_database)
     result = self.execute_query_expect_success(self.client, query)
-    profile_query_id = re.search("Query \(id=(.*)\):", 
result.runtime_profile).group(1)
+    profile_query_id = re.search(r"Query \(id=(.*)\):", 
result.runtime_profile).group(1)
 
     # Wait to flush the lineage log files.
     time.sleep(3)
 
-    for log_filename in os.listdir(self.DDL_LINEAGE_LOG_DIR):
-      log_path = os.path.join(self.DDL_LINEAGE_LOG_DIR, log_filename)
+    for log_filename in os.listdir(self.get_tmp_dir(self.DDL_LINEAGE)):
+      log_path = os.path.join(self.get_tmp_dir(self.DDL_LINEAGE), log_filename)
       # Only the coordinator's log file will be populated.
       if os.path.getsize(log_path) > 0:
         with open(log_path) as log_file:
@@ -161,13 +170,13 @@ class TestLineage(CustomClusterTestSuite):
     query = "explain create table {0}.lineage_test_tbl as select int_col, " \
             "tinyint_col from functional.alltypes".format(unique_database)
     result = self.execute_query_expect_success(self.client, query)
-    profile_query_id = re.search("Query \(id=(.*)\):", 
result.runtime_profile).group(1)
+    profile_query_id = re.search(r"Query \(id=(.*)\):", 
result.runtime_profile).group(1)
 
     # Wait to flush the lineage log files.
     time.sleep(3)
 
-    for log_filename in os.listdir(self.DDL_LINEAGE_LOG_DIR):
-      log_path = os.path.join(self.DDL_LINEAGE_LOG_DIR, log_filename)
+    for log_filename in os.listdir(self.get_tmp_dir(self.DDL_LINEAGE)):
+      log_path = os.path.join(self.get_tmp_dir(self.DDL_LINEAGE), log_filename)
       # Only the coordinator's log file will be populated.
       if os.path.getsize(log_path) > 0:
         with open(log_path) as log_file:
@@ -176,8 +185,9 @@ class TestLineage(CustomClusterTestSuite):
 
   @SkipIfFS.hbase
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
-                                    .format(LINEAGE_TESTS_DIR))
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--lineage_event_log_dir={" + LINEAGE + "}",
+      tmp_dir_placeholders=[LINEAGE])
   def test_lineage_output(self, vector):
     try:
       self.run_test_case('QueryTest/lineage', vector)
diff --git a/tests/custom_cluster/test_permanent_udfs.py 
b/tests/custom_cluster/test_permanent_udfs.py
index c84ac6667..bd9428f76 100644
--- a/tests/custom_cluster/test_permanent_udfs.py
+++ b/tests/custom_cluster/test_permanent_udfs.py
@@ -20,14 +20,13 @@ import glob
 import os
 import pytest
 import re
-import shutil
 
-from tempfile import mkdtemp
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfFS
 from tests.common.test_dimensions import create_uncompressed_text_dimension
 from tests.util.filesystem_utils import get_fs_path
 
+
 class TestUdfPersistence(CustomClusterTestSuite):
   """ Tests the behavior of UDFs and UDAs between catalog restarts. With 
IMPALA-1748,
   these functions are persisted to the metastore and are loaded again during 
catalog
@@ -36,9 +35,9 @@ class TestUdfPersistence(CustomClusterTestSuite):
   DATABASE = 'udf_permanent_test'
   JAVA_FN_TEST_DB = 'java_permanent_test'
   HIVE_IMPALA_INTEGRATION_DB = 'hive_impala_integration_db'
-  HIVE_UDF_JAR = os.getenv('DEFAULT_FS') + '/test-warehouse/hive-exec.jar';
-  JAVA_UDF_JAR = os.getenv('DEFAULT_FS') + 
'/test-warehouse/impala-hive-udfs.jar';
-  LOCAL_LIBRARY_DIR = mkdtemp(dir="/tmp")
+  HIVE_UDF_JAR = os.getenv('DEFAULT_FS') + '/test-warehouse/hive-exec.jar'
+  JAVA_UDF_JAR = os.getenv('DEFAULT_FS') + 
'/test-warehouse/impala-hive-udfs.jar'
+  LOCAL_LIBRARY_DIR = "udf_persistence"
 
   @classmethod
   def get_workload(cls):
@@ -83,13 +82,13 @@ class TestUdfPersistence(CustomClusterTestSuite):
 
   def teardown_method(self, method):
     self.__cleanup()
+    self.clear_tmp_dirs()
 
   def __cleanup(self):
     self.client.execute("DROP DATABASE IF EXISTS %s CASCADE" % self.DATABASE)
     self.client.execute("DROP DATABASE IF EXISTS %s CASCADE" % 
self.JAVA_FN_TEST_DB)
     self.client.execute("DROP DATABASE IF EXISTS %s CASCADE"
        % self.HIVE_IMPALA_INTEGRATION_DB)
-    shutil.rmtree(self.LOCAL_LIBRARY_DIR, ignore_errors=True)
 
   def __load_drop_functions(self, template, database, location):
     queries = template.format(database=database, location=location)
@@ -114,20 +113,20 @@ class TestUdfPersistence(CustomClusterTestSuite):
     # Make sure the pre-calculated count tallies with the number of
     # functions shown using "show [aggregate] functions" statement
     self.verify_function_count(
-            "SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count);
+            "SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count)
     self.verify_function_count(
             "SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), 
self.uda_count)
     # invalidate metadata and make sure the count tallies
-    result = self.client.execute("INVALIDATE METADATA")
+    self.client.execute("INVALIDATE METADATA")
     self.verify_function_count(
-            "SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count);
+            "SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count)
     self.verify_function_count(
             "SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), 
self.uda_count)
     # Restart the cluster, this triggers a full metadata reload
     self.__restart_cluster()
     # Make sure the counts of udfs and udas match post restart
     self.verify_function_count(
-            "SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count);
+            "SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count)
     self.verify_function_count(
             "SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), 
self.uda_count)
     # Drop sample udas and verify the count matches pre and post restart
@@ -140,7 +139,6 @@ class TestUdfPersistence(CustomClusterTestSuite):
     self.verify_function_count(
             "SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), 1)
 
-
   def __verify_udf_in_hive(self, udf):
     (query, result) = self.SAMPLE_JAVA_UDFS_TEST[udf]
     stdout = self.run_stmt_in_hive("select " + query.format(
@@ -208,7 +206,8 @@ class TestUdfPersistence(CustomClusterTestSuite):
   @SkipIfFS.hive
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-     catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR))
+      catalogd_args="--local_library_dir={" + LOCAL_LIBRARY_DIR + "}",
+      tmp_dir_placeholders=[LOCAL_LIBRARY_DIR])
   def test_java_udfs_hive_integration(self):
     ''' This test checks the integration between Hive and Impala on
     CREATE FUNCTION and DROP FUNCTION statements for persistent Java UDFs.
@@ -264,12 +263,13 @@ class TestUdfPersistence(CustomClusterTestSuite):
       self.verify_function_count(
           "SHOW FUNCTIONS in {0}".format(self.HIVE_IMPALA_INTEGRATION_DB), 0)
       # Make sure we deleted all the temporary jars we copied to the local fs
-      assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
+      assert len(glob.glob(self.get_tmp_dir(self.LOCAL_LIBRARY_DIR) + 
"/*.jar")) == 0
 
   @SkipIfFS.hive
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-     catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR))
+      catalogd_args="--local_library_dir={" + LOCAL_LIBRARY_DIR + "}",
+      tmp_dir_placeholders=[LOCAL_LIBRARY_DIR])
   def test_refresh_native(self):
     ''' This test checks that a native function is visible in Impala after a
     REFRESH FUNCTIONS command. We will add the native function through Hive
@@ -323,12 +323,13 @@ class TestUdfPersistence(CustomClusterTestSuite):
         database=self.HIVE_IMPALA_INTEGRATION_DB))
     assert result.data[0] == "10"
     # Make sure we deleted all the temporary jars we copied to the local fs
-    assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
+    assert len(glob.glob(self.get_tmp_dir(self.LOCAL_LIBRARY_DIR) + "/*.jar")) 
== 0
 
   @SkipIfFS.hive
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-     catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR))
+      catalogd_args="--local_library_dir={" + LOCAL_LIBRARY_DIR + "}",
+      tmp_dir_placeholders=[LOCAL_LIBRARY_DIR])
   def test_refresh_replace(self):
     ''' This test checks that if we drop a function and then create a
     different function with the same name in Hive, the new function will
@@ -346,8 +347,8 @@ class TestUdfPersistence(CustomClusterTestSuite):
         database=self.HIVE_IMPALA_INTEGRATION_DB))
     result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
         database=self.HIVE_IMPALA_INTEGRATION_DB))
-    assert (result is not None and len(result.data) == 3 and
-        "test_func" in str(result.data))
+    assert (result is not None and len(result.data) == 3
+            and "test_func" in str(result.data))
     result = self.client.execute("SELECT {database}.test_func(123)".format(
         database=self.HIVE_IMPALA_INTEGRATION_DB))
     assert result.data[0] == "7B"
@@ -365,35 +366,36 @@ class TestUdfPersistence(CustomClusterTestSuite):
         database=self.HIVE_IMPALA_INTEGRATION_DB))
     result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
         database=self.HIVE_IMPALA_INTEGRATION_DB))
-    assert (result is not None and len(result.data) == 1 and
-        "test_func" in str(result.data))
+    assert (result is not None and len(result.data) == 1
+            and "test_func" in str(result.data))
     # Verify that the function has actually been updated.
     result = self.client.execute("SELECT {database}.test_func(123)".format(
         database=self.HIVE_IMPALA_INTEGRATION_DB))
     assert result.data[0] == "1111011"
     # Make sure we deleted all the temporary jars we copied to the local fs
-    assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
+    assert len(glob.glob(self.get_tmp_dir(self.LOCAL_LIBRARY_DIR) + "/*.jar")) 
== 0
 
   @pytest.mark.execute_serially
   def test_java_udfs_from_impala(self):
     """ This tests checks the behavior of permanent Java UDFs in Impala."""
     self.verify_function_count(
-            "SHOW FUNCTIONS in {0}".format(self.JAVA_FN_TEST_DB), 0);
+            "SHOW FUNCTIONS in {0}".format(self.JAVA_FN_TEST_DB), 0)
     # Create a non persistent Java UDF and make sure we can't create a
     # persistent Java UDF with same name
-    self.client.execute("create function %s.%s(boolean) returns boolean "\
-        "location '%s' symbol='%s'" % (self.JAVA_FN_TEST_DB, "identity",
-        self.JAVA_UDF_JAR, "org.apache.impala.TestUdf"))
+    self.client.execute("create function %s.%s(boolean) returns boolean "
+        "location '%s' symbol='%s'" % (
+          self.JAVA_FN_TEST_DB, "identity", self.JAVA_UDF_JAR,
+          "org.apache.impala.TestUdf"))
     result = self.execute_query_expect_failure(self.client,
         self.CREATE_JAVA_UDF_TEMPLATE.format(db=self.JAVA_FN_TEST_DB,
         function="identity", location=self.JAVA_UDF_JAR,
         symbol="org.apache.impala.TestUdf"))
     assert "Function already exists" in str(result)
     # Test the same with a NATIVE function
-    self.client.execute("create function {database}.identity(int) "\
+    self.client.execute("create function {database}.identity(int) "
         "returns int location '{location}' symbol='Identity'".format(
-        database=self.JAVA_FN_TEST_DB,
-        location="/test-warehouse/libTestUdfs.so"))
+          database=self.JAVA_FN_TEST_DB,
+          location="/test-warehouse/libTestUdfs.so"))
     result = self.execute_query_expect_failure(self.client,
         self.CREATE_JAVA_UDF_TEMPLATE.format(db=self.JAVA_FN_TEST_DB,
         function="identity", location=self.JAVA_UDF_JAR,
@@ -405,19 +407,19 @@ class TestUdfPersistence(CustomClusterTestSuite):
     self.client.execute(self.CREATE_JAVA_UDF_TEMPLATE.format(
         db=self.JAVA_FN_TEST_DB, function="identity_java",
         location=self.JAVA_UDF_JAR, symbol="org.apache.impala.TestUdf"))
-    result = self.execute_query_expect_failure(self.client, "create function "\
+    result = self.execute_query_expect_failure(self.client, "create function "
         "%s.%s(boolean) returns boolean location '%s' symbol='%s'" % (
-        self.JAVA_FN_TEST_DB, "identity_java", self.JAVA_UDF_JAR,
-        "org.apache.impala.TestUdf"))
+          self.JAVA_FN_TEST_DB, "identity_java", self.JAVA_UDF_JAR,
+          "org.apache.impala.TestUdf"))
     assert "Function already exists" in str(result)
-    result = self.execute_query_expect_failure(self.client, "create function "\
-        "{database}.identity_java(int) returns int location '{location}' "\
+    result = self.execute_query_expect_failure(self.client, "create function "
+        "{database}.identity_java(int) returns int location '{location}' "
         "symbol='Identity'".format(database=self.JAVA_FN_TEST_DB,
         location="/test-warehouse/libTestUdfs.so"))
     assert "Function already exists" in str(result)
     # With IF NOT EXISTS, the query shouldn't fail.
-    result = self.execute_query_expect_success(self.client, "create function "\
-        " if not exists {database}.identity_java(int) returns int location "\
+    result = self.execute_query_expect_success(self.client, "create function "
+        " if not exists {database}.identity_java(int) returns int location "
         "'{location}' symbol='Identity'".format(database=self.JAVA_FN_TEST_DB,
         location="/test-warehouse/libTestUdfs.so"))
     result = self.client.execute("SHOW FUNCTIONS in %s" % self.JAVA_FN_TEST_DB)
@@ -446,7 +448,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
     # this case, identity(boolean) should be wiped out.
     self.__restart_cluster()
     self.verify_function_count(
-        "SHOW FUNCTIONS IN %s" % self.JAVA_FN_TEST_DB, function_count-1)
+        "SHOW FUNCTIONS IN %s" % self.JAVA_FN_TEST_DB, function_count - 1)
     # Dropping persisted Java UDFs with old syntax should raise an exception
     self.execute_query_expect_failure(self.client,
         "DROP FUNCTION compatibility(smallint)")
@@ -516,7 +518,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
       ('udfconv', 'org.apache.hadoop.hive.ql.udf.UDFConv'),
       ('udflike', 'org.apache.hadoop.hive.ql.udf.UDFLike'),
       ('udfsign', 'org.apache.hadoop.hive.ql.udf.UDFSign'),
-      ('udfascii','org.apache.hadoop.hive.ql.udf.UDFAscii')
+      ('udfascii', 'org.apache.hadoop.hive.ql.udf.UDFAscii')
   ]
 
   # These UDFs are available in Hive 2 but in Hive 3 are now implemented
@@ -528,15 +530,15 @@ class TestUdfPersistence(CustomClusterTestSuite):
 
   # Simple tests to verify java udfs in SAMPLE_JAVA_UDFS
   SAMPLE_JAVA_UDFS_TEST = {
-    'udfpi' : ('{db}.udfpi()', '3.141592653589793'),
-    'udfbin' : ('{db}.udfbin(123)', '1111011'),
-    'udfhex' : ('{db}.udfhex(123)', '7B'),
-    'udfconv'  : ('{db}.udfconv("100", 2, 10)', '4'),
-    'udfhour'  : ('{db}.udfhour("12:55:12")', '12'),
-    'udflike'  : ('{db}.udflike("abc", "def")', 'false'),
-    'udfsign'  : ('{db}.udfsign(0)', '0'),
-    'udfyear' : ('{db}.udfyear("1990-02-06")', '1990'),
-    'udfascii' : ('{db}.udfascii("abc")','97')
+    'udfpi': ('{db}.udfpi()', '3.141592653589793'),
+    'udfbin': ('{db}.udfbin(123)', '1111011'),
+    'udfhex': ('{db}.udfhex(123)', '7B'),
+    'udfconv': ('{db}.udfconv("100", 2, 10)', '4'),
+    'udfhour': ('{db}.udfhour("12:55:12")', '12'),
+    'udflike': ('{db}.udflike("abc", "def")', 'false'),
+    'udfsign': ('{db}.udfsign(0)', '0'),
+    'udfyear': ('{db}.udfyear("1990-02-06")', '1990'),
+    'udfascii': ('{db}.udfascii("abc")', '97')
   }
 
   CREATE_SAMPLE_UDAS_TEMPLATE = """
@@ -697,7 +699,8 @@ class TestUdfPersistence(CustomClusterTestSuite):
     location '{location}' symbol='Count' prepare_fn='CountPrepare' 
close_fn='CountClose';
 
     create function {database}.constant_arg(int) returns int
-    location '{location}' symbol='ConstantArg' prepare_fn='ConstantArgPrepare' 
close_fn='ConstantArgClose';
+    location '{location}' symbol='ConstantArg' prepare_fn='ConstantArgPrepare'
+    close_fn='ConstantArgClose';
 
     create function {database}.validate_open(int) returns boolean
     location '{location}' symbol='ValidateOpen'
diff --git a/tests/custom_cluster/test_query_event_hooks.py 
b/tests/custom_cluster/test_query_event_hooks.py
index e709f83d3..3be2ba353 100644
--- a/tests/custom_cluster/test_query_event_hooks.py
+++ b/tests/custom_cluster/test_query_event_hooks.py
@@ -18,9 +18,7 @@
 # Client tests for Query Event Hooks
 
 from __future__ import absolute_import, division, print_function
-import os
 import pytest
-import tempfile
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
@@ -30,16 +28,15 @@ class TestHooks(CustomClusterTestSuite):
   Tests for FE QueryEventHook invocations.
   """
   DUMMY_HOOK = "org.apache.impala.testutil.DummyQueryEventHook"
-  MINIDUMP_PATH = tempfile.mkdtemp()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impala_log_dir=tempfile.mkdtemp(prefix="test_hooks_", 
dir=os.getenv("LOG_DIR")),
-      impalad_args="--query_event_hook_classes={0} "
-                   "--minidump_path={1} -logbuflevel=-1"
-                   .format(DUMMY_HOOK, MINIDUMP_PATH),
-      catalogd_args="--minidump_path={0}".format(MINIDUMP_PATH))
-  def test_query_event_hooks_execute(self, unique_database):
+      impala_log_dir="{query_event_hooks_log}",
+      impalad_args=("--query_event_hook_classes={0} -logbuflevel=-1 
".format(DUMMY_HOOK)
+                    + "--minidump_path={query_event_hooks_minidump}"),
+      catalogd_args="--minidump_path={query_event_hooks_minidump}",
+      tmp_dir_placeholders=['query_event_hooks_log', 
'query_event_hooks_minidump'])
+  def test_query_event_hooks_execute(self):
     """
     Tests that the post query execution hook actually executes by using a
     dummy hook implementation.
@@ -74,20 +71,17 @@ class TestHooksStartupFail(CustomClusterTestSuite):
 
   FAILING_HOOK = "org.apache.impala.testutil.AlwaysErrorQueryEventHook"
   NONEXIST_HOOK = "captain.hook"
-  LOG_DIR1 = tempfile.mkdtemp(prefix="test_hooks_startup_fail_", 
dir=os.getenv("LOG_DIR"))
-  LOG_DIR2 = tempfile.mkdtemp(prefix="test_hooks_startup_fail_", 
dir=os.getenv("LOG_DIR"))
-  MINIDUMP_PATH = tempfile.mkdtemp()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       expect_cores=True,
       cluster_size=1,
       impalad_timeout_s=5,
-      impala_log_dir=LOG_DIR1,
-      impalad_args="--query_event_hook_classes={0} "
-                   "--minidump_path={1}"
-                   .format(FAILING_HOOK, MINIDUMP_PATH),
-      catalogd_args="--minidump_path={0}".format(MINIDUMP_PATH))
+      impala_log_dir="{hook_startup_fail_log}",
+      impalad_args=("--query_event_hook_classes={0} ".format(FAILING_HOOK)
+                    + "--minidump_path={hook_startup_fail_minidump}"),
+      catalogd_args="--minidump_path={hook_startup_fail_minidump}",
+      tmp_dir_placeholders=['hook_startup_fail_log', 
'hook_startup_fail_minidump'])
   def test_hook_startup_fail(self):
     """
     Tests that exception during QueryEventHook.onImpalaStart will prevent
@@ -104,11 +98,12 @@ class TestHooksStartupFail(CustomClusterTestSuite):
       expect_cores=True,
       cluster_size=1,
       impalad_timeout_s=5,
-      impala_log_dir=LOG_DIR2,
-      impalad_args="--query_event_hook_classes={0} "
-                   "--minidump_path={1}"
-                   .format(NONEXIST_HOOK, MINIDUMP_PATH),
-      catalogd_args="--minidump_path={0}".format(MINIDUMP_PATH))
+      impala_log_dir="{hook_instantiation_fail_log}",
+      impalad_args=("--query_event_hook_classes={0} ".format(NONEXIST_HOOK)
+                    + "--minidump_path={hook_instantiation_fail_minidump}"),
+      catalogd_args="--minidump_path={hook_instantiation_fail_minidump}",
+      tmp_dir_placeholders=['hook_instantiation_fail_log',
+                            'hook_instantiation_fail_minidump'])
   def test_hook_instantiation_fail(self):
     """
     Tests that failure to instantiate a QueryEventHook will prevent
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index 8f829b289..28a64e979 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -19,7 +19,6 @@ from __future__ import absolute_import, division, 
print_function
 
 import os
 import string
-import tempfile
 
 from getpass import getuser
 from ImpalaService import ImpalaHiveServer2Service
@@ -86,9 +85,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
   def get_workload(self):
     return 'functional-query'
 
-  CACHE_DIR = tempfile.mkdtemp(prefix="cache_dir")
   MAX_SQL_PLAN_LEN = 2000
-  LOG_DIR_MAX_WRITES = tempfile.mkdtemp(prefix="max_writes")
+  LOG_DIR_MAX_WRITES = 'max_attempts_exceeded'
   FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" + 
str(int(time()))
   FLUSH_MAX_RECORDS_QUERY_COUNT = 30
 
@@ -211,10 +209,11 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_2 "
                                                  "--always_use_data_cache "
-                                                 
"--data_cache={0}:5GB".format(CACHE_DIR),
+                                                 
"--data_cache={query_data_cache}:5GB",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True,
-                                    cluster_size=1)
+                                    cluster_size=1,
+                                    tmp_dir_placeholders=['query_data_cache'])
   def test_query_data_cache(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile. Specifically focuses on the data cache metrics."""
@@ -253,16 +252,18 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=5",
-                                    impala_log_dir=LOG_DIR_MAX_WRITES,
+                                    impala_log_dir=("{" + LOG_DIR_MAX_WRITES + 
"}"),
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    tmp_dir_placeholders=[LOG_DIR_MAX_WRITES])
   def test_max_attempts_exceeded(self, vector):
     """Asserts that completed queries are only attempted 3 times to be 
inserted into the
        completed queries table. This test deletes the completed queries table 
thus it must
        not come last otherwise the table stays deleted. Subsequent tests will 
re-create
        the table."""
 
-    print("USING LOG DIRECTORY: {0}".format(self.LOG_DIR_MAX_WRITES))
+    log_dir = self.get_tmp_dir(self.LOG_DIR_MAX_WRITES)
+    print("USING LOG DIRECTORY: {0}".format(log_dir))
 
     impalad = self.cluster.get_first_impalad()
     client = self.get_client(vector.get_value('protocol'))
@@ -279,7 +280,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     # Allow time for logs to be written to disk.
     sleep(5)
 
-    with open(os.path.join(self.LOG_DIR_MAX_WRITES, "impalad.ERROR")) as file:
+    with open(os.path.join(log_dir, "impalad.ERROR")) as file:
       for line in file:
         if line.find('could not write completed query table="{0}" 
query_id="{1}"'
                           .format(self.QUERY_TBL, res.query_id)) >= 0:
@@ -1037,8 +1038,6 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
 class TestQueryLogTableBufferPool(TestQueryLogTableBase):
   """Base class for all query log tests that set the buffer pool query 
option."""
 
-  SCRATCH_DIR = tempfile.mkdtemp(prefix="scratch_dir")
-
   @classmethod
   def add_test_dimensions(cls):
     super(TestQueryLogTableBufferPool, cls).add_test_dimensions()
@@ -1048,10 +1047,10 @@ class 
TestQueryLogTableBufferPool(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_1 "
-                                                 "--scratch_dirs={0}:5G"
-                                                 .format(SCRATCH_DIR),
+                                                 
"--scratch_dirs={scratch_dir}:5G",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    tmp_dir_placeholders=['scratch_dir'])
   def test_select(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile. If the buffer_pool_limit parameter is not None, then 
this test
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index 8d0629f0c..786ad23e7 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -25,8 +25,6 @@ from __future__ import absolute_import, division, 
print_function
 from builtins import map, range
 import pytest
 import re
-import shutil
-import tempfile
 import time
 
 from random import randint
@@ -38,6 +36,7 @@ from tests.common.custom_cluster_test_suite import 
CustomClusterTestSuite
 from tests.common.errors import Timeout
 from tests.common.skip import (SkipIfEC, SkipIfBuildType, SkipIfFS,
     SkipIfNotHdfsMinicluster)
+from tests.common.test_dimensions import add_mandatory_exec_option
 
 # The BE krpc port of the impalad to simulate rpc or disk errors in tests.
 FAILED_KRPC_PORT = 27001
@@ -621,7 +620,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # PLAN_ROOT_SINK's reservation limit should be set at
     # 2 * DEFAULT_SPILLABLE_BUFFER_SIZE = 16 KB.
-    plan_root_sink_reservation_limit = 
"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 16.00 KB"
+    plan_root_sink_reservation_limit = 
r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 16.00 KB"
     profile = self.client.get_runtime_profile(handle)
     assert re.search(plan_root_sink_reservation_limit, profile)
 
@@ -1026,7 +1025,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   def __get_query_id_from_profile(self, profile):
     """Extracts and returns the query id of the given profile."""
-    query_id_search = re.search("Query \(id=(.*)\)", profile)
+    query_id_search = re.search(r"Query \(id=(.*)\)", profile)
     assert query_id_search, "Invalid query profile, has no query 
id:\n{0}".format(
         profile)
     return query_id_search.group(1)
@@ -1077,7 +1076,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   def __get_query_options(self, profile):
     """Returns the query options from the given profile."""
-    query_options_pattern = "Query Options \(set by configuration and 
planner\): (.*)"
+    query_options_pattern = r"Query Options \(set by configuration and 
planner\): (.*)"
     query_options = re.search(query_options_pattern, profile)
     assert query_options, profile
     return query_options.group(1)
@@ -1165,7 +1164,7 @@ class TestQueryRetries(CustomClusterTestSuite):
   def __exist_queries_in_web_ui_memz(self):
     memz_breakdown = self.cluster.get_first_impalad() \
       .service.get_debug_webpage_json('memz')['detailed']
-    query = re.compile("Query\([0-9a-f]{16}:[0-9a-f]{16}")
+    query = re.compile(r"Query\([0-9a-f]{16}:[0-9a-f]{16}")
     return query.search(memz_breakdown)
 
   def __validate_memz(self):
@@ -1190,33 +1189,40 @@ class 
TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
       pytest.skip('runs only in exhaustive')
     super(TestQueryRetriesFaultyDisk, cls).setup_class()
 
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestQueryRetriesFaultyDisk, cls).add_test_dimensions()
+    # Buffer pool limit that is low enough to force Impala to spill to disk 
when
+    # executing spill_query.
+    add_mandatory_exec_option(cls, 'buffer_pool_limit', '45m')
+    add_mandatory_exec_option(cls, 'retry_failed_queries', True)
+    # Set debug_action to inject disk write error for spill-to-disk on impalad 
for
+    # which krpc port is 27001.
+    add_mandatory_exec_option(
+      cls, 'debug_action', _get_disk_fail_action(FAILED_KRPC_PORT))
+
   # Query with order by requires spill to disk if intermediate results don't 
fit in mem
   spill_query = """
       select o_orderdate, o_custkey, o_comment
       from tpch.orders
       order by o_orderdate
       """
-  # Buffer pool limit that is low enough to force Impala to spill to disk when 
executing
-  # spill_query.
-  buffer_pool_limit = "45m"
 
   def setup_method(self, method):
     # Don't call the superclass method to prevent starting Impala before each 
test. In
     # this class, each test is responsible for doing that because we want to 
generate
     # the parameter string to start-impala-cluster in each test method.
-    self.created_dirs = []
+    pass
 
   def teardown_method(self, method):
-    for dir_path in self.created_dirs:
-      shutil.rmtree(dir_path, ignore_errors=True)
+    self.clear_tmp_dirs()
 
   def __generate_scratch_dir(self, num):
     result = []
     for i in range(num):
-      dir_path = tempfile.mkdtemp()
-      self.created_dirs.append(dir_path)
+      dir_path = self.make_tmp_dir('scratch_dir_{}'.format(i))
       result.append(dir_path)
-      print("Generated dir" + dir_path)
+      print("Generated dir " + dir_path)
     return result
 
   def __validate_web_ui_state(self):
@@ -1250,12 +1256,6 @@ class TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
         expected_count=1)
 
-    # Set debug_action to inject disk write error for spill-to-disk on impalad 
for which
-    # krpc port is 27001.
-    vector.get_value('exec_option')['buffer_pool_limit'] = 
self.buffer_pool_limit
-    vector.get_value('exec_option')['debug_action'] = \
-        _get_disk_fail_action(FAILED_KRPC_PORT)
-    vector.get_value('exec_option')['retry_failed_queries'] = "true"
     coord_impalad = self.cluster.get_first_impalad()
     client = coord_impalad.service.create_beeswax_client()
 
diff --git a/tests/custom_cluster/test_redaction.py 
b/tests/custom_cluster/test_redaction.py
index 607502723..6caa979eb 100644
--- a/tests/custom_cluster/test_redaction.py
+++ b/tests/custom_cluster/test_redaction.py
@@ -21,10 +21,8 @@ import logging
 import os
 import pytest
 import re
-import shutil
 import unittest
 
-from tempfile import mkdtemp
 from time import sleep
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -33,6 +31,7 @@ from tests.common.file_utils import grep_file, 
assert_file_in_dir_contains,\
 
 LOG = logging.getLogger(__name__)
 
+
 # This class needs to inherit from unittest.TestCase otherwise py.test will 
ignore it
 # because a __init__ method is used.
 class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
@@ -75,13 +74,13 @@ class TestRedaction(CustomClusterTestSuite, 
unittest.TestCase):
     pass
 
   def teardown_method(self, method):
-    # Parent method would fail, nothing needs to be done. The tests are 
responsible
-    # for deleting self.tmp_dir after tests pass.
-    pass
+    # Parent method would fail, nothing needs to be done.
+    # Cleanup any temporary dirs.
+    self.clear_tmp_dirs()
 
   def start_cluster_using_rules(self, redaction_rules, log_level=2, 
vmodule=""):
     '''Start Impala with a custom log dir and redaction rules.'''
-    self.tmp_dir = mkdtemp(prefix="test_redaction_", dir=os.getenv("LOG_DIR"))
+    self.tmp_dir = self.make_tmp_dir('redaction')
     os.chmod(self.tmp_dir, 0o777)
     LOG.info("tmp_dir is " + self.tmp_dir)
     os.mkdir(self.log_dir)
@@ -176,8 +175,6 @@ class TestRedaction(CustomClusterTestSuite, 
unittest.TestCase):
     startup_options = dict()
     self.assert_server_fails_to_start('{ "version": 100 }', startup_options,
         'Error parsing redaction rules; only version 1 is supported')
-    # Since the tests passed, the log dir shouldn't be of interest and can be 
deleted.
-    shutil.rmtree(self.tmp_dir)
 
   @pytest.mark.execute_serially
   def test_very_verbose_logging(self):
@@ -202,15 +199,9 @@ class TestRedaction(CustomClusterTestSuite, 
unittest.TestCase):
     error_message = "Redaction cannot be used in combination with log level 3 
or " \
         "higher or the -vmodule option"
     self.assert_server_fails_to_start(rules, {"log_level": 3}, error_message)
-    # Since the tests passed, the log dir shouldn't be of interest and can be 
deleted.
-    shutil.rmtree(self.tmp_dir)
-
     self.assert_server_fails_to_start(rules, {"vmodule": "foo"}, error_message)
-    shutil.rmtree(self.tmp_dir)
-
     self.assert_server_fails_to_start(
         rules, {"log_level": 3, "vmodule": "foo"}, error_message)
-    shutil.rmtree(self.tmp_dir)
 
   @pytest.mark.execute_serially
   def test_unredacted(self):
@@ -237,10 +228,6 @@ class TestRedaction(CustomClusterTestSuite, 
unittest.TestCase):
     # The profile is encoded so the email won't be found.
     assert_no_files_in_dir_contain(self.profile_dir, email)
 
-    # Since all the tests passed, the log dir shouldn't be of interest and can 
be
-    # deleted.
-    shutil.rmtree(self.tmp_dir)
-
   @pytest.mark.execute_serially
   def test_redacted(self):
     '''Check that redaction rules prevent 'sensitive' data from leaking into 
the
@@ -283,7 +270,7 @@ class TestRedaction(CustomClusterTestSuite, 
unittest.TestCase):
     self.assert_query_profile_contains(self.find_last_query_id(), 
user_profile_pattern)
     # Wait for the logs to be written.
     sleep(5)
-    self.assert_log_redaction(email, "\*email\*")
+    self.assert_log_redaction(email, r"\*email\*")
 
     # Even if the query is invalid, redaction should still be applied.
     credit_card = '1234-5678-1234-5678'
@@ -295,13 +282,9 @@ class TestRedaction(CustomClusterTestSuite, 
unittest.TestCase):
     self.assert_query_profile_contains(self.find_last_query_id(), 
user_profile_pattern)
     sleep(5)
     # Apparently an invalid query doesn't generate an audit log entry.
-    self.assert_log_redaction(credit_card, "\*credit card\*", 
expect_audit=False)
+    self.assert_log_redaction(credit_card, r"\*credit card\*", 
expect_audit=False)
 
     # Assert that the username in the query stmt is redacted but not from the 
user fields.
     self.execute_query_expect_success(self.client, query_template % 
current_user)
     self.assert_query_profile_contains(self.find_last_query_id(), 
user_profile_pattern)
     self.assert_query_profile_contains(self.find_last_query_id(), "redacted 
user")
-
-    # Since all the tests passed, the log dir shouldn't be of interest and can 
be
-    # deleted.
-    shutil.rmtree(self.tmp_dir)
diff --git a/tests/custom_cluster/test_scratch_disk.py 
b/tests/custom_cluster/test_scratch_disk.py
index bc08c647b..525a0d9cb 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -22,10 +22,8 @@ from builtins import range
 import os
 import pytest
 import re
-import shutil
 import stat
 import subprocess
-import tempfile
 import time
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -33,6 +31,7 @@ from tests.common.skip import SkipIf
 from tests.util.hdfs_util import NAMENODE
 from tests.verifiers.metric_verifier import MetricVerifier
 
+
 class TestScratchDir(CustomClusterTestSuite):
   @classmethod
   def get_workload(cls):
@@ -75,29 +74,30 @@ class TestScratchDir(CustomClusterTestSuite):
   def get_dirs(dirs):
     return ','.join(dirs)
 
-  def generate_dirs(self, num, writable=True, non_existing=False):
+  def get_name_for_dir(self, prefix, num):
+    return '{}_{}'.format(prefix, num)
+
+  def generate_dirs(self, prefix, num, writable=True, non_existing=False):
     result = []
     for i in range(num):
-      dir_path = tempfile.mkdtemp()
+      name = self.get_name_for_dir(prefix, i)
+      dir_path = self.make_tmp_dir(name)
+      result.append(dir_path)
+      print("Generated dir " + dir_path)
       if non_existing:
-        shutil.rmtree(dir_path)
+        self.clear_tmp_dir(name)
       elif not writable:
         os.chmod(dir_path, stat.S_IREAD)
-      if not non_existing:
-        self.created_dirs.append(dir_path)
-      result.append(dir_path)
-      print("Generated dir" + dir_path)
     return result
 
   def setup_method(self, method):
     # Don't call the superclass method to prevent starting Impala before each 
test. In
     # this file, each test is responsible for doing that because we want to 
generate
     # the parameter string to start-impala-cluster in each test method.
-    self.created_dirs = []
+    pass
 
   def teardown_method(self, method):
-    for dir_path in self.created_dirs:
-      shutil.rmtree(dir_path, ignore_errors=True)
+    self.clear_tmp_dirs()
     self.check_deleted_file_fd()
 
   @pytest.mark.execute_serially
@@ -105,7 +105,7 @@ class TestScratchDir(CustomClusterTestSuite):
     """ 5 empty directories are created in the /tmp directory and we verify 
that only
         one of those directories is used as scratch disk. Only one should be 
used as
         scratch because all directories are on same disk."""
-    normal_dirs = self.generate_dirs(5)
+    normal_dirs = self.generate_dirs('normal_dirs', 5)
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=false',
@@ -125,7 +125,7 @@ class TestScratchDir(CustomClusterTestSuite):
     """ Test we can execute a query with no scratch dirs """
     self._start_impala_cluster(['--impalad_args=-logbuflevel=-1 
-scratch_dirs='])
     self.assert_impalad_log_contains("WARNING",
-        "Running without spill to disk: no scratch directories provided\.")
+        r"Running without spill to disk: no scratch directories provided\.")
     exec_option = vector.get_value('exec_option')
     exec_option['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.get_any_impalad()
@@ -143,15 +143,15 @@ class TestScratchDir(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   def test_non_writable_dirs(self, vector):
     """ Test we can execute a query with only bad non-writable scratch """
-    non_writable_dirs = self.generate_dirs(5, writable=False)
-    self._start_impala_cluster([
-      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(
-      ','.join(non_writable_dirs))])
+    non_writable_dirs = self.generate_dirs('non_writable_dirs', 5, 
writable=False)
+    self._start_impala_cluster(
+      ['--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(
+       ','.join(non_writable_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: 
could "
         + "not use any scratch directories in list:.*. See previous "
         + "warnings for information on causes.")
     self.assert_impalad_log_contains("WARNING", "Could not remove and recreate 
directory "
-            + ".*: cannot use it for scratch\. Error was: .*", 
expected_count=5)
+            + r".*: cannot use it for scratch\. Error was: .*", 
expected_count=5)
     exec_option = vector.get_value('exec_option')
     exec_option['buffer_pool_limit'] = self.buffer_pool_limit
     # IMPALA-9856: Disable query result spooling so that in_mem_query does not 
spill to
@@ -168,10 +168,10 @@ class TestScratchDir(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   def test_non_existing_dirs(self, vector):
     """ Test that non-existing directories are not created or used """
-    non_existing_dirs = self.generate_dirs(5, non_existing=True)
-    self._start_impala_cluster([
-      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(
-      ','.join(non_existing_dirs))])
+    non_existing_dirs = self.generate_dirs('non_existing_dirs', 5, 
non_existing=True)
+    self._start_impala_cluster(
+      ['--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(
+       ','.join(non_existing_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: 
could "
         + "not use any scratch directories in list:.*. See previous "
         + "warnings for information on causes.")
@@ -195,7 +195,7 @@ class TestScratchDir(CustomClusterTestSuite):
   def test_write_error_failover(self, vector):
     """ Test that we can fail-over to writable directories if other directories
         have permissions changed or are removed after impalad startup."""
-    dirs = self.generate_dirs(3)
+    dirs = self.generate_dirs('write_error_failover', 3)
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
@@ -206,7 +206,8 @@ class TestScratchDir(CustomClusterTestSuite):
     exec_option = vector.get_value('exec_option')
     exec_option['buffer_pool_limit'] = self.buffer_pool_limit
     # Trigger errors when writing the first two directories.
-    shutil.rmtree(dirs[0]) # Remove the first directory.
+    # Remove the first directory.
+    self.clear_tmp_dir(self.get_name_for_dir('write_error_failover', 0))
     # Make all subdirectories in the second directory non-writable.
     for dirpath, dirnames, filenames in os.walk(dirs[1]):
       os.chmod(dirpath, stat.S_IREAD)
@@ -215,6 +216,9 @@ class TestScratchDir(CustomClusterTestSuite):
     impalad = self.cluster.get_any_impalad()
     client = impalad.service.create_beeswax_client()
     self.execute_query_expect_success(client, self.spill_query, exec_option)
+    # Restore second directory mod for cleanup later.
+    for dirpath, dirnames, filenames in os.walk(dirs[1]):
+      os.chmod(dirpath, stat.S_IRWXU)
 
   @pytest.mark.execute_serially
   def test_scratch_dirs_default_priority(self, vector):
@@ -222,7 +226,7 @@ class TestScratchDir(CustomClusterTestSuite):
         of those directories are used as scratch disk. By default, all 
directories
         should have the same priority and so all should be used in a round 
robin
         manner."""
-    normal_dirs = self.generate_dirs(5)
+    normal_dirs = self.generate_dirs('scratch_dirs_default_priority', 5)
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 
-scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], 
cluster_size=1,
@@ -247,7 +251,7 @@ class TestScratchDir(CustomClusterTestSuite):
   def test_scratch_dirs_prioritized_spill(self, vector):
     """ 5 empty directories are created in the /tmp directory and we verify 
that only
         the directories with highest priority are used as scratch disk."""
-    normal_dirs = self.generate_dirs(5)
+    normal_dirs = self.generate_dirs('scratch_dirs_prioritized_spill', 5)
     normal_dirs[0] = '{0}::{1}'.format(normal_dirs[0], 1)
     normal_dirs[1] = '{0}::{1}'.format(normal_dirs[1], 0)
     normal_dirs[2] = '{0}::{1}'.format(normal_dirs[2], 1)
@@ -318,7 +322,7 @@ class TestScratchDir(CustomClusterTestSuite):
   @SkipIf.not_scratch_fs
   def test_scratch_dirs_remote_spill(self, vector):
     # Test one remote directory with one its local buffer directory.
-    normal_dirs = self.generate_dirs(1)
+    normal_dirs = self.generate_dirs('scratch_dirs_remote_spill', 1)
     # Use dfs for testing.
     normal_dirs.append(self.dfs_tmp_path())
     self._start_impala_cluster([
@@ -348,7 +352,7 @@ class TestScratchDir(CustomClusterTestSuite):
     '''Two local directories, the first one is always used as local buffer for
        remote directories. Set the second directory big enough so that only 
spills
        to local in the test'''
-    normal_dirs = self.generate_dirs(2)
+    normal_dirs = self.generate_dirs('scratch_dirs_mix_spill_local', 2)
     normal_dirs[0] = '{0}::{1}'.format(normal_dirs[0], 1)
     normal_dirs[1] = '{0}:2GB:{1}'.format(normal_dirs[1], 0)
     normal_dirs.append(self.dfs_tmp_path())
@@ -382,7 +386,7 @@ class TestScratchDir(CustomClusterTestSuite):
     '''Two local directories, the first one is always used as local buffer for
        remote directories. Set the second directory small enough so that it 
spills
        to both directories in the test'''
-    normal_dirs = self.generate_dirs(2)
+    normal_dirs = self.generate_dirs('scratch_dirs_mix_spill_both', 2)
     normal_dirs[0] = '{0}:32MB:{1}'.format(normal_dirs[0], 0)
     normal_dirs[1] = '{0}:4MB:{1}'.format(normal_dirs[1], 1)
     normal_dirs.append(self.dfs_tmp_path())
@@ -415,7 +419,7 @@ class TestScratchDir(CustomClusterTestSuite):
   @SkipIf.not_scratch_fs
   def test_scratch_dirs_remote_spill_with_options(self, vector):
     # One local buffer directory and one remote directory.
-    normal_dirs = self.generate_dirs(1)
+    normal_dirs = self.generate_dirs('scratch_dirs_remote_spill_with_option', 
1)
     normal_dirs[0] = '{0}:16MB:{1}'.format(normal_dirs[0], 0)
     normal_dirs.append(self.dfs_tmp_path())
     self._start_impala_cluster([
@@ -449,7 +453,7 @@ class TestScratchDir(CustomClusterTestSuite):
   def test_scratch_dirs_remote_spill_concurrent(self, vector):
     '''Concurrently execute multiple queries that trigger the spilling to the 
remote
     directory to test if there is a deadlock issue.'''
-    normal_dirs = self.generate_dirs(1)
+    normal_dirs = self.generate_dirs('scratch_dirs_remote_spill_concurrent', 1)
     normal_dirs[0] = '{0}:16MB:{1}'.format(normal_dirs[0], 0)
     normal_dirs.append(self.dfs_tmp_path())
     num = 5
@@ -495,7 +499,7 @@ class TestScratchDir(CustomClusterTestSuite):
   @SkipIf.not_scratch_fs
   def test_scratch_dirs_batch_reading(self, vector):
     # Set the buffer directory small enough to spill to the remote one.
-    normal_dirs = self.generate_dirs(1)
+    normal_dirs = self.generate_dirs('scratch_dirs_batch_reading', 1)
     normal_dirs[0] = '{0}:2MB:{1}'.format(normal_dirs[0], 1)
     normal_dirs.append(self.dfs_tmp_path())
     self._start_impala_cluster([
diff --git a/tests/custom_cluster/test_shell_commandline.py 
b/tests/custom_cluster/test_shell_commandline.py
index a41a431dc..4d8902930 100644
--- a/tests/custom_cluster/test_shell_commandline.py
+++ b/tests/custom_cluster/test_shell_commandline.py
@@ -19,7 +19,6 @@ from __future__ import absolute_import, division, 
print_function
 import os
 import pytest
 import re
-import tempfile
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.test_dimensions import create_client_protocol_http_transport
@@ -33,8 +32,8 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
   in a separate process.  Assertions are done by scanning the shell output and 
Impala
   server logs for expected strings."""
 
-  LOG_DIR_HTTP_TRACING = tempfile.mkdtemp(prefix="http_tracing")
-  LOG_DIR_HTTP_TRACING_OFF = tempfile.mkdtemp(prefix="http_tracing_off")
+  LOG_DIR_HTTP_TRACING = "http_tracing"
+  LOG_DIR_HTTP_TRACING_OFF = "http_tracing_off"
   IMPALA_ID_RE = "([0-9a-f]{16}:[0-9a-f]{16})"
 
   @classmethod
@@ -49,7 +48,10 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_http_transport())
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("-log_dir={0} -v 
2".format(LOG_DIR_HTTP_TRACING))
+  @CustomClusterTestSuite.with_args(
+      impalad_args="-v 2",
+      impala_log_dir="{" + LOG_DIR_HTTP_TRACING + "}",
+      tmp_dir_placeholders=[LOG_DIR_HTTP_TRACING])
   def test_http_tracing_headers(self, vector):
     """Asserts that tracing headers are automatically added by the impala 
shell to
     all calls to the backend impala engine made using the hs2 over http 
protocol.
@@ -84,7 +86,8 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
                                
.format(TestImpalaShellCommandLine.IMPALA_ID_RE))
 
     # Find all HTTP Connection Tracing log lines.
-    with open(os.path.join(self.LOG_DIR_HTTP_TRACING, "impalad.INFO")) as 
log_file:
+    with open(os.path.join(
+      self.get_tmp_dir(self.LOG_DIR_HTTP_TRACING), "impalad.INFO")) as 
log_file:
       for line in log_file:
         if line.find("HTTP Connection Tracing Headers") > -1:
           tracing_lines_count += 1
@@ -149,7 +152,10 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
       pytest.fail("did not find Impala query id in shell stdout")
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("-log_dir={0} -v 
2".format(LOG_DIR_HTTP_TRACING_OFF))
+  @CustomClusterTestSuite.with_args(
+      impalad_args="-v 2",
+      impala_log_dir="{" + LOG_DIR_HTTP_TRACING_OFF + "}",
+      tmp_dir_placeholders=[LOG_DIR_HTTP_TRACING_OFF])
   def test_http_tracing_headers_off(self, vector):
     """Asserts the impala shell command line parameter to prevent the addition 
of http
     tracing headers actually leaves out those tracing headers."""
@@ -167,7 +173,8 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
     assert result.stdout.find("Query Runtime Profile") > -1
 
     # Find all HTTP Connection Tracing log lines (there should not be any).
-    with open(os.path.join(self.LOG_DIR_HTTP_TRACING_OFF, "impalad.INFO")) as 
log_file:
+    with open(os.path.join(
+      self.get_tmp_dir(self.LOG_DIR_HTTP_TRACING_OFF), "impalad.INFO")) as 
log_file:
       for line in log_file:
         if line.find("HTTP Connection Tracing Headers") != -1:
           pytest.fail("found HTTP connection tracing line line: 
{0}".format(line))
diff --git a/tests/custom_cluster/test_shell_jwt_auth.py 
b/tests/custom_cluster/test_shell_jwt_auth.py
index fe62cb1c9..b252edff3 100644
--- a/tests/custom_cluster/test_shell_jwt_auth.py
+++ b/tests/custom_cluster/test_shell_jwt_auth.py
@@ -18,7 +18,6 @@
 from __future__ import absolute_import, division, print_function
 import os
 import pytest
-import tempfile
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.test_dimensions import create_client_protocol_http_transport
@@ -39,16 +38,16 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
   should not need to be executed again.
   """
 
-  LOG_DIR_JWT_AUTH_SUCCESS = tempfile.mkdtemp(prefix="jwt_auth_success")
-  LOG_DIR_JWT_AUTH_FAIL = tempfile.mkdtemp(prefix="jwt_auth_fail")
-  LOG_DIR_JWT_AUTH_INVALID_JWK = 
tempfile.mkdtemp(prefix="jwt_auth_invalid_jwk")
-
   JWKS_JWTS_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata', 'jwt')
   JWKS_JSON_PATH = os.path.join(JWKS_JWTS_DIR, 'jwks_signing.json')
   JWT_SIGNED_PATH = os.path.join(JWKS_JWTS_DIR, 'jwt_signed')
   JWT_EXPIRED_PATH = os.path.join(JWKS_JWTS_DIR, 'jwt_expired')
   JWT_INVALID_JWK = os.path.join(JWKS_JWTS_DIR, 'jwt_signed_untrusted')
 
+  IMPALAD_ARGS = ("-v 2 -jwks_file_path={0} -jwt_custom_claim_username=sub "
+                  "-jwt_token_auth=true -jwt_allow_without_tls=true "
+                  .format(JWKS_JSON_PATH))
+
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -61,10 +60,9 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impala_log_dir=LOG_DIR_JWT_AUTH_SUCCESS,
-    impalad_args="-v 2 -jwks_file_path={0} -jwt_custom_claim_username=sub "
-    "-jwt_token_auth=true -jwt_allow_without_tls=true"
-    .format(JWKS_JSON_PATH))
+    impalad_args=IMPALAD_ARGS,
+    impala_log_dir="{jwt_auth_success}",
+    tmp_dir_placeholders=["jwt_auth_success"])
   def test_jwt_auth_valid(self, vector):
     """Asserts the Impala shell can authenticate to Impala using JWT 
authentication.
     Also executes a query to ensure the authentication was successful."""
@@ -97,10 +95,9 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impala_log_dir=LOG_DIR_JWT_AUTH_FAIL,
-    impalad_args="-v 2 -jwks_file_path={0} -jwt_custom_claim_username=sub "
-    "-jwt_token_auth=true -jwt_allow_without_tls=true"
-    .format(JWKS_JSON_PATH))
+    impalad_args=IMPALAD_ARGS,
+    impala_log_dir="{jwt_auth_fail}",
+    tmp_dir_placeholders=["jwt_auth_fail"])
   def test_jwt_auth_expired(self, vector):
     """Asserts the Impala shell fails to authenticate when it presents a JWT 
that has a
     valid signature but is expired."""
@@ -137,10 +134,9 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impala_log_dir=LOG_DIR_JWT_AUTH_INVALID_JWK,
-    impalad_args="-v 2 -jwks_file_path={0} -jwt_custom_claim_username=sub "
-    "-jwt_token_auth=true -jwt_allow_without_tls=true"
-    .format(JWKS_JSON_PATH))
+    impalad_args=IMPALAD_ARGS,
+    impala_log_dir="{jwt_auth_invalid_jwk}",
+    tmp_dir_placeholders=["jwt_auth_invalid_jwk"])
   def test_jwt_auth_invalid_jwk(self, vector):
     """Asserts the Impala shell fails to authenticate when it presents a JWT 
that has a
     valid signature but is expired."""
diff --git a/tests/custom_cluster/test_startup_filesystem_checks.py 
b/tests/custom_cluster/test_startup_filesystem_checks.py
index 7b2e81a76..52d277f6d 100644
--- a/tests/custom_cluster/test_startup_filesystem_checks.py
+++ b/tests/custom_cluster/test_startup_filesystem_checks.py
@@ -21,7 +21,6 @@ from __future__ import absolute_import, division, 
print_function
 import logging
 import pytest
 import os
-import tempfile
 
 from impala_py_lib.helpers import find_all_files, is_core_dump
 from tests.common.file_utils import assert_file_in_dir_contains
@@ -53,54 +52,62 @@ class TestStartupFilesystemChecks(CustomClusterTestSuite):
     "",
     get_fs_path("/test-warehouse/alltypes")]
   )
-  LOG_DIR = tempfile.mkdtemp(prefix="test_startup_filesystem_checks_",
-                             dir=os.getenv("LOG_DIR"))
-  MINIDUMP_PATH = tempfile.mkdtemp()
+  LOG_DIR = "startup_filesystem_checks"
+  MINIDUMP_PATH = "minidump_path"
+  LOG_DIR_PLACEHOLDER = "{" + LOG_DIR + "}"
+  MINIDUMP_PLACEHOLDER = "--minidump_path={" + MINIDUMP_PATH + "}"
 
-  IMPALAD_ARGS = "--startup_filesystem_check_directories={0} 
--minidump_path={1}"
+  IMPALAD_ARGS = "--startup_filesystem_check_directories={0} "
 
   pre_test_cores = None
 
+  def get_log_dir(self):
+    return self.get_tmp_dir(self.LOG_DIR)
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impala_log_dir=LOG_DIR,
-      impalad_args=IMPALAD_ARGS.format(NONEXISTENT_PATH, MINIDUMP_PATH))
-  def test_nonexistent_path(self, unique_name):
+      impala_log_dir=LOG_DIR_PLACEHOLDER,
+      impalad_args=IMPALAD_ARGS.format(NONEXISTENT_PATH) + 
MINIDUMP_PLACEHOLDER,
+      tmp_dir_placeholders=[LOG_DIR, MINIDUMP_PATH])
+  def test_nonexistent_path(self):
     # parse log file for expected exception
-    assert_file_in_dir_contains(TestStartupFilesystemChecks.LOG_DIR,
-        "Invalid path specified for startup_filesystem_check_directories: " +
-        "{0} does not 
exist.".format(TestStartupFilesystemChecks.NONEXISTENT_PATH))
+    assert_file_in_dir_contains(self.get_log_dir(),
+        ("Invalid path specified for startup_filesystem_check_directories: "
+         "{0} does not exist.").format(self.NONEXISTENT_PATH))
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impala_log_dir=LOG_DIR,
-      impalad_args=IMPALAD_ARGS.format(NONDIRECTORY_PATH, MINIDUMP_PATH))
-  def test_nondirectory_path(self, unique_name):
+      impala_log_dir=LOG_DIR_PLACEHOLDER,
+      impalad_args=IMPALAD_ARGS.format(NONDIRECTORY_PATH) + 
MINIDUMP_PLACEHOLDER,
+      tmp_dir_placeholders=[LOG_DIR, MINIDUMP_PATH])
+  def test_nondirectory_path(self):
     # parse log file for expected exception
-    assert_file_in_dir_contains(TestStartupFilesystemChecks.LOG_DIR,
-        "Invalid path specified for startup_filesystem_check_directories: " +
-        "{0} is not a 
directory.".format(TestStartupFilesystemChecks.NONDIRECTORY_PATH))
+    assert_file_in_dir_contains(self.get_log_dir(),
+        ("Invalid path specified for startup_filesystem_check_directories: "
+         "{0} is not a directory.").format(self.NONDIRECTORY_PATH))
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impala_log_dir=LOG_DIR,
-      impalad_args=IMPALAD_ARGS.format(VALID_SUBDIRECTORY, MINIDUMP_PATH))
-  def test_valid_subdirectory(self, unique_name):
+      impala_log_dir=LOG_DIR_PLACEHOLDER,
+      impalad_args=IMPALAD_ARGS.format(VALID_SUBDIRECTORY) + 
MINIDUMP_PLACEHOLDER,
+      tmp_dir_placeholders=[LOG_DIR, MINIDUMP_PATH])
+  def test_valid_subdirectory(self):
     # parse log file for expected log message showing success
-    assert_file_in_dir_contains(TestStartupFilesystemChecks.LOG_DIR,
-        "Successfully listed 
{0}".format(TestStartupFilesystemChecks.VALID_SUBDIRECTORY))
+    assert_file_in_dir_contains(self.get_log_dir(),
+        "Successfully listed {0}".format(self.VALID_SUBDIRECTORY))
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impala_log_dir=LOG_DIR,
-      impalad_args=IMPALAD_ARGS.format(MULTIPLE_VALID_DIRECTORIES, 
MINIDUMP_PATH))
-  def test_multiple_valid_dirs(self, unique_name):
-    valid_directories = 
TestStartupFilesystemChecks.MULTIPLE_VALID_DIRECTORIES.split(",")
+      impala_log_dir=LOG_DIR_PLACEHOLDER,
+      impalad_args=IMPALAD_ARGS.format(MULTIPLE_VALID_DIRECTORIES) + 
MINIDUMP_PLACEHOLDER,
+      tmp_dir_placeholders=[LOG_DIR, MINIDUMP_PATH])
+  def test_multiple_valid_dirs(self):
+    valid_directories = self.MULTIPLE_VALID_DIRECTORIES.split(",")
     for valid_dir in valid_directories:
       if len(valid_dir) == 0:
         continue
       # parse log file for expected log message showing success
-      assert_file_in_dir_contains(TestStartupFilesystemChecks.LOG_DIR,
+      assert_file_in_dir_contains(self.get_log_dir(),
         "Successfully listed {0}".format(valid_dir))
 
   def setup_method(self, method):

Reply via email to