This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f2ac8a40 IMPALA-13822: Add more detail in impala_connection.py logs
6f2ac8a40 is described below

commit 6f2ac8a4067558a1eee58eb506275d22049fdba3
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Mar 3 14:14:57 2025 -0800

    IMPALA-13822: Add more detail in impala_connection.py logs
    
    This patch makes impala_connection.py use the same log format as
    declared in conftest.py. Connection-specific logs will have the protocol
    name printed. Modified set_configuration() and
    set_configuration_option() to make option-related logging more concise.
    
    Moved LOG_FORMAT from conftest.py to patterns.py for reuse in
    impala_connection.py.
    
    Testing:
    - Run TestExprLimits locally and confirm that the log lines printed in
      logs/ee_tests/results/TEST-impala-parallel.xml are OK.
    
    Change-Id: I44ea7fbec15684ac5379703f781a400b4f17da8d
    Reviewed-on: http://gerrit.cloudera.org:8080/22577
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Csaba Ringhofer <[email protected]>
---
 tests/common/impala_connection.py | 93 ++++++++++++++++++++++++---------------
 tests/common/impala_test_suite.py |  5 ++-
 tests/common/patterns.py          |  2 +
 tests/conftest.py                 |  3 +-
 4 files changed, 64 insertions(+), 39 deletions(-)

diff --git a/tests/common/impala_connection.py 
b/tests/common/impala_connection.py
index 789809dc1..b4285ec7d 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -35,6 +35,7 @@ from tests.beeswax.impala_beeswax import (
   DEFAULT_SLEEP_INTERVAL,
   ImpalaBeeswaxClient,
   ImpalaBeeswaxException)
+from tests.common.patterns import LOG_FORMAT
 from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
 from tests.util.thrift_util import (
   op_handle_to_query_id,
@@ -45,7 +46,7 @@ LOG = logging.getLogger(__name__)
 console_handler = logging.StreamHandler()
 console_handler.setLevel(logging.INFO)
 # All logging needs to be either executable SQL or a SQL comment (prefix with 
--).
-console_handler.setFormatter(logging.Formatter('%(message)s'))
+console_handler.setFormatter(logging.Formatter(LOG_FORMAT))
 LOG.addHandler(console_handler)
 LOG.propagate = False
 
@@ -89,21 +90,23 @@ def has_legal_future_state(impala_state, future_states):
 # test_exprs.py's TestExprLimits executes extremely large SQLs (multiple MBs). 
It is the
 # only test that runs SQL larger than 128KB. Logging these SQLs in execute() 
increases
 # the size of the JUnitXML files, causing problems for users of JUnitXML like 
Jenkins.
-# This function limits the size of the SQL logged if it is larger than 128KB.
-def log_sql_stmt(sql_stmt):
-  """If the 'sql_stmt' is shorter than MAX_SQL_LOGGING_LENGTH, log it 
unchanged. If
-     it is larger than MAX_SQL_LOGGING_LENGTH, truncate it and comment it 
out."""
+# This function limits the size of the returned string if it is larger than 
128KB.
+def format_sql_for_logging(sql_stmt):
+  """If the 'sql_stmt' is shorter than MAX_SQL_LOGGING_LENGTH, only wrap 
sql_stmt with
+  new lines and semicolon. If it is larger than MAX_SQL_LOGGING_LENGTH, 
truncate it
+  and comment it out. This function returns a unicode string."""
   # sql_stmt could contain Unicode characters, so explicitly use unicode 
literals
   # so that Python 2 works.
   if (len(sql_stmt) <= MAX_SQL_LOGGING_LENGTH):
-    LOG.info(u"{0};\n".format(sql_stmt))
+    return u"\n{0};\n".format(sql_stmt)
   else:
     # The logging output should be valid SQL, so the truncated SQL is 
commented out.
-    LOG.info("-- Skip logging full SQL statement of length 
{0}".format(len(sql_stmt)))
-    LOG.info("-- Logging a truncated version, commented out:")
-    for line in sql_stmt[0:MAX_SQL_LOGGING_LENGTH].split("\n"):
-      LOG.info(u"-- {0}".format(line))
-    LOG.info("-- [...]")
+    truncated_sql = u'\n--'.join(
+      [line for line in sql_stmt[0:MAX_SQL_LOGGING_LENGTH].split("\n")])
+    return (u"\n-- Skip logging full SQL statement of length {0}"
+            u"\n-- Logging a truncated version, commented out:"
+            u"\n-- {1}"
+            u"\n-- [...]\n").format(len(sql_stmt), truncated_sql)
 
 
 def collect_default_query_options(options, name, val, kind):
@@ -154,16 +157,28 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, 
object)):
     pass
 
   @abc.abstractmethod
-  def set_configuration_option(self, name, value):
-    """Sets a configuration option name to the given value"""
+  def set_configuration_option(self, name, value, is_log_sql=True):
+    """Sets a configuration option name to the given value.
+    Return True if option is changing. Otherwise, return False (option already 
has the
+    same value). If is_log_sql True, log the equivalent SET query to INFO. Do 
note though
+    that the option change does not actually happen by issuing SET query."""
     pass
 
   def set_configuration(self, config_option_dict):
-    """Replaces existing configuration with the given dictionary"""
-    assert config_option_dict is not None, "config_option_dict cannot be None"
+    """Replaces existing configuration with the given dictionary.
+    If config_option_dict is an empty dictionary, simply clear current client
+    configuration."""
+    assert isinstance(config_option_dict, dict), \
+        "config_option_dict must be a dictionary"
     self.clear_configuration()
+    if not config_option_dict:
+      return
+    log_lines = list()
     for name, value in config_option_dict.items():
-      self.set_configuration_option(name, value)
+      if self.set_configuration_option(name, value, False):
+        log_lines.append("set {0}={1};".format(name, value))
+    if log_lines:
+      
self.log_client("set_configuration:\n\n{}\n".format('\n'.join(log_lines)))
 
   @abc.abstractmethod
   def clear_configuration(self):
@@ -294,11 +309,11 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, 
object)):
   def log_handle(self, operation_handle, message):
     """Log 'message' at INFO level, along with id of 'operation_handle'."""
     handle_id = self.handle_id(operation_handle)
-    LOG.info("-- {0}: {1}".format(handle_id, message))
+    LOG.info(u"{0}: {1}".format(handle_id, message))
 
   def log_client(self, message):
-    """Log 'message' at INFO level."""
-    LOG.info("-- {0}".format(message))
+    """Log 'message' at INFO level, prefixed wih the protocol name of this 
connection."""
+    LOG.info(u"{0}: {1}".format(self.get_test_protocol(), message))
 
   def wait_for_impala_state(self, operation_handle, expected_impala_state, 
timeout):
     """Waits for the given 'operation_handle' to reach the 
'expected_impala_state'.
@@ -386,13 +401,16 @@ class BeeswaxConnection(ImpalaConnection):
   def get_host_port(self):
     return self.__host_port
 
-  def set_configuration_option(self, name, value):
+  def set_configuration_option(self, name, value, is_log_sql=True):
     # Only set the option if it's not already set to the same value.
     name = name.lower()
     value = str(value)
     if self.__beeswax_client.get_query_option(name) != value:
-      LOG.info("set_option('{}', '{}')".format(name, value))
       self.__beeswax_client.set_query_option(name, value)
+      if is_log_sql:
+        self.log_client("\n\nset {0}={1};\n".format(name, value))
+      return True
+    return False
 
   def __collect_default_options(self):
     options = {}
@@ -436,14 +454,14 @@ class BeeswaxConnection(ImpalaConnection):
     self.__beeswax_client.close_dml(operation_handle.get_handle())
 
   def execute(self, sql_stmt, user=None, fetch_profile_after_close=False):
-    self.log_client("executing against %s\n" % (self.__host_port))
-    log_sql_stmt(sql_stmt)
+    self.log_client(u"executing against {0}\n{1}".format(
+      self.__host_port, format_sql_for_logging(sql_stmt)))
     return self.__beeswax_client.execute(sql_stmt, user=user,
         fetch_profile_after_close=fetch_profile_after_close)
 
   def execute_async(self, sql_stmt, user=None):
-    self.log_client("executing async: %s\n" % (self.__host_port))
-    log_sql_stmt(sql_stmt)
+    self.log_client(u"executing async {0}\n{1}".format(
+      self.__host_port, format_sql_for_logging(sql_stmt)))
     beeswax_handle = self.__beeswax_client.execute_query_async(sql_stmt, 
user=user)
     return OperationHandle(beeswax_handle, sql_stmt)
 
@@ -499,7 +517,7 @@ class BeeswaxConnection(ImpalaConnection):
 
   def log_handle(self, operation_handle, message):
     handle_id = self.handle_id(operation_handle)
-    LOG.info("-- {0}: {1}".format(handle_id, message))
+    LOG.info(u"{0}: {1}".format(handle_id, message))
 
   def get_query_id(self, operation_handle):
     return operation_handle.get_handle().id
@@ -566,14 +584,17 @@ class ImpylaHS2Connection(ImpalaConnection):
   def get_host_port(self):
     return self.__host_port
 
-  def set_configuration_option(self, name, value):
+  def set_configuration_option(self, name, value, is_log_sql=True):
     # Only set the option if it's not already set to the same value.
     # value must be parsed to string.
     name = name.lower()
     value = str(value)
     if self.__query_options.get(name) != value:
-      LOG.info("set_option('{}', '{}')".format(name, value))
       self.__query_options[name] = value
+      if is_log_sql:
+        self.log_client("\n\nset {0}={1};\n".format(name, value))
+      return True
+    return False
 
   def get_default_configuration(self):
     return self.__default_query_options.copy()
@@ -658,23 +679,24 @@ class ImpylaHS2Connection(ImpalaConnection):
       return op_handle.get_profile(TRuntimeProfileFormat.STRING)
     return None
 
-  def __log_execute(self, cursor, user):
+  def __log_execute(self, cursor, user, sql_stmt):
     self.log_client(
-      "executing against {0} at {1}. session: {2} main_cursor: {3} user: 
{4}\n".format(
-        (self._is_hive and 'Hive' or 'Impala'), self.__host_port,
-        self.__get_session_id(cursor), (cursor == self.__cursor), user)
+      (u"executing against {0} at {1}. session: {2} main_cursor: {3} "
+       u"user: {4}\n{5}").format(
+         (self._is_hive and 'Hive' or 'Impala'), self.__host_port,
+         self.__get_session_id(cursor), (cursor == self.__cursor), user,
+         format_sql_for_logging(sql_stmt))
     )
 
   def execute(self, sql_stmt, user=None, 
profile_format=TRuntimeProfileFormat.STRING,
       fetch_profile_after_close=False):
-    log_sql_stmt(sql_stmt)
     cursor = self.__cursor
     result = None
     try:
       if user != self.__user:
         # Must create a new cursor to supply 'user'.
         cursor = self.__open_single_cursor(user=user)
-      self.__log_execute(cursor, user)
+      self.__log_execute(cursor, user, sql_stmt)
       cursor.execute(sql_stmt, configuration=self.__query_options)
       handle = OperationHandle(cursor, sql_stmt)
       self.log_handle(handle, "started query in session {0}".format(
@@ -716,8 +738,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     try:
       async_cursor = self.__open_single_cursor(user=user)
       handle = OperationHandle(async_cursor, sql_stmt)
-      self.__log_execute(async_cursor, user)
-      log_sql_stmt(sql_stmt)
+      self.__log_execute(async_cursor, user, sql_stmt)
       async_cursor.execute_async(sql_stmt, configuration=self.__query_options)
       self.__async_cursors.append(async_cursor)
       return handle
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 2fe09a898..1f34176a3 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1186,10 +1186,13 @@ class ImpalaTestSuite(BaseTestSuite):
     result = self.execute_query_using_client(client, query, vector)
     # Restore client configuration before returning.
     modified_configs = vector.get_exec_option_dict().keys()
+    options_to_restore = dict()
     for name, val in client.get_default_configuration().items():
       lower_name = name.lower()
       if lower_name in modified_configs:
-        client.set_configuration_option(lower_name, val)
+        options_to_restore[lower_name] = val
+    if options_to_restore:
+      client.set_configuration(options_to_restore)
     return result
 
   @execute_wrapper
diff --git a/tests/common/patterns.py b/tests/common/patterns.py
index 850feb541..ee83ff485 100644
--- a/tests/common/patterns.py
+++ b/tests/common/patterns.py
@@ -26,6 +26,8 @@ VALID_IMPALA_IDENTIFIER_REGEX = 
re.compile(r'^[a-zA-Z][a-zA-Z0-9_]{,127}$')
 
 INT64_MASK = (1 << 64) - 1
 
+LOG_FORMAT = "-- %(asctime)s %(levelname)-8s %(threadName)s: %(message)s"
+
 
 def is_valid_impala_identifier(identifier):
   """Return True if identifier is a valid Impala identifier, False 
otherwise."""
diff --git a/tests/conftest.py b/tests/conftest.py
index 626d68b2d..edf6a4a2c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -35,13 +35,12 @@ from impala_py_lib.helpers import find_all_files, 
is_core_dump
 import tests.common.base_test_suite
 from tests.common.environ import build_flavor_timeout
 from tests.common.test_result_verifier import QueryTestResult
-from tests.common.patterns import is_valid_impala_identifier
+from tests.common.patterns import LOG_FORMAT, is_valid_impala_identifier
 from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
 from tests.comparison.db_connection import ImpalaConnection
 from tests.util.filesystem_utils import FILESYSTEM, ISILON_WEBHDFS_PORT, 
WAREHOUSE
 
 LOG = logging.getLogger('test_configuration')
-LOG_FORMAT = "-- %(asctime)s %(levelname)-8s %(threadName)s: %(message)s"
 VALID_TEST_PROTOCOLS = [BEESWAX, HS2, HS2_HTTP]
 
 DEFAULT_CONN_TIMEOUT = 45

Reply via email to