This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 95f353ac4a8dd4015368349ef5c138c714891920
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Oct 30 15:02:08 2024 -0700

    IMPALA-13507: Allow disabling glog buffering via with_args fixture
    
    We have plenty of custom_cluster tests that assert against the content of
    Impala daemon log files while the process is still running, using
    assert_log_contains() and its wrappers. The method specifically mentions
    disabling glog buffering ('-logbuflevel=-1'), but not all custom_cluster
    tests do that. This often results in flaky tests that are hard to triage
    and are often neglected if they do not run frequently in core
    exploration.
    
    This patch adds a boolean param 'disable_log_buffering' to
    CustomClusterTestSuite.with_args for tests to declare their intention to
    inspect log files in a live minicluster. If it is True, the minicluster is
    started with '--logbuflevel=-1' for all Impala daemons. If it is False, a
    WARNING is logged on any call to assert_log_contains().
    
    Several complex custom_cluster tests are left unchanged and still print
    these WARNING logs, for example:
    - TestQueryLive
    - TestQueryLogTableBeeswax
    - TestQueryLogOtherTable
    - TestQueryLogTableHS2
    - TestQueryLogTableAll
    - TestQueryLogTableBufferPool
    - TestStatestoreRpcErrors
    - TestWorkloadManagementInitWait
    - TestWorkloadManagementSQLDetails
    
    This patch also fixes some small flake8 issues in the modified tests.
    
    There is a sign of flakiness in test_query_live.py where a test query is
    submitted to the coordinator and fails because the sys.impala_query_live
    table does not exist yet from the coordinator's perspective. This patch
    modifies test_query_live.py to wait a few seconds until
    sys.impala_query_live is queryable.
    
    Testing:
    - Passed custom_cluster tests in exhaustive exploration.
    
    Change-Id: I56fb1746b8f3cea9f3db3514a86a526dffb44a61
    Reviewed-on: http://gerrit.cloudera.org:8080/22015
    Reviewed-by: Jason Fehr <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/authorization/test_authorization.py          |  3 +-
 tests/authorization/test_ranger.py                 | 11 ++++--
 tests/common/custom_cluster_test_suite.py          | 22 ++++++++---
 tests/common/impala_test_suite.py                  | 17 +++++++++
 tests/custom_cluster/test_admission_controller.py  |  3 +-
 tests/custom_cluster/test_catalog_hms_failures.py  |  6 +--
 tests/custom_cluster/test_catalogd_ha.py           |  3 +-
 tests/custom_cluster/test_codegen_cache.py         |  7 +++-
 tests/custom_cluster/test_custom_statestore.py     |  8 ++--
 tests/custom_cluster/test_data_cache.py            |  4 +-
 .../test_disable_catalog_data_ops.py               |  3 +-
 tests/custom_cluster/test_events_custom_configs.py |  3 +-
 tests/custom_cluster/test_local_catalog.py         | 10 +++--
 tests/custom_cluster/test_logging.py               |  9 +++--
 tests/custom_cluster/test_partition.py             | 12 ++++--
 tests/custom_cluster/test_pause_monitor.py         |  2 +-
 tests/custom_cluster/test_query_event_hooks.py     |  9 +++--
 tests/custom_cluster/test_query_expiration.py      |  4 +-
 tests/custom_cluster/test_query_live.py            |  8 +++-
 tests/custom_cluster/test_query_log.py             |  7 ++--
 tests/custom_cluster/test_query_retries.py         |  1 +
 tests/custom_cluster/test_re2_max_mem.py           |  6 ++-
 tests/custom_cluster/test_restart_services.py      | 11 ++++--
 .../test_runtime_filter_aggregation.py             |  3 +-
 tests/custom_cluster/test_services_rpc_errors.py   | 12 +++---
 tests/custom_cluster/test_shell_jwt_auth.py        |  9 +++--
 tests/custom_cluster/test_thrift_socket.py         |  6 ++-
 tests/custom_cluster/test_workload_mgmt_init.py    | 44 ++++++++++++++--------
 28 files changed, 163 insertions(+), 80 deletions(-)

diff --git a/tests/authorization/test_authorization.py 
b/tests/authorization/test_authorization.py
index 969562968..7dbb86b97 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -89,7 +89,8 @@ class TestAuthorization(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       "--server_name=server1 --authorization_policy_file=ignored_file",
-      impala_log_dir="{deprecated_flags}", 
tmp_dir_placeholders=['deprecated_flags'])
+      impala_log_dir="{deprecated_flags}", 
tmp_dir_placeholders=['deprecated_flags'],
+      disable_log_buffering=True)
   def test_deprecated_flags(self):
     assert_file_in_dir_contains(self.impala_log_dir, "Ignoring removed flag "
                                                      
"authorization_policy_file")
diff --git a/tests/authorization/test_ranger.py 
b/tests/authorization/test_ranger.py
index 17de9c31c..c2f06cb44 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -34,7 +34,7 @@ from tests.common.custom_cluster_test_suite import 
CustomClusterTestSuite
 from tests.common.file_utils import copy_files_to_hdfs_dir
 from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIf
 from tests.common.test_dimensions import (create_client_protocol_dimension,
-    create_exec_option_dimension, create_orc_dimension)
+    create_orc_dimension)
 from tests.common.test_vector import ImpalaTestVector
 from tests.shell.util import run_impala_shell_cmd
 from tests.util.hdfs_util import NAMENODE
@@ -77,7 +77,8 @@ class TestRanger(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impala_log_dir=tempfile.mkdtemp(prefix="ranger_audit_xff", 
dir=os.getenv("LOG_DIR")),
     impalad_args=(IMPALAD_ARGS + " --use_xff_address_as_origin=true"),
-    catalogd_args=CATALOGD_ARGS)
+    catalogd_args=CATALOGD_ARGS,
+    disable_log_buffering=True)
   def test_xff_ranger_audit(self):
     """
     Tests XFF client IP is included in ranger audit logs when using hs2-http 
protocol
@@ -2125,7 +2126,8 @@ class TestRanger(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args="{0} {1}".format(IMPALAD_ARGS,
                                   
"--allow_catalog_cache_op_from_masked_users=true"),
-    catalogd_args=CATALOGD_ARGS)
+    catalogd_args=CATALOGD_ARGS,
+    disable_log_buffering=True)
   def test_allow_metadata_update(self, unique_name):
     self.__test_allow_catalog_cache_op_from_masked_users(unique_name)
 
@@ -2133,7 +2135,8 @@ class TestRanger(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args="{0} {1}".format(LOCAL_CATALOG_IMPALAD_ARGS,
                                   
"--allow_catalog_cache_op_from_masked_users=true"),
-    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS)
+    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS,
+    disable_log_buffering=True)
   def test_allow_metadata_update_local_catalog(self, unique_name):
     self.__test_allow_catalog_cache_op_from_masked_users(unique_name)
 
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index 0b18ffae4..c3ebb19c2 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -74,6 +74,8 @@ TMP_DIR_PLACEHOLDERS = 'tmp_dir_placeholders'
 # startup fails with "num_known_live_backends did not reach expected value in 
time", then
 # the test passes. Any other exception is raised.
 EXPECT_STARTUP_FAIL = 'expect_startup_fail'
+# If True, add '--logbuflevel=-1' into all impala daemon args.
+DISABLE_LOG_BUFFERING = 'disable_log_buffering'
 
 # Args that accept additional formatting to supply temporary dir path.
 ACCEPT_FORMATTING = set([IMPALAD_ARGS, CATALOGD_ARGS, IMPALA_LOG_DIR])
@@ -138,7 +140,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       num_exclusive_coordinators=None, kudu_args=None, 
statestored_timeout_s=None,
       impalad_timeout_s=None, expect_cores=None, reset_ranger=False,
       impalad_graceful_shutdown=False, tmp_dir_placeholders=[],
-      expect_startup_fail=False):
+      expect_startup_fail=False, disable_log_buffering=False):
     """Records arguments to be passed to a cluster by adding them to the 
decorated
     method's func_dict"""
     def decorate(func):
@@ -170,14 +172,16 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         func.__dict__[IMPALAD_TIMEOUT_S] = impalad_timeout_s
       if expect_cores is not None:
         func.__dict__[EXPECT_CORES] = expect_cores
-      if reset_ranger is not False:
+      if reset_ranger:
         func.__dict__[RESET_RANGER] = True
-      if impalad_graceful_shutdown is not False:
+      if impalad_graceful_shutdown:
         func.__dict__[IMPALAD_GRACEFUL_SHUTDOWN] = True
       if tmp_dir_placeholders:
         func.__dict__[TMP_DIR_PLACEHOLDERS] = tmp_dir_placeholders
       if expect_startup_fail:
         func.__dict__[EXPECT_STARTUP_FAIL] = True
+      if disable_log_buffering:
+        func.__dict__[DISABLE_LOG_BUFFERING] = True
       return func
     return decorate
 
@@ -208,6 +212,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   def setup_method(self, method):
     cluster_args = list()
+    disable_log_buffering = method.__dict__.get(DISABLE_LOG_BUFFERING, False)
+    self._warn_assert_log = not disable_log_buffering
 
     if TMP_DIR_PLACEHOLDERS in method.__dict__:
       # Create all requested temporary dirs.
@@ -221,10 +227,15 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       # log, which doesn't start until after the grace period has passed.
       cluster_args.append(
           "--impalad=--shutdown_grace_period_s=0 --shutdown_deadline_s=15")
-    for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS, 
ADMISSIOND_ARGS, JVM_ARGS]:
+    impala_daemons = [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS, 
ADMISSIOND_ARGS]
+    for arg in (impala_daemons + [JVM_ARGS]):
+      val = ''
+      if arg in impala_daemons and disable_log_buffering:
+        val += '--logbuflevel=-1 '
       if arg in method.__dict__:
-        val = (method.__dict__[arg] if arg not in ACCEPT_FORMATTING
+        val += (method.__dict__[arg] if arg not in ACCEPT_FORMATTING
                else method.__dict__[arg].format(**self.TMP_DIRS))
+      if val:
         cluster_args.append("--%s=%s " % (arg, val))
     if START_ARGS in method.__dict__:
       cluster_args.extend(method.__dict__[START_ARGS].split())
@@ -301,7 +312,6 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         else:
           raise e
 
-
   def teardown_method(self, method):
     if method.__dict__.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
       for impalad in self.cluster.impalads:
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 47affdb00..05c62d278 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -178,6 +178,11 @@ EXEC_OPTION_NAMES = set([val.lower()
 
 # Base class for Impala tests. All impala test cases should inherit from this 
class
 class ImpalaTestSuite(BaseTestSuite):
+
+  # If True, call to assert_log_contains() will print WARN log for possibility 
of
+  # not disabling glog buffering (--logbuflevel=-1).
+  _warn_assert_log = False
+
   @classmethod
   def add_test_dimensions(cls):
     """
@@ -525,6 +530,11 @@ class ImpalaTestSuite(BaseTestSuite):
 
   def __do_replacements(self, s, use_db=None, extra=None):
     globs = globals()
+    # following assignment are purposefully redundant to avoid flake8 warnings 
(F401).
+    globs['FILESYSTEM_PREFIX'] = FILESYSTEM_PREFIX
+    globs['FILESYSTEM_URI_SCHEME'] = FILESYSTEM_URI_SCHEME
+    globs['S3_BUCKET_NAME'] = S3_BUCKET_NAME
+    globs['S3GUARD_ENABLED'] = S3GUARD_ENABLED
     repl = dict(('$' + k, globs[k]) for k in [
         "FILESYSTEM_PREFIX",
         "FILESYSTEM_NAME",
@@ -1356,6 +1366,13 @@ class ImpalaTestSuite(BaseTestSuite):
     Returns the result of the very last call to line_regex.search or None if
     expected_count is 0 or the line_regex did not match any lines.
     """
+    if (self._warn_assert_log):
+      LOG.warning(
+          "{} calls assert_log_contains() with timeout_s={}. Make sure that 
glog "
+          "buffering has been disabled (--logbuflevel=-1), or "
+          "CustomClusterTestSuite.with_args is set with 
disable_log_buffering=True, "
+          "or timeout_s is sufficient.".format(self.__class__.__name__, 
timeout_s))
+
     pattern = re.compile(line_regex)
     start_time = time.time()
     while True:
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index dd891c998..a73ac3c47 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -826,7 +826,8 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args="--logbuflevel=-1 " + 
impalad_admission_ctrl_flags(max_requests=1,
         max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT),
-    statestored_args=_STATESTORED_ARGS)
+    statestored_args=_STATESTORED_ARGS,
+    disable_log_buffering=True)
   def test_cancellation(self):
     """ Test to confirm that all Async cancellation windows are hit and are 
able to
     succesfully cancel the query"""
diff --git a/tests/custom_cluster/test_catalog_hms_failures.py 
b/tests/custom_cluster/test_catalog_hms_failures.py
index 32de2a4f7..6feea8d08 100644
--- a/tests/custom_cluster/test_catalog_hms_failures.py
+++ b/tests/custom_cluster/test_catalog_hms_failures.py
@@ -27,8 +27,6 @@ from tests.common.custom_cluster_test_suite import (
     DEFAULT_CLUSTER_SIZE)
 from tests.common.skip import SkipIf
 from tests.util.event_processor_utils import EventProcessorUtils
-from tests.util.filesystem_utils import IS_ISILON, IS_LOCAL
-
 
 NUM_SUBSCRIBERS = DEFAULT_CLUSTER_SIZE + 1
 
@@ -117,7 +115,8 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args='--use_local_catalog --catalog_topic_mode=minimal',
-    catalogd_args='--catalog_topic_mode=minimal')
+    catalogd_args='--catalog_topic_mode=minimal',
+    disable_log_buffering=True)
   def test_hms_client_retries(self):
     """Test that a running query will trigger the retry logic in
     RetryingMetaStoreClient."""
@@ -152,6 +151,7 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
     self.run_hive_metastore()
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
+
 @SkipIf.is_test_jdk
 class TestCatalogHMSFailures(CustomClusterTestSuite):
   """Test Catalog behavior when HMS is not present."""
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index 675585674..8a0463f14 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -368,7 +368,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_restart_statestore(self):
     """The test case for restarting statestore after the cluster is created 
with
     catalogd HA enabled."""
diff --git a/tests/custom_cluster/test_codegen_cache.py 
b/tests/custom_cluster/test_codegen_cache.py
index 2c55fa2a6..e9da51cb2 100644
--- a/tests/custom_cluster/test_codegen_cache.py
+++ b/tests/custom_cluster/test_codegen_cache.py
@@ -24,6 +24,7 @@ from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster
 from tests.common.test_result_verifier import assert_codegen_cache_hit
 from tests.util.filesystem_utils import get_fs_path
 
+
 @SkipIf.not_hdfs
 @SkipIfNotHdfsMinicluster.scheduling
 class TestCodegenCache(CustomClusterTestSuite):
@@ -146,14 +147,16 @@ class TestCodegenCache(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1,
-          impalad_args=SYMBOL_EMITTER_TESTS_IMPALAD_ARGS + 
"--asm_module_dir=/dev/null")
+          impalad_args=SYMBOL_EMITTER_TESTS_IMPALAD_ARGS + 
"--asm_module_dir=/dev/null",
+          disable_log_buffering=True)
   # Regression test for IMPALA-12260.
   def test_codegen_cache_with_asm_module_dir(self, vector):
     self._test_codegen_cache_with_symbol_emitter(vector)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1,
-          impalad_args=SYMBOL_EMITTER_TESTS_IMPALAD_ARGS + "--perf_map")
+          impalad_args=SYMBOL_EMITTER_TESTS_IMPALAD_ARGS + "--perf_map",
+          disable_log_buffering=True)
   # Regression test for IMPALA-12260.
   def test_codegen_cache_with_perf_map(self, vector):
     self._test_codegen_cache_with_symbol_emitter(vector)
diff --git a/tests/custom_cluster/test_custom_statestore.py 
b/tests/custom_cluster/test_custom_statestore.py
index ef0dbd817..c9ef7c39c 100644
--- a/tests/custom_cluster/test_custom_statestore.py
+++ b/tests/custom_cluster/test_custom_statestore.py
@@ -21,17 +21,13 @@
 from __future__ import absolute_import, division, print_function
 from builtins import range
 import logging
-import os
 import pytest
-import re
-import sys
 import uuid
 import socket
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout
 from tests.common.skip import SkipIfBuildType
-from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.patterns import print_id
 from time import sleep
 
@@ -48,6 +44,7 @@ LOG = logging.getLogger('custom_statestore_test')
 STATESTORE_SERVICE_PORT = 24000
 CATALOG_SERVICE_PORT = 26000
 
+
 # A simple wrapper class to launch a cluster where we can tune various
 # startup parameters of the statestored to test correct boundary-value
 # behavior.
@@ -128,7 +125,8 @@ class TestCustomStatestore(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--statestore_subscriber_use_resolved_address=true",
-      catalogd_args="--statestore_subscriber_use_resolved_address=true")
+      catalogd_args="--statestore_subscriber_use_resolved_address=true",
+      disable_log_buffering=True)
   def test_subscriber_with_resolved_address(self, vector):
     # Ensure cluster has started up by running a query.
     result = self.execute_query("select count(*) from 
functional_parquet.alltypes")
diff --git a/tests/custom_cluster/test_data_cache.py 
b/tests/custom_cluster/test_data_cache.py
index e57577acf..734d4c049 100644
--- a/tests/custom_cluster/test_data_cache.py
+++ b/tests/custom_cluster/test_data_cache.py
@@ -309,13 +309,13 @@ class TestDataCache(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
-      start_args=CACHE_START_ARGS, cluster_size=1)
+      start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
   def test_data_cache_readonly_lru(self, vector):
     self.__test_data_cache_readonly(vector)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
-      start_args=CACHE_START_ARGS, cluster_size=1)
+      start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
   def test_data_cache_readonly_lirs(self, vector):
     self.__test_data_cache_readonly(vector)
diff --git a/tests/custom_cluster/test_disable_catalog_data_ops.py 
b/tests/custom_cluster/test_disable_catalog_data_ops.py
index 181afbe5a..ee94189ee 100644
--- a/tests/custom_cluster/test_disable_catalog_data_ops.py
+++ b/tests/custom_cluster/test_disable_catalog_data_ops.py
@@ -28,7 +28,8 @@ class TestDisableCatalogDataOps(CustomClusterTestSuite):
   # and adding specifically one java-udf and one avro table.
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      catalogd_args="--logbuflevel=-1 
--disable_catalog_data_ops_debug_only=true")
+      catalogd_args="--logbuflevel=-1 
--disable_catalog_data_ops_debug_only=true",
+      disable_log_buffering=True)
   def test_disable_catalog_data_ops(self):
     # Expects that all Java UDF loading messages are for skip and that none of 
them load.
     self.assert_catalogd_log_contains(
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index b3695e297..6a1062c13 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1311,7 +1311,7 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=true",
     catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=5",
-    cluster_size=1)
+    disable_log_buffering=True, cluster_size=1)
   def test_invalidate_stale_partition_on_reload(self, unique_database):
     test_tbl = unique_database + ".test_invalidate_table"
     self.client.execute("create table {} (id int) partitioned by (p int)"
@@ -1329,6 +1329,7 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     self.assert_impalad_log_contains('INFO', log_regex % 1)
     self.assert_impalad_log_contains('INFO', log_regex % 2)
 
+
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
   """This class contains tests that exercise the event processing mechanism in 
the
diff --git a/tests/custom_cluster/test_local_catalog.py 
b/tests/custom_cluster/test_local_catalog.py
index 140920d04..d83a17f53 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -118,7 +118,8 @@ class 
TestLocalCatalogCompactUpdates(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true",
-      catalogd_args="--catalog_topic_mode=minimal")
+      catalogd_args="--catalog_topic_mode=minimal",
+      disable_log_buffering=True)
   def test_restart_catalogd(self, unique_database):
     """
     Tests for the behavior of LocalCatalog when catalogd restarts.
@@ -161,8 +162,9 @@ class 
TestLocalCatalogCompactUpdates(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=true",
-    catalogd_args="--catalog_topic_mode=minimal "
-                  "--enable_incremental_metadata_updates=true")
+    catalogd_args=("--catalog_topic_mode=minimal "
+                   "--enable_incremental_metadata_updates=true"),
+    disable_log_buffering=True)
   def test_invalidate_stale_partitions(self, unique_database):
     """
     Test that partition level invalidations are sent from catalogd and 
processed
@@ -578,7 +580,7 @@ class TestLocalCatalogObservability(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=true",
     catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=0",
-    cluster_size=1)
+    cluster_size=1, disable_log_buffering=True)
   def test_invalidate_stale_partition_on_reload(self, unique_database):
     test_tbl = unique_database + ".test_invalidate_table"
     self.client.execute(
diff --git a/tests/custom_cluster/test_logging.py 
b/tests/custom_cluster/test_logging.py
index 13f6595c2..861348489 100644
--- a/tests/custom_cluster/test_logging.py
+++ b/tests/custom_cluster/test_logging.py
@@ -45,18 +45,21 @@ class TestLoggingCore(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1,
-      impalad_args="--max_error_logs_per_instance=2")
+      impalad_args="--max_error_logs_per_instance=2",
+      disable_log_buffering=True)
   def test_max_errors(self):
     self._test_max_errors(2, 4, True)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1,
-      impalad_args="--max_error_logs_per_instance=3")
+      impalad_args="--max_error_logs_per_instance=3",
+      disable_log_buffering=True)
   def test_max_errors_0(self):
     self._test_max_errors(3, 0, True)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1,
-      impalad_args="--max_error_logs_per_instance=2")
+      impalad_args="--max_error_logs_per_instance=2",
+      disable_log_buffering=True)
   def test_max_errors_no_downgrade(self):
     self._test_max_errors(2, -1, False)
diff --git a/tests/custom_cluster/test_partition.py 
b/tests/custom_cluster/test_partition.py
index a65296c34..e0ef8ecf6 100644
--- a/tests/custom_cluster/test_partition.py
+++ b/tests/custom_cluster/test_partition.py
@@ -101,28 +101,32 @@ class TestPartitionDeletion(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms=2000",
     impalad_args="--use_local_catalog=false",
-    catalogd_args="--catalog_topic_mode=full --hms_event_polling_interval_s=0")
+    catalogd_args="--catalog_topic_mode=full --hms_event_polling_interval_s=0",
+    disable_log_buffering=True)
   def test_legacy_catalog_no_event_processing(self, unique_database):
     self._test_partition_deletion(unique_database)
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms=2000",
     impalad_args="--use_local_catalog=false",
-    catalogd_args="--catalog_topic_mode=full --hms_event_polling_interval_s=1")
+    catalogd_args="--catalog_topic_mode=full --hms_event_polling_interval_s=1",
+    disable_log_buffering=True)
   def test_legacy_catalog_with_event_processing(self, unique_database):
     self._test_partition_deletion(unique_database)
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms=2000",
     impalad_args="--use_local_catalog=true",
-    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=0")
+    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=0",
+    disable_log_buffering=True)
   def test_local_catalog_no_event_processing(self, unique_database):
     self._test_partition_deletion(unique_database)
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms=2000",
     impalad_args="--use_local_catalog=true",
-    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=1")
+    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=1",
+    disable_log_buffering=True)
   def test_local_catalog_with_event_processing(self, unique_database):
     self._test_partition_deletion(unique_database)
 
diff --git a/tests/custom_cluster/test_pause_monitor.py 
b/tests/custom_cluster/test_pause_monitor.py
index 165b11dc5..97cd173bb 100644
--- a/tests/custom_cluster/test_pause_monitor.py
+++ b/tests/custom_cluster/test_pause_monitor.py
@@ -25,7 +25,7 @@ from tests.common.custom_cluster_test_suite import 
CustomClusterTestSuite
 class TestPauseMonitor(CustomClusterTestSuite):
   """Class for pause monitor tests."""
 
-  @CustomClusterTestSuite.with_args("--logbuflevel=-1")
+  @CustomClusterTestSuite.with_args(disable_log_buffering=True)
   def test_jvm_pause_monitor_logs_entries(self):
     """This test injects a non-GC pause and confirms that that the JVM pause
     monitor detects and logs it."""
diff --git a/tests/custom_cluster/test_query_event_hooks.py 
b/tests/custom_cluster/test_query_event_hooks.py
index 3be2ba353..9a09cf489 100644
--- a/tests/custom_cluster/test_query_event_hooks.py
+++ b/tests/custom_cluster/test_query_event_hooks.py
@@ -35,7 +35,8 @@ class TestHooks(CustomClusterTestSuite):
       impalad_args=("--query_event_hook_classes={0} -logbuflevel=-1 
".format(DUMMY_HOOK)
                     + "--minidump_path={query_event_hooks_minidump}"),
       catalogd_args="--minidump_path={query_event_hooks_minidump}",
-      tmp_dir_placeholders=['query_event_hooks_log', 
'query_event_hooks_minidump'])
+      tmp_dir_placeholders=['query_event_hooks_log', 
'query_event_hooks_minidump'],
+      disable_log_buffering=True)
   def test_query_event_hooks_execute(self):
     """
     Tests that the post query execution hook actually executes by using a
@@ -81,7 +82,8 @@ class TestHooksStartupFail(CustomClusterTestSuite):
       impalad_args=("--query_event_hook_classes={0} ".format(FAILING_HOOK)
                     + "--minidump_path={hook_startup_fail_minidump}"),
       catalogd_args="--minidump_path={hook_startup_fail_minidump}",
-      tmp_dir_placeholders=['hook_startup_fail_log', 
'hook_startup_fail_minidump'])
+      tmp_dir_placeholders=['hook_startup_fail_log', 
'hook_startup_fail_minidump'],
+      disable_log_buffering=True)
   def test_hook_startup_fail(self):
     """
     Tests that exception during QueryEventHook.onImpalaStart will prevent
@@ -103,7 +105,8 @@ class TestHooksStartupFail(CustomClusterTestSuite):
                     + "--minidump_path={hook_instantiation_fail_minidump}"),
       catalogd_args="--minidump_path={hook_instantiation_fail_minidump}",
       tmp_dir_placeholders=['hook_instantiation_fail_log',
-                            'hook_instantiation_fail_minidump'])
+                            'hook_instantiation_fail_minidump'],
+      disable_log_buffering=True)
   def test_hook_instantiation_fail(self):
     """
     Tests that failure to instantiate a QueryEventHook will prevent
diff --git a/tests/custom_cluster/test_query_expiration.py 
b/tests/custom_cluster/test_query_expiration.py
index 6bd62461a..ae0eaf02d 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -46,7 +46,9 @@ class TestQueryExpiration(CustomClusterTestSuite):
         % (waiting, len(in_flight_queries), expect_waiting)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_query_timeout=8 --logbuflevel=-1")
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--idle_query_timeout=8",
+      disable_log_buffering=True)
   def test_query_expiration(self, vector):
     """Confirm that single queries expire if not fetched"""
     impalad = self.cluster.get_first_impalad()
diff --git a/tests/custom_cluster/test_query_live.py 
b/tests/custom_cluster/test_query_live.py
index b272c02d4..17fe3e3e0 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -34,6 +34,8 @@ class TestQueryLive(CustomClusterTestSuite):
   def setup_method(self, method):
     super(TestQueryLive, self).setup_method(method)
     self.wait_for_wm_init_complete()
+    # Wait few seconds until sys.impala_query_live is queryable.
+    self.wait_for_table_to_appear('sys', 'impala_query_live', 30)
 
   def assert_describe_extended(self):
     describe_ext_result = self.execute_query('describe extended 
sys.impala_query_live')
@@ -200,7 +202,8 @@ class TestQueryLive(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt")
+                                    catalogd_args="--enable_workload_mgmt",
+                                    disable_log_buffering=True)
   def test_alter(self):
     """Asserts alter works on query live table."""
     column_desc = 'test_alter\tstring\t'
@@ -331,7 +334,8 @@ class TestQueryLive(CustomClusterTestSuite):
                                                  
"--cluster_id=test_query_live",
                                     catalogd_args="--enable_workload_mgmt",
                                     cluster_size=3,
-                                    num_exclusive_coordinators=2)
+                                    num_exclusive_coordinators=2,
+                                    disable_log_buffering=True)
   def test_missing_coordinator(self):
     """Asserts scans finish if a coordinator disappears mid-schedule. Depends 
on
     test config of statestore_heartbeat_frequency_ms=50."""
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index 88b078ac3..1b56c928b 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -30,7 +30,6 @@ from thrift.transport.TTransport import TBufferedTransport
 from thrift.protocol import TBinaryProtocol
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT
-from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.retry import retry
 from tests.util.workload_management import assert_query
@@ -679,7 +678,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
                                                  "--shutdown_deadline_s=15 "
                                                  "--debug_actions="
                                                  
"WM_SHUTDOWN_DELAY:SLEEP@5000",
-                                    catalogd_args="--enable_workload_mgmt")
+                                    catalogd_args="--enable_workload_mgmt",
+                                    disable_log_buffering=True)
   def test_flush_on_shutdown(self, vector):
     """Asserts that queries that have completed but are not yet written to the 
query
        log table are flushed to the table before the coordinator exits. 
Graceful shutdown
@@ -730,7 +730,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
                                                  "--shutdown_deadline_s=15 "
                                                  "--debug_actions="
                                                  
"WM_SHUTDOWN_DELAY:SLEEP@10000",
-                                    catalogd_args="--enable_workload_mgmt")
+                                    catalogd_args="--enable_workload_mgmt",
+                                    disable_log_buffering=True)
   def test_shutdown_flush_timed_out(self, vector):
     """Asserts that queries that have completed but are not yet written to the 
query
        log table are lost if the completed queries queue drain takes too long 
and that
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index 786ad23e7..300532cfa 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -596,6 +596,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.client.close_query(handle)
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(disable_log_buffering=True)
   def test_query_retry_reaches_spool_limit(self):
     """Test retryable queries with results spooling enabled and
     spool_all_results_for_retries=true that reach spooling mem limit will 
return rows and
diff --git a/tests/custom_cluster/test_re2_max_mem.py 
b/tests/custom_cluster/test_re2_max_mem.py
index ded88bd2d..aa88cceda 100755
--- a/tests/custom_cluster/test_re2_max_mem.py
+++ b/tests/custom_cluster/test_re2_max_mem.py
@@ -58,13 +58,15 @@ class TestRE2MaxMem(CustomClusterTestSuite):
         self._test_re2_max_mem(True, True)
 
     @pytest.mark.execute_serially
-    @CustomClusterTestSuite.with_args(cluster_size=1)
+    @CustomClusterTestSuite.with_args(cluster_size=1,
+        disable_log_buffering=True)
     def test_re2_max_mem_not_specified(self):
         # default max_mem set by re2's regex engine is 8 MiB
         self._test_re2_max_mem(False, True)
 
     @pytest.mark.execute_serially
     @CustomClusterTestSuite.with_args(cluster_size=1,
-        impalad_args="--re2_mem_limit=200MB")
+        impalad_args="--re2_mem_limit=200MB",
+        disable_log_buffering=True)
     def test_dfa_not_out_of_mem(self):
         self._test_re2_max_mem(False, False)
diff --git a/tests/custom_cluster/test_restart_services.py 
b/tests/custom_cluster/test_restart_services.py
index 01b9971fe..82c77098d 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -203,7 +203,8 @@ class TestRestart(CustomClusterTestSuite):
     statestored_args="--statestore_update_frequency_ms=2000",
     impalad_args=("--wait_for_new_catalog_service_id_timeout_sec={} \
                   --wait_for_new_catalog_service_id_max_iterations=-1"
-                  .format(WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC)))
+                  .format(WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC)),
+    disable_log_buffering=True)
   def test_restart_catalogd_while_handling_rpc_response_with_timeout(self,
       unique_database):
     """Regression test for IMPALA-12267. We'd like to cause a situation where
@@ -268,7 +269,8 @@ class TestRestart(CustomClusterTestSuite):
         STATESTORE_UPDATE_FREQ_SEC * 1000),
     impalad_args=("--wait_for_new_catalog_service_id_timeout_sec=-1 \
                   --wait_for_new_catalog_service_id_max_iterations={}"
-                  .format(WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS)))
+                  .format(WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS)),
+    disable_log_buffering=True)
   def test_restart_catalogd_while_handling_rpc_response_with_max_iters(self,
       unique_database):
     """We create the same situation as described in
@@ -897,7 +899,8 @@ class TestGracefulShutdown(CustomClusterTestSuite, 
HS2TestSuite):
           --shutdown_deadline_s={deadline} \
           
--hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S,
             deadline=EXEC_SHUTDOWN_DEADLINE_S, hostname=socket.gethostname()),
-      cluster_size=1)
+      cluster_size=1,
+      disable_log_buffering=True)
   def test_shutdown_signal(self):
     """Test that an idle impalad shuts down in a timely manner after the 
shutdown grace
     period elapses."""
@@ -920,7 +923,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, 
HS2TestSuite):
         self.EXEC_SHUTDOWN_DEADLINE_S))
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(cluster_size=1)
+  @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True)
   def test_sending_multiple_shutdown_signals(self):
     """Test that multiple IMPALA_SHUTDOWN_SIGNAL signals are all handeled 
without
     crashing the process."""
diff --git a/tests/custom_cluster/test_runtime_filter_aggregation.py 
b/tests/custom_cluster/test_runtime_filter_aggregation.py
index d1397cd9b..f8bee4785 100644
--- a/tests/custom_cluster/test_runtime_filter_aggregation.py
+++ b/tests/custom_cluster/test_runtime_filter_aggregation.py
@@ -94,7 +94,8 @@ class TestLateQueryStateInit(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impalad_args="--sort_runtime_filter_aggregator_candidates=true 
--logbuflevel=-1")
+    impalad_args="--sort_runtime_filter_aggregator_candidates=true",
+    disable_log_buffering=True)
   def test_late_query_state_init(self, vector):
     """Test that distributed runtime filter aggregation still works
     when remote query state of intermediate aggregator node is late to 
initialize."""
diff --git a/tests/custom_cluster/test_services_rpc_errors.py 
b/tests/custom_cluster/test_services_rpc_errors.py
index be9a8f029..a19268a2d 100644
--- a/tests/custom_cluster/test_services_rpc_errors.py
+++ b/tests/custom_cluster/test_services_rpc_errors.py
@@ -36,7 +36,7 @@ class TestStatestoreRpcErrors(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       " --debug_actions=REGISTER_SUBSCRIBER_FIRST_ATTEMPT:FAIL@1.0")
-  def test_register_subscriber_rpc_error(self, vector):
+  def test_register_subscriber_rpc_error(self):
     self.assert_impalad_log_contains("INFO",
         "Injected RPC error.*Debug Action: REGISTER_SUBSCRIBER_FIRST_ATTEMPT")
 
@@ -46,8 +46,9 @@ class TestStatestoreRpcErrors(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      " --debug_actions=GET_PROTOCOL_VERSION_FIRST_ATTEMPT:FAIL@1.0")
-  def test_get_protocol_version_rpc_error(self, vector):
+      impalad_args=" 
--debug_actions=GET_PROTOCOL_VERSION_FIRST_ATTEMPT:FAIL@1.0",
+      disable_log_buffering=True)
+  def test_get_protocol_version_rpc_error(self):
     self.assert_impalad_log_contains("INFO",
         "Injected RPC error.*Debug Action: GET_PROTOCOL_VERSION_FIRST_ATTEMPT")
 
@@ -71,8 +72,9 @@ class TestCatalogRpcErrors(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      " --debug_actions=CATALOG_RPC_FIRST_ATTEMPT:FAIL@1.0")
-  def test_register_subscriber_rpc_error(self, vector, unique_database):
+      impalad_args=" --debug_actions=CATALOG_RPC_FIRST_ATTEMPT:FAIL@1.0",
+      disable_log_buffering=True)
+  def test_register_subscriber_rpc_error(self, unique_database):
     """Validate that RPCs to the catalogd are retried by injecting a failure 
into the
     first RPC attempt for any catalogd RPC. Run a variety of queries that 
require
     catalogd interaction to ensure all RPCs are retried."""
diff --git a/tests/custom_cluster/test_shell_jwt_auth.py 
b/tests/custom_cluster/test_shell_jwt_auth.py
index 44f4ac7b8..14ba06cb6 100644
--- a/tests/custom_cluster/test_shell_jwt_auth.py
+++ b/tests/custom_cluster/test_shell_jwt_auth.py
@@ -62,7 +62,8 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args=IMPALAD_ARGS,
     impala_log_dir="{jwt_auth_success}",
-    tmp_dir_placeholders=["jwt_auth_success"])
+    tmp_dir_placeholders=["jwt_auth_success"],
+    disable_log_buffering=True)
   def test_jwt_auth_valid(self, vector):
     """Asserts the Impala shell can authenticate to Impala using JWT 
authentication.
     Also executes a query to ensure the authentication was successful."""
@@ -97,7 +98,8 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args=IMPALAD_ARGS,
     impala_log_dir="{jwt_auth_fail}",
-    tmp_dir_placeholders=["jwt_auth_fail"])
+    tmp_dir_placeholders=["jwt_auth_fail"],
+    disable_log_buffering=True)
   def test_jwt_auth_expired(self, vector):
     """Asserts the Impala shell fails to authenticate when it presents a JWT 
that has a
     valid signature but is expired."""
@@ -136,7 +138,8 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args=IMPALAD_ARGS,
     impala_log_dir="{jwt_auth_invalid_jwk}",
-    tmp_dir_placeholders=["jwt_auth_invalid_jwk"])
+    tmp_dir_placeholders=["jwt_auth_invalid_jwk"],
+    disable_log_buffering=True)
   def test_jwt_auth_invalid_jwk(self, vector):
     """Asserts the Impala shell fails to authenticate when it presents a JWT 
that has a
     valid signature but is expired."""
diff --git a/tests/custom_cluster/test_thrift_socket.py 
b/tests/custom_cluster/test_thrift_socket.py
index 4461a6820..857f78925 100644
--- a/tests/custom_cluster/test_thrift_socket.py
+++ b/tests/custom_cluster/test_thrift_socket.py
@@ -77,7 +77,8 @@ class TestThriftSocket(CustomClusterTestSuite):
     return 'functional-query'
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(impalad_args=IDLE_ARGS, cluster_size=1)
+  @CustomClusterTestSuite.with_args(
+      impalad_args=IDLE_ARGS, cluster_size=1, disable_log_buffering=True)
   def test_peek_timeout_no_ssl(self):
     # Iterate over test vector within test function to avoid restarting 
cluster.
     for protocol_dim in create_client_protocol_dimension():
@@ -93,7 +94,8 @@ class TestThriftSocket(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(statestored_args=SSL_ARGS,
                                     catalogd_args=SSL_ARGS,
                                     impalad_args=(SSL_ARGS + IDLE_ARGS),
-                                    cluster_size=1)
+                                    cluster_size=1,
+                                    disable_log_buffering=True)
   def test_peek_timeout_ssl(self):
     # Iterate over test vector within test function to avoid restarting 
cluster.
     for protocol_dim in create_client_protocol_dimension():
diff --git a/tests/custom_cluster/test_workload_mgmt_init.py 
b/tests/custom_cluster/test_workload_mgmt_init.py
index 157f7de40..807f640a6 100644
--- a/tests/custom_cluster/test_workload_mgmt_init.py
+++ b/tests/custom_cluster/test_workload_mgmt_init.py
@@ -164,7 +164,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
 
   @CustomClusterTestSuite.with_args(
       impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.1.0")
+      catalogd_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.1.0",
+      disable_log_buffering=True)
   def test_no_upgrade(self):
     """Tests that no upgrade happens when starting a cluster where the 
workload management
        tables are already at version 1.1.0."""
@@ -208,7 +209,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
       impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.0.0",
       catalogd_args="--enable_workload_mgmt "
                     "--workload_mgmt_schema_version=1.0.0 "
-                    
"--workload_mgmt_drop_tables=impala_query_log,impala_query_live")
+                    
"--workload_mgmt_drop_tables=impala_query_log,impala_query_live",
+      disable_log_buffering=True)
   def test_upgrade_1_0_0_to_1_1_0(self, vector):
     """Asserts that an upgrade from version 1.0.0 to 1.1.0 succeeds on a 10 
node cluster
        when starting with no existing workload management tables."""
@@ -233,7 +235,7 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
     self.run_test_case('QueryTest/workload-management-live-v1.1.0', vector, 
self.WM_DB)
 
   @CustomClusterTestSuite.with_args(cluster_size=1, 
impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt")
+      catalogd_args="--enable_workload_mgmt", disable_log_buffering=True)
   def test_log_table_newer_schema_version(self):
     """Asserts a catalog startup flag version that is older than the workload
        management table schema version will write only the fields associated 
with the
@@ -333,7 +335,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
   @CustomClusterTestSuite.with_args(cluster_size=1,
       impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
       catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
-      tmp_dir_placeholders=['invalid_schema'])
+      tmp_dir_placeholders=['invalid_schema'],
+      disable_log_buffering=True)
   def test_invalid_schema_version_log_table_prop(self):
     """Tests that startup succeeds when the 'schema_version' table property on 
the
        sys.impala_query_log table contains an invalid value but the 
wm_schema_version
@@ -343,7 +346,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
   @CustomClusterTestSuite.with_args(cluster_size=1,
       impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
       catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
-      tmp_dir_placeholders=['invalid_schema'])
+      tmp_dir_placeholders=['invalid_schema'],
+      disable_log_buffering=True)
   def test_invalid_wm_schema_version_log_table_prop(self):
     """Tests that startup fails when the 'wm_schema_version' table property on 
the
        sys.impala_query_log table contains an invalid value."""
@@ -352,7 +356,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
   @CustomClusterTestSuite.with_args(cluster_size=1,
       impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
       catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
-      tmp_dir_placeholders=['invalid_schema'])
+      tmp_dir_placeholders=['invalid_schema'],
+      disable_log_buffering=True)
   def test_invalid_schema_version_live_table_prop(self):
     """Tests that startup succeeds when the 'schema_version' table property on 
the
        sys.impala_query_live table contains an invalid value but the 
wm_schema_version
@@ -362,7 +367,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
   @CustomClusterTestSuite.with_args(cluster_size=1,
       impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
       catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
-      tmp_dir_placeholders=['invalid_schema'])
+      tmp_dir_placeholders=['invalid_schema'],
+      disable_log_buffering=True)
   def test_invalid_wm_schema_version_live_table_prop(self):
     """Tests that startup fails when the 'wm_schema_version' table property on 
the
        sys.impala_query_live table contains an invalid value."""
@@ -454,7 +460,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
       catalogd_args="--enable_workload_mgmt", 
start_args="--enable_catalogd_ha",
-      statestored_args="--use_subscriber_id_as_catalogd_priority=true")
+      statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+      disable_log_buffering=True)
   def test_catalog_ha(self):
     """Asserts workload management initialization is only done on the active 
catalogd."""
 
@@ -468,7 +475,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
       catalogd_args="--enable_workload_mgmt", 
start_args="--enable_catalogd_ha",
-      statestored_args="--use_subscriber_id_as_catalogd_priority=true")
+      statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+      disable_log_buffering=True)
   def test_catalog_ha_failover(self):
     """Asserts workload management initialization is not run a second time 
when catalogd
        failover happens."""
@@ -496,7 +504,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
       catalogd_args="--enable_workload_mgmt",
       statestored_args="--use_subscriber_id_as_catalogd_priority=true",
-      start_args="--enable_statestored_ha")
+      start_args="--enable_statestored_ha",
+      disable_log_buffering=True)
   def test_statestore_ha(self):
     """Asserts workload management initialization completes successfully when 
statestore
        ha is enabled."""
@@ -508,7 +517,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
       catalogd_args="--enable_workload_mgmt",
       statestored_args="--use_subscriber_id_as_catalogd_priority=true",
-      start_args="--enable_catalogd_ha --enable_statestored_ha")
+      start_args="--enable_catalogd_ha --enable_statestored_ha",
+      disable_log_buffering=True)
   def test_catalog_statestore_ha(self):
     """Asserts workload management initialization is only done on the active 
catalogd
        when both catalog and statestore ha is enabled."""
@@ -535,7 +545,8 @@ class 
TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
       impalad_args="--enable_workload_mgmt --query_log_write_interval_s=3",
       catalogd_args="--enable_workload_mgmt "
                     
"--workload_mgmt_drop_tables=impala_query_log,impala_query_live "
-                    "--debug_actions=CATALOG_WORKLOADMGMT_STARTUP:SLEEP@15000")
+                    "--debug_actions=CATALOG_WORKLOADMGMT_STARTUP:SLEEP@15000",
+      disable_log_buffering=True)
   def test_catalog_init_delay(self):
     # Workload management init is slightly delayed after catalogd startup, 
wait for the
     # debug action to begin before continuing since that log message 
guarantees the
@@ -563,7 +574,8 @@ class 
TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
       impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=foo "
                    "--minidump_path={minidumps}",
       catalogd_args="--enable_workload_mgmt --workload_mgmt_schema_version=foo 
"
-                    "--minidump_path={minidumps}", 
tmp_dir_placeholders=['minidumps'])
+                    "--minidump_path={minidumps}", 
tmp_dir_placeholders=['minidumps'],
+      disable_log_buffering=True)
   def test_start_invalid_version(self):
     """Asserts that starting a cluster with an invalid workload management 
version
        errors. Cluster sizes of 1 are used to speed up the initial setup."""
@@ -580,7 +592,8 @@ class 
TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
       impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=0.0.1 "
                    "--minidump_path={minidumps}",
       catalogd_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=0.0.1 "
-                    "--minidump_path={minidumps}", 
tmp_dir_placeholders=['minidumps'])
+                    "--minidump_path={minidumps}", 
tmp_dir_placeholders=['minidumps'],
+      disable_log_buffering=True)
   def test_start_unknown_version(self):
     """Asserts that starting a cluster with an unknown workload management 
version errors.
        Cluster sizes of 1 are used to speed up the initial setup."""
@@ -593,7 +606,8 @@ class 
TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
         r"'0.0.1' is not one of the known versions: '1.0.0', '1.1.0'$")
 
   @CustomClusterTestSuite.with_args(start_args="--enable_catalogd_ha",
-      statestored_args="--use_subscriber_id_as_catalogd_priority=true")
+      statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+      disable_log_buffering=True)
   def test_catalog_ha_no_workload_mgmt(self):
     """Asserts workload management initialization is not done on either 
catalogd when
        workload management is not enabled."""

Reply via email to