This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 6f2ac8a40 IMPALA-13822: Add more detail in impala_connection.py logs
6f2ac8a40 is described below
commit 6f2ac8a4067558a1eee58eb506275d22049fdba3
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Mar 3 14:14:57 2025 -0800
IMPALA-13822: Add more detail in impala_connection.py logs
This patch makes impala_connection.py use the same log format as
declared in conftest.py. Connection-specific logs will have the protocol
name printed. Modified set_configuration() and
set_configuration_option() to make option-related logging more concise.
Moved LOG_FORMAT from conftest.py to patterns.py for reuse in
impala_connection.py.
Testing:
- Ran TestExprLimits locally and confirmed that the log lines printed in
logs/ee_tests/results/TEST-impala-parallel.xml are OK.
Change-Id: I44ea7fbec15684ac5379703f781a400b4f17da8d
Reviewed-on: http://gerrit.cloudera.org:8080/22577
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Csaba Ringhofer <[email protected]>
---
tests/common/impala_connection.py | 93 ++++++++++++++++++++++++---------------
tests/common/impala_test_suite.py | 5 ++-
tests/common/patterns.py | 2 +
tests/conftest.py | 3 +-
4 files changed, 64 insertions(+), 39 deletions(-)
diff --git a/tests/common/impala_connection.py
b/tests/common/impala_connection.py
index 789809dc1..b4285ec7d 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -35,6 +35,7 @@ from tests.beeswax.impala_beeswax import (
DEFAULT_SLEEP_INTERVAL,
ImpalaBeeswaxClient,
ImpalaBeeswaxException)
+from tests.common.patterns import LOG_FORMAT
from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
from tests.util.thrift_util import (
op_handle_to_query_id,
@@ -45,7 +46,7 @@ LOG = logging.getLogger(__name__)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# All logging needs to be either executable SQL or a SQL comment (prefix with
--).
-console_handler.setFormatter(logging.Formatter('%(message)s'))
+console_handler.setFormatter(logging.Formatter(LOG_FORMAT))
LOG.addHandler(console_handler)
LOG.propagate = False
@@ -89,21 +90,23 @@ def has_legal_future_state(impala_state, future_states):
# test_exprs.py's TestExprLimits executes extremely large SQLs (multiple MBs).
It is the
# only test that runs SQL larger than 128KB. Logging these SQLs in execute()
increases
# the size of the JUnitXML files, causing problems for users of JUnitXML like
Jenkins.
-# This function limits the size of the SQL logged if it is larger than 128KB.
-def log_sql_stmt(sql_stmt):
- """If the 'sql_stmt' is shorter than MAX_SQL_LOGGING_LENGTH, log it
unchanged. If
- it is larger than MAX_SQL_LOGGING_LENGTH, truncate it and comment it
out."""
+# This function limits the size of the returned string if it is larger than
128KB.
+def format_sql_for_logging(sql_stmt):
+ """If the 'sql_stmt' is shorter than MAX_SQL_LOGGING_LENGTH, only wrap
sql_stmt with
+ new lines and semicolon. If it is larger than MAX_SQL_LOGGING_LENGTH,
truncate it
+ and comment it out. This function returns a unicode string."""
# sql_stmt could contain Unicode characters, so explicitly use unicode
literals
# so that Python 2 works.
if (len(sql_stmt) <= MAX_SQL_LOGGING_LENGTH):
- LOG.info(u"{0};\n".format(sql_stmt))
+ return u"\n{0};\n".format(sql_stmt)
else:
# The logging output should be valid SQL, so the truncated SQL is
commented out.
- LOG.info("-- Skip logging full SQL statement of length
{0}".format(len(sql_stmt)))
- LOG.info("-- Logging a truncated version, commented out:")
- for line in sql_stmt[0:MAX_SQL_LOGGING_LENGTH].split("\n"):
- LOG.info(u"-- {0}".format(line))
- LOG.info("-- [...]")
+ truncated_sql = u'\n--'.join(
+ [line for line in sql_stmt[0:MAX_SQL_LOGGING_LENGTH].split("\n")])
+ return (u"\n-- Skip logging full SQL statement of length {0}"
+ u"\n-- Logging a truncated version, commented out:"
+ u"\n-- {1}"
+ u"\n-- [...]\n").format(len(sql_stmt), truncated_sql)
def collect_default_query_options(options, name, val, kind):
@@ -154,16 +157,28 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta,
object)):
pass
@abc.abstractmethod
- def set_configuration_option(self, name, value):
- """Sets a configuration option name to the given value"""
+ def set_configuration_option(self, name, value, is_log_sql=True):
+ """Sets a configuration option name to the given value.
+ Return True if option is changing. Otherwise, return False (option already
has the
+ same value). If is_log_sql True, log the equivalent SET query to INFO. Do
note though
+ that the option change does not actually happen by issuing SET query."""
pass
def set_configuration(self, config_option_dict):
- """Replaces existing configuration with the given dictionary"""
- assert config_option_dict is not None, "config_option_dict cannot be None"
+ """Replaces existing configuration with the given dictionary.
+ If config_option_dict is an empty dictionary, simply clear current client
+ configuration."""
+ assert isinstance(config_option_dict, dict), \
+ "config_option_dict must be a dictionary"
self.clear_configuration()
+ if not config_option_dict:
+ return
+ log_lines = list()
for name, value in config_option_dict.items():
- self.set_configuration_option(name, value)
+ if self.set_configuration_option(name, value, False):
+ log_lines.append("set {0}={1};".format(name, value))
+ if log_lines:
+
self.log_client("set_configuration:\n\n{}\n".format('\n'.join(log_lines)))
@abc.abstractmethod
def clear_configuration(self):
@@ -294,11 +309,11 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta,
object)):
def log_handle(self, operation_handle, message):
"""Log 'message' at INFO level, along with id of 'operation_handle'."""
handle_id = self.handle_id(operation_handle)
- LOG.info("-- {0}: {1}".format(handle_id, message))
+ LOG.info(u"{0}: {1}".format(handle_id, message))
def log_client(self, message):
- """Log 'message' at INFO level."""
- LOG.info("-- {0}".format(message))
+ """Log 'message' at INFO level, prefixed wih the protocol name of this
connection."""
+ LOG.info(u"{0}: {1}".format(self.get_test_protocol(), message))
def wait_for_impala_state(self, operation_handle, expected_impala_state,
timeout):
"""Waits for the given 'operation_handle' to reach the
'expected_impala_state'.
@@ -386,13 +401,16 @@ class BeeswaxConnection(ImpalaConnection):
def get_host_port(self):
return self.__host_port
- def set_configuration_option(self, name, value):
+ def set_configuration_option(self, name, value, is_log_sql=True):
# Only set the option if it's not already set to the same value.
name = name.lower()
value = str(value)
if self.__beeswax_client.get_query_option(name) != value:
- LOG.info("set_option('{}', '{}')".format(name, value))
self.__beeswax_client.set_query_option(name, value)
+ if is_log_sql:
+ self.log_client("\n\nset {0}={1};\n".format(name, value))
+ return True
+ return False
def __collect_default_options(self):
options = {}
@@ -436,14 +454,14 @@ class BeeswaxConnection(ImpalaConnection):
self.__beeswax_client.close_dml(operation_handle.get_handle())
def execute(self, sql_stmt, user=None, fetch_profile_after_close=False):
- self.log_client("executing against %s\n" % (self.__host_port))
- log_sql_stmt(sql_stmt)
+ self.log_client(u"executing against {0}\n{1}".format(
+ self.__host_port, format_sql_for_logging(sql_stmt)))
return self.__beeswax_client.execute(sql_stmt, user=user,
fetch_profile_after_close=fetch_profile_after_close)
def execute_async(self, sql_stmt, user=None):
- self.log_client("executing async: %s\n" % (self.__host_port))
- log_sql_stmt(sql_stmt)
+ self.log_client(u"executing async {0}\n{1}".format(
+ self.__host_port, format_sql_for_logging(sql_stmt)))
beeswax_handle = self.__beeswax_client.execute_query_async(sql_stmt,
user=user)
return OperationHandle(beeswax_handle, sql_stmt)
@@ -499,7 +517,7 @@ class BeeswaxConnection(ImpalaConnection):
def log_handle(self, operation_handle, message):
handle_id = self.handle_id(operation_handle)
- LOG.info("-- {0}: {1}".format(handle_id, message))
+ LOG.info(u"{0}: {1}".format(handle_id, message))
def get_query_id(self, operation_handle):
return operation_handle.get_handle().id
@@ -566,14 +584,17 @@ class ImpylaHS2Connection(ImpalaConnection):
def get_host_port(self):
return self.__host_port
- def set_configuration_option(self, name, value):
+ def set_configuration_option(self, name, value, is_log_sql=True):
# Only set the option if it's not already set to the same value.
# value must be parsed to string.
name = name.lower()
value = str(value)
if self.__query_options.get(name) != value:
- LOG.info("set_option('{}', '{}')".format(name, value))
self.__query_options[name] = value
+ if is_log_sql:
+ self.log_client("\n\nset {0}={1};\n".format(name, value))
+ return True
+ return False
def get_default_configuration(self):
return self.__default_query_options.copy()
@@ -658,23 +679,24 @@ class ImpylaHS2Connection(ImpalaConnection):
return op_handle.get_profile(TRuntimeProfileFormat.STRING)
return None
- def __log_execute(self, cursor, user):
+ def __log_execute(self, cursor, user, sql_stmt):
self.log_client(
- "executing against {0} at {1}. session: {2} main_cursor: {3} user:
{4}\n".format(
- (self._is_hive and 'Hive' or 'Impala'), self.__host_port,
- self.__get_session_id(cursor), (cursor == self.__cursor), user)
+ (u"executing against {0} at {1}. session: {2} main_cursor: {3} "
+ u"user: {4}\n{5}").format(
+ (self._is_hive and 'Hive' or 'Impala'), self.__host_port,
+ self.__get_session_id(cursor), (cursor == self.__cursor), user,
+ format_sql_for_logging(sql_stmt))
)
def execute(self, sql_stmt, user=None,
profile_format=TRuntimeProfileFormat.STRING,
fetch_profile_after_close=False):
- log_sql_stmt(sql_stmt)
cursor = self.__cursor
result = None
try:
if user != self.__user:
# Must create a new cursor to supply 'user'.
cursor = self.__open_single_cursor(user=user)
- self.__log_execute(cursor, user)
+ self.__log_execute(cursor, user, sql_stmt)
cursor.execute(sql_stmt, configuration=self.__query_options)
handle = OperationHandle(cursor, sql_stmt)
self.log_handle(handle, "started query in session {0}".format(
@@ -716,8 +738,7 @@ class ImpylaHS2Connection(ImpalaConnection):
try:
async_cursor = self.__open_single_cursor(user=user)
handle = OperationHandle(async_cursor, sql_stmt)
- self.__log_execute(async_cursor, user)
- log_sql_stmt(sql_stmt)
+ self.__log_execute(async_cursor, user, sql_stmt)
async_cursor.execute_async(sql_stmt, configuration=self.__query_options)
self.__async_cursors.append(async_cursor)
return handle
diff --git a/tests/common/impala_test_suite.py
b/tests/common/impala_test_suite.py
index 2fe09a898..1f34176a3 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1186,10 +1186,13 @@ class ImpalaTestSuite(BaseTestSuite):
result = self.execute_query_using_client(client, query, vector)
# Restore client configuration before returning.
modified_configs = vector.get_exec_option_dict().keys()
+ options_to_restore = dict()
for name, val in client.get_default_configuration().items():
lower_name = name.lower()
if lower_name in modified_configs:
- client.set_configuration_option(lower_name, val)
+ options_to_restore[lower_name] = val
+ if options_to_restore:
+ client.set_configuration(options_to_restore)
return result
@execute_wrapper
diff --git a/tests/common/patterns.py b/tests/common/patterns.py
index 850feb541..ee83ff485 100644
--- a/tests/common/patterns.py
+++ b/tests/common/patterns.py
@@ -26,6 +26,8 @@ VALID_IMPALA_IDENTIFIER_REGEX =
re.compile(r'^[a-zA-Z][a-zA-Z0-9_]{,127}$')
INT64_MASK = (1 << 64) - 1
+LOG_FORMAT = "-- %(asctime)s %(levelname)-8s %(threadName)s: %(message)s"
+
def is_valid_impala_identifier(identifier):
"""Return True if identifier is a valid Impala identifier, False
otherwise."""
diff --git a/tests/conftest.py b/tests/conftest.py
index 626d68b2d..edf6a4a2c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -35,13 +35,12 @@ from impala_py_lib.helpers import find_all_files,
is_core_dump
import tests.common.base_test_suite
from tests.common.environ import build_flavor_timeout
from tests.common.test_result_verifier import QueryTestResult
-from tests.common.patterns import is_valid_impala_identifier
+from tests.common.patterns import LOG_FORMAT, is_valid_impala_identifier
from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
from tests.comparison.db_connection import ImpalaConnection
from tests.util.filesystem_utils import FILESYSTEM, ISILON_WEBHDFS_PORT,
WAREHOUSE
LOG = logging.getLogger('test_configuration')
-LOG_FORMAT = "-- %(asctime)s %(levelname)-8s %(threadName)s: %(message)s"
VALID_TEST_PROTOCOLS = [BEESWAX, HS2, HS2_HTTP]
DEFAULT_CONN_TIMEOUT = 45