This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f35747ea IMPALA-5792: Eliminate duplicate beeswax python code
1f35747ea is described below

commit 1f35747ea3677bd004f8ed7b1cc94d6ef2f70c8e
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Nov 12 13:12:55 2024 -0800

    IMPALA-5792: Eliminate duplicate beeswax python code
    
    This patch unify duplicated exec summary code used by python beeswax
    clients: one used by the shell in impala_shell.py and one used by tests
    in impala_beeswax.py. The code that has progress furthest is the one in
    shell/impala_client.py, which is the one that can print correct exec
    summary table for MT_DOP>0 queries. It is made into a dedicated
    build_exec_summary_table function in impala_client.py, and then
    impala_beeswax.py import it from impala_client.py.
    
    This patch also fix several flake8 issues around the modified files.
    
    Testing:
    - Manually run TPC-DS Q74 in impala-shell and then type "summary"
      command. Confirm that plan tree is displayed properly.
    - Run single_node_perf_run.py over branches that produce different
      TPC-DS Q74 plan tree. Confirm that the plan tree are displayed
      correctly in performance_result.txt
    
    Change-Id: Ica57c90dd571d9ac74d76d9830da26c7fe20c74f
    Reviewed-on: http://gerrit.cloudera.org:8080/22060
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Daniel Becker <[email protected]>
---
 shell/impala_client.py          | 295 ++++++++++++++++++++++------------------
 shell/impala_shell.py           |  67 +++++----
 tests/beeswax/impala_beeswax.py | 145 ++++----------------
 3 files changed, 219 insertions(+), 288 deletions(-)

diff --git a/shell/impala_client.py b/shell/impala_client.py
index 284d1923d..4c79a9dbf 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -98,9 +98,10 @@ def utf8_encode_if_needed(val):
     val = val.encode('utf-8', errors='replace')
   return val
 
+
 # Regular expression that matches the progress line added to HS2 logs by
 # the Impala server.
-HS2_LOG_PROGRESS_REGEX = re.compile("Query.*Complete \([0-9]* out of 
[0-9]*\)\n")
+HS2_LOG_PROGRESS_REGEX = re.compile(r"Query.*Complete \([0-9]* out of 
[0-9]*\)\n")
 
 # Exception types to differentiate between the different RPCExceptions.
 # RPCException raised when TApplicationException is caught.
@@ -108,6 +109,158 @@ RPC_EXCEPTION_TAPPLICATION = "TAPPLICATION_EXCEPTION"
 # RPCException raised when impala server sends a TStatusCode.ERROR_STATUS 
status code.
 RPC_EXCEPTION_SERVER = "SERVER_ERROR"
 
+
+def build_exec_summary_table(summary, idx, indent_level, new_indent_level, 
output,
+                             is_prettyprint=True, 
separate_prefix_column=False):
+  """Direct translation of Coordinator::PrintExecSummary() to recursively 
build a list
+  of rows of summary statistics, one per exec node
+
+  summary: the TExecSummary object that contains all the summary data
+
+  idx: the index of the node to print
+
+  indent_level: the number of spaces to print before writing the node's label, 
to give
+  the appearance of a tree. The 0th child of a node has the same indent_level 
as its
+  parent. All other children have an indent_level of one greater than their 
parent.
+
+  new_indent_level: If true, this indent level is different from the previous 
row's.
+
+  output: the list of rows into which to append the rows produced for this 
node and its
+  children.
+
+  is_prettyprint: Optional. If True, print time, units, and bytes columns in 
pretty
+  printed format.
+
+  separate_prefix_column: Optional. If True, the prefix and operator name will 
be
+  returned as separate column. Otherwise, prefix and operater name will be 
concatenated
+  into single column.
+
+  Returns the index of the next exec node in summary.exec_nodes that should be
+  processed, used internally to this method only.
+  """
+  attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
+
+  # Initialise aggregate and maximum stats
+  agg_stats, max_stats = TExecStats(), TExecStats()
+  for attr in attrs:
+    setattr(agg_stats, attr, 0)
+    setattr(max_stats, attr, 0)
+
+  node = summary.nodes[idx]
+  if node.exec_stats is not None:
+    for stats in node.exec_stats:
+      for attr in attrs:
+        val = getattr(stats, attr)
+        if val is not None:
+          setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
+          setattr(max_stats, attr, max(getattr(max_stats, attr), val))
+
+  if node.exec_stats is not None and node.exec_stats:
+    avg_time = agg_stats.latency_ns / len(node.exec_stats)
+  else:
+    avg_time = 0
+
+  is_sink = node.node_id == -1
+  # If the node is a broadcast-receiving exchange node, the cardinality of 
rows produced
+  # is the max over all instances (which should all have received the same 
number of
+  # rows). Otherwise, the cardinality is the sum over all instances which 
process
+  # disjoint partitions.
+  if is_sink:
+    cardinality = -1
+  elif node.is_broadcast:
+    cardinality = max_stats.cardinality
+  else:
+    cardinality = agg_stats.cardinality
+
+  est_stats = node.estimated_stats
+  label_prefix = ""
+  if indent_level > 0:
+    label_prefix = "|"
+    label_prefix += "  |" * (indent_level - 1)
+    if new_indent_level:
+      label_prefix += "--"
+    else:
+      label_prefix += "  "
+
+  def prettyprint(val, units, divisor):
+    for unit in units:
+      if val < divisor:
+        if unit == units[0]:
+          return "%d%s" % (val, unit)
+        else:
+          return "%3.2f%s" % (val, unit)
+      val /= divisor
+
+  def prettyprint_bytes(byte_val):
+    return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)
+
+  def prettyprint_units(unit_val):
+    return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)
+
+  def prettyprint_time(time_val):
+    return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
+
+  instances = 0
+  if node.exec_stats is not None:
+    instances = len(node.exec_stats)
+  latency = max_stats.latency_ns
+  cardinality_est = est_stats.cardinality
+  memory_used = max_stats.memory_used
+  memory_est = est_stats.memory_used
+  if (is_prettyprint):
+    avg_time = prettyprint_time(avg_time)
+    latency = prettyprint_time(latency)
+    cardinality = "" if is_sink else prettyprint_units(cardinality)
+    cardinality_est = "" if is_sink else prettyprint_units(cardinality_est)
+    memory_used = prettyprint_bytes(memory_used)
+    memory_est = prettyprint_bytes(memory_est)
+
+  row = list()
+  if separate_prefix_column:
+    row.append(label_prefix)
+    row.append(node.label)
+  else:
+    row.append(label_prefix + node.label)
+
+  row.extend([
+    node.num_hosts,
+    instances,
+    avg_time,
+    latency,
+    cardinality,
+    cardinality_est,
+    memory_used,
+    memory_est,
+    node.label_detail])
+
+  output.append(row)
+  try:
+    sender_idx = summary.exch_to_sender_map[idx]
+    # This is an exchange node or a join node with a separate builder, so the 
source
+    # is a fragment root, and should be printed next.
+    sender_indent_level = indent_level + node.num_children
+    sender_new_indent_level = node.num_children > 0
+    build_exec_summary_table(summary, sender_idx, sender_indent_level,
+                             sender_new_indent_level, output, is_prettyprint,
+                             separate_prefix_column)
+  except (KeyError, TypeError):
+    # Fall through if idx not in map, or if exch_to_sender_map itself is not 
set
+    pass
+
+  idx += 1
+  if node.num_children > 0:
+    first_child_output = []
+    idx = build_exec_summary_table(summary, idx, indent_level, False, 
first_child_output,
+                                   is_prettyprint, separate_prefix_column)
+    for child_idx in xrange(1, node.num_children):
+      # All other children are indented (we only have 0, 1 or 2 children for 
every exec
+      # node at the moment)
+      idx = build_exec_summary_table(summary, idx, indent_level + 1, True, 
output,
+                                     is_prettyprint, separate_prefix_column)
+    output += first_child_output
+  return idx
+
+
 class QueryOptionLevels:
   """These are the levels used when displaying query options.
   The values correspond to the ones in TQueryOptionLevel"""
@@ -196,7 +349,7 @@ class ImpalaClient(object):
     try:
       self._open_session()
       return self._ping_impala_service()
-    except:
+    except Exception:
       # Ensure we are in a disconnected state if we failed above.
       self.close_connection()
       raise
@@ -525,130 +678,9 @@ class ImpalaClient(object):
     sock.setTimeout(None)
     return transport
 
-  def build_summary_table(self, summary, idx, is_fragment_root, indent_level,
-      new_indent_level, output):
-    """Direct translation of Coordinator::PrintExecSummary() to recursively 
build a list
-    of rows of summary statistics, one per exec node
-
-    summary: the TExecSummary object that contains all the summary data
-
-    idx: the index of the node to print
-
-    is_fragment_root: true if the node to print is the root of a fragment (and 
therefore
-    feeds into an exchange)
-
-    indent_level: the number of spaces to print before writing the node's 
label, to give
-    the appearance of a tree. The 0th child of a node has the same 
indent_level as its
-    parent. All other children have an indent_level of one greater than their 
parent.
-
-    output: the list of rows into which to append the rows produced for this 
node and its
-    children.
-
-    Returns the index of the next exec node in summary.exec_nodes that should 
be
-    processed, used internally to this method only.
-
-    NOTE: This is duplicated in impala_beeswax.py, and changes made here 
should also be
-    made there. TODO: refactor into a shared library. (IMPALA-5792)
-    """
-    attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
-
-    # Initialise aggregate and maximum stats
-    agg_stats, max_stats = TExecStats(), TExecStats()
-    for attr in attrs:
-      setattr(agg_stats, attr, 0)
-      setattr(max_stats, attr, 0)
-
-    node = summary.nodes[idx]
-    if node.exec_stats is not None:
-      for stats in node.exec_stats:
-        for attr in attrs:
-          val = getattr(stats, attr)
-          if val is not None:
-            setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
-            setattr(max_stats, attr, max(getattr(max_stats, attr), val))
-
-    if node.exec_stats is not None and node.exec_stats:
-      avg_time = agg_stats.latency_ns / len(node.exec_stats)
-    else:
-      avg_time = 0
-
-    # If the node is a broadcast-receiving exchange node, the cardinality of 
rows produced
-    # is the max over all instances (which should all have received the same 
number of
-    # rows). Otherwise, the cardinality is the sum over all instances which 
process
-    # disjoint partitions.
-    if node.is_broadcast:
-      cardinality = max_stats.cardinality
-    else:
-      cardinality = agg_stats.cardinality
-
-    est_stats = node.estimated_stats
-    label_prefix = ""
-    if indent_level > 0:
-      label_prefix = "|"
-      label_prefix += "  |" * (indent_level - 1)
-      if new_indent_level:
-        label_prefix += "--"
-      else:
-        label_prefix += "  "
-
-    def prettyprint(val, units, divisor):
-      for unit in units:
-        if val < divisor:
-          if unit == units[0]:
-            return "%d%s" % (val, unit)
-          else:
-            return "%3.2f%s" % (val, unit)
-        val /= divisor
-
-    def prettyprint_bytes(byte_val):
-      return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)
-
-    def prettyprint_units(unit_val):
-      return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)
-
-    def prettyprint_time(time_val):
-      return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
-
-    instances = 0
-    if node.exec_stats is not None:
-      instances = len(node.exec_stats)
-    is_sink = node.node_id == -1
-    row = [ label_prefix + node.label,
-            node.num_hosts, instances,
-            prettyprint_time(avg_time),
-            prettyprint_time(max_stats.latency_ns),
-            "" if is_sink else prettyprint_units(cardinality),
-            "" if is_sink else prettyprint_units(est_stats.cardinality),
-            prettyprint_bytes(max_stats.memory_used),
-            prettyprint_bytes(est_stats.memory_used),
-            node.label_detail ]
-
-    output.append(row)
-    try:
-      sender_idx = summary.exch_to_sender_map[idx]
-      # This is an exchange node or a join node with a separate builder, so 
the source
-      # is a fragment root, and should be printed next.
-      sender_indent_level = indent_level + node.num_children
-      sender_new_indent_level = node.num_children > 0
-      self.build_summary_table(
-          summary, sender_idx, True, sender_indent_level, 
sender_new_indent_level, output)
-    except (KeyError, TypeError):
-      # Fall through if idx not in map, or if exch_to_sender_map itself is not 
set
-      pass
-
-    idx += 1
-    if node.num_children > 0:
-      first_child_output = []
-      idx = \
-        self.build_summary_table(
-            summary, idx, False, indent_level, False, first_child_output)
-      for child_idx in xrange(1, node.num_children):
-        # All other children are indented (we only have 0, 1 or 2 children for 
every exec
-        # node at the moment)
-        idx = self.build_summary_table(
-            summary, idx, False, indent_level + 1, True, output)
-      output += first_child_output
-    return idx
+  def build_summary_table(self, summary, output):
+    build_exec_summary_table(summary, 0, 0, False, output, is_prettyprint=True,
+                             separate_prefix_column=False)
 
   def _get_sleep_interval(self, start_time):
     """Returns a step function of time to sleep in seconds before polling
@@ -728,8 +760,8 @@ class ImpalaHS2Client(ImpalaClient):
                           username=self.user)
     resp = self._do_hs2_rpc(OpenSession, req, retry_on_error=True)
     self._check_hs2_rpc_status(resp.status)
-    assert (resp.serverProtocolVersion ==
-            TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6), 
resp.serverProtocolVersion
+    assert (resp.serverProtocolVersion
+            == TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6), 
resp.serverProtocolVersion
     # TODO: ensure it's closed if needed
     self.session_handle = resp.sessionHandle
 
@@ -755,7 +787,6 @@ class ImpalaHS2Client(ImpalaClient):
 
     return headers
 
-
   def close_connection(self):
     if self.session_handle is not None:
       # Attempt to close session explicitly. Do not fail if there is an error
@@ -799,8 +830,8 @@ class ImpalaHS2Client(ImpalaClient):
         QueryStateException):
         raise
       except RPCException as r:
-        if (r.exception_type == RPC_EXCEPTION_TAPPLICATION or
-          r.exception_type == RPC_EXCEPTION_SERVER):
+        if (r.exception_type == RPC_EXCEPTION_TAPPLICATION
+            or r.exception_type == RPC_EXCEPTION_SERVER):
           raise
         log_exception_with_timestamp(r, "Exception",
            "type={0} when listing query options. {1}".format(type(r), 
retry_msg))
@@ -1465,7 +1496,7 @@ class ImpalaBeeswaxClient(ImpalaClient):
     dml_result, rpc_status = self._do_beeswax_rpc(
         lambda: self.imp_service.CloseInsert(last_query_handle))
     if rpc_status != RpcStatus.OK:
-       raise RPCException()
+      raise RPCException()
     last_query_handle.is_closed = True
     return self._process_dml_result(dml_result)
 
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index b7a491d1f..afea9e3d2 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -74,6 +74,7 @@ DEFAULT_HS2_HTTP_PORT = 28000
 DEFAULT_STRICT_HS2_PORT = 11050
 DEFAULT_STRICT_HS2_HTTP_PORT = 10001
 
+
 def strip_comments(sql):
   """sqlparse default implementation of strip comments has a bad performance 
when parsing
   very large SQL due to the grouping. This is because the default 
implementation tries to
@@ -166,7 +167,7 @@ class ImpalaShell(cmd.Cmd, object):
   COMMENTS_BEFORE_SET_PATTERN = r'^(\s*/\*(.|\n)*?\*/|\s*--.*\n)*\s*((un)?set)'
   COMMENTS_BEFORE_SET_REPLACEMENT = r'\3'
   # Variable names are prefixed with the following string
-  VAR_PREFIXES = [ 'VAR', 'HIVEVAR' ]
+  VAR_PREFIXES = ['VAR', 'HIVEVAR']
   DEFAULT_DB = 'default'
   # Regex applied to all tokens of a query to detect DML statements.
   DML_REGEX = re.compile("^(insert|upsert|update|delete)$", re.I)
@@ -180,10 +181,10 @@ class ImpalaShell(cmd.Cmd, object):
   VALID_SHELL_OPTIONS = {
     'LIVE_PROGRESS': (lambda x: x in ImpalaShell.TRUE_STRINGS, 
"live_progress"),
     'LIVE_SUMMARY': (lambda x: x in ImpalaShell.TRUE_STRINGS, "live_summary"),
-    'WRITE_DELIMITED' : (lambda x: x in ImpalaShell.TRUE_STRINGS, 
"write_delimited"),
-    'VERBOSE' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "verbose"),
-    'DELIMITER' : (lambda x: " " if x == '\\s' else x, "output_delimiter"),
-    'OUTPUT_FILE' : (lambda x: None if x == '' else x, "output_file"),
+    'WRITE_DELIMITED': (lambda x: x in ImpalaShell.TRUE_STRINGS, 
"write_delimited"),
+    'VERBOSE': (lambda x: x in ImpalaShell.TRUE_STRINGS, "verbose"),
+    'DELIMITER': (lambda x: " " if x == '\\s' else x, "output_delimiter"),
+    'OUTPUT_FILE': (lambda x: None if x == '' else x, "output_file"),
     'VERTICAL': (lambda x: x in ImpalaShell.TRUE_STRINGS, "vertical"),
   }
 
@@ -217,8 +218,8 @@ class ImpalaShell(cmd.Cmd, object):
         (self.strict_hs2_protocol and not self.use_kerberos and not 
self.use_jwt)
     self.client_connect_timeout_ms = options.client_connect_timeout_ms
     self.http_socket_timeout_s = None
-    if (options.http_socket_timeout_s != 'None' and
-          options.http_socket_timeout_s is not None):
+    if (options.http_socket_timeout_s != 'None'
+        and options.http_socket_timeout_s is not None):
         self.http_socket_timeout_s = float(options.http_socket_timeout_s)
     self.connect_max_tries = options.connect_max_tries
     self.verbose = options.verbose
@@ -415,8 +416,8 @@ class ImpalaShell(cmd.Cmd, object):
     default values.
     query_options parameter is a subset of the default_query_options map"""
     for option in sorted(query_options):
-      if (option in self.set_query_options and
-          self.set_query_options[option] != query_options[option]):  # noqa
+      if (option in self.set_query_options
+          and self.set_query_options[option] != query_options[option]):  # noqa
         print('\n'.join(["\t%s: %s" % (option, 
self.set_query_options[option])]))
       else:
         print('\n'.join(["\t%s: [%s]" % (option, query_options[option])]))
@@ -765,7 +766,7 @@ class ImpalaShell(cmd.Cmd, object):
     # Cmd is an old-style class, hence we need to call the method directly
     # instead of using super()
     # TODO: This may have to be changed to a super() call once we move to 
Python 3
-    if line == None:
+    if line is None:
       return CmdStatus.ERROR
     else:
       # This code is based on the code from the standard Python library 
package cmd.py:
@@ -848,15 +849,15 @@ class ImpalaShell(cmd.Cmd, object):
     arg_mode = str(arg_mode).lower()
     if arg_mode not in [QueryAttemptDisplayModes.ALL,
         QueryAttemptDisplayModes.LATEST, QueryAttemptDisplayModes.ORIGINAL]:
-      print("Invalid value for query attempt display mode: \'" +
-          arg_mode + "\'. Valid values are [ALL | LATEST | ORIGINAL]")
+      print("Invalid value for query attempt display mode: \'"
+            + arg_mode + "\'. Valid values are [ALL | LATEST | ORIGINAL]")
       return None
     return arg_mode
 
   def print_exec_summary(self, summary):
     output = []
     table = self._default_summary_table()
-    self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
+    self.imp_client.build_summary_table(summary, output)
     formatter = PrettyOutputFormatter(table)
     self.output_stream = OutputStream(formatter, filename=self.output_file)
     self.output_stream.write(output)
@@ -1113,7 +1114,7 @@ class ImpalaShell(cmd.Cmd, object):
           self.ldap_password.endswith('\n'):
         print("Warning: LDAP password contains a trailing newline. "
               "Did you use 'echo' instead of 'echo -n'?", file=sys.stderr)
-      if self.use_ssl and sys.version_info < (2,7,9) \
+      if self.use_ssl and sys.version_info < (2, 7, 9) \
           and "EOF occurred in violation of protocol" in str(e):
         print("Warning: TLSv1.2 is not supported for Python < 2.7.9", 
file=sys.stderr)
       log_exception_with_timestamp(e, "Exception",
@@ -1190,7 +1191,7 @@ class ImpalaShell(cmd.Cmd, object):
     arg = arg.replace('\n', '')
     # Get the database and table name, using the current database if the table 
name
     # wasn't fully qualified.
-    db_name, tbl_name = self.current_db, arg
+    db_name = self.current_db
     if db_name is None:
       db_name = ImpalaShell.DEFAULT_DB
     db_table_name = arg.split('.')
@@ -1341,7 +1342,7 @@ class ImpalaShell(cmd.Cmd, object):
       if self.live_summary:
         table = self._default_summary_table()
         output = []
-        self.imp_client.build_summary_table(summary, 0, False, 0, False, 
output)
+        self.imp_client.build_summary_table(summary, output)
         formatter = PrettyOutputFormatter(table)
         data += formatter.format(output) + "\n"
 
@@ -1408,8 +1409,8 @@ class ImpalaShell(cmd.Cmd, object):
             "Query state can be monitored at: %s" % 
self.imp_client.get_query_link(
              self.imp_client.get_query_id_str(self.last_query_handle)))
 
-      wait_to_finish = self.imp_client.wait_to_finish(self.last_query_handle,
-          self._periodic_wait_callback)
+      self.imp_client.wait_to_finish(
+        self.last_query_handle, self._periodic_wait_callback)
       # Reset the progress stream.
       self.progress_stream.clear()
 
@@ -1517,7 +1518,6 @@ class ImpalaShell(cmd.Cmd, object):
       self.prompt = ImpalaShell.DISCONNECTED_PROMPT
     return CmdStatus.ERROR
 
-
   def construct_table_with_header(self, column_names):
     """ Constructs the table header for a given query handle.
 
@@ -1822,7 +1822,7 @@ class ImpalaShell(cmd.Cmd, object):
     """
     cmd_names = [cmd for cmd in self.commands if cmd.startswith(text.lower())]
     # If the user input is upper case, return commands in upper case.
-    if text.isupper(): return [cmd_names.upper() for cmd_names in cmd_names]
+    if text.isupper(): return [cmd_name.upper() for cmd_name in cmd_names]
     # If the user input is lower case or mixed case, return lower case 
commands.
     return cmd_names
 
@@ -1873,7 +1873,7 @@ HEADER_DIVIDER =\
 def _format_tip(tip):
   """Takes a tip string and splits it on word boundaries so that it fits 
neatly inside the
   shell header."""
-  return '\n'.join([l for l in textwrap.wrap(tip, len(HEADER_DIVIDER))])
+  return '\n'.join([line for line in textwrap.wrap(tip, len(HEADER_DIVIDER))])
 
 
 WELCOME_STRING = """\
@@ -1916,8 +1916,8 @@ def parse_variables(keyvals):
     for keyval in keyvals:
       match = re.match(kv_pattern, keyval)
       if not match:
-        print('Error: Could not parse key-value "%s". ' % (keyval,) +
-              'It must follow the pattern "KEY=VALUE".', file=sys.stderr)
+        print('Error: Could not parse key-value "%s". ' % (keyval,)
+              + 'It must follow the pattern "KEY=VALUE".', file=sys.stderr)
         parser.print_help()
         raise FatalShellException()
       else:
@@ -1943,8 +1943,8 @@ def replace_variables(set_variables, input_string):
     # Check if syntax is correct
     var_name = get_var_name(name)
     if var_name is None:
-      print('Error: Unknown substitution syntax (%s). ' % (name,) +
-            'Use ${VAR:var_name}.', file=sys.stderr)
+      print('Error: Unknown substitution syntax (%s). ' % (name,)
+            + 'Use ${VAR:var_name}.', file=sys.stderr)
       errors = True
     else:
       # Replaces variable value
@@ -1998,8 +1998,8 @@ def execute_queries_non_interactive_mode(options, 
query_options):
 
   queries = parse_query_text(query_text)
   with ImpalaShell(options, query_options) as shell:
-    return (shell.execute_query_list(shell.cmdqueue) and
-            shell.execute_query_list(queries))
+    return (shell.execute_query_list(shell.cmdqueue)
+            and shell.execute_query_list(queries))
 
 
 def get_intro(options):
@@ -2062,7 +2062,6 @@ def read_password_cmd(password_cmd, auth_method_desc, 
strip_newline=False):
     raise FatalShellException()
 
 
-
 def impala_shell_main():
   """
   There are two types of options: shell options and query_options. Both can be 
set on the
@@ -2165,14 +2164,14 @@ def impala_shell_main():
     raise FatalShellException()
 
   if not options.ssl and not options.creds_ok_in_clear and options.use_ldap:
-    print("LDAP credentials may not be sent over insecure " +
-          "connections. Enable SSL or set --auth_creds_ok_in_clear",
+    print(("LDAP credentials may not be sent over insecure "
+           "connections. Enable SSL or set --auth_creds_ok_in_clear"),
           file=sys.stderr)
     raise FatalShellException()
 
   if not options.use_ldap and options.ldap_password_cmd:
-    print("Option --ldap_password_cmd requires using LDAP authentication " +
-          "mechanism (-l)", file=sys.stderr)
+    print(("Option --ldap_password_cmd requires using LDAP authentication "
+           "mechanism (-l)"), file=sys.stderr)
     raise FatalShellException()
 
   if options.use_jwt and options.protocol.lower() != 'hs2-http':
@@ -2273,8 +2272,8 @@ def impala_shell_main():
       raise FatalShellException()
 
   if options.http_socket_timeout_s is not None:
-    if (options.http_socket_timeout_s != 'None' and
-          float(options.http_socket_timeout_s) < 0):
+    if (options.http_socket_timeout_s != 'None'
+        and float(options.http_socket_timeout_s) < 0):
         print("http_socket_timeout_s must be a nonnegative floating point 
number"
               " expressing seconds, or None", file=sys.stderr)
         raise FatalShellException()
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index 528eaeec2..9b5cef23e 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -26,7 +26,7 @@
 #   result = client.execute(query_string)
 #   where result is an object of the class ImpalaBeeswaxResult.
 from __future__ import absolute_import, division, print_function
-from builtins import filter, map, range
+from builtins import filter, map
 import logging
 import time
 import shlex
@@ -36,11 +36,6 @@ import re
 from beeswaxd import BeeswaxService
 from beeswaxd.BeeswaxService import QueryState
 from datetime import datetime
-try:
-  # If Exec Summary is not implemented in Impala, this cannot be imported
-  from ExecStats.ttypes import TExecStats
-except ImportError:
-  pass
 from ImpalaService import ImpalaService
 from tests.util.thrift_util import create_transport
 from thrift.transport.TTransport import TTransportException
@@ -49,11 +44,13 @@ from thrift.Thrift import TApplicationException
 
 LOG = logging.getLogger('impala_beeswax')
 
+
 # Custom exception wrapper.
 # All exceptions coming from thrift/beeswax etc. go through this wrapper.
 # TODO: Add the ability to print some of the stack.
 class ImpalaBeeswaxException(Exception):
   __name__ = "ImpalaBeeswaxException"
+
   def __init__(self, message, inner_exception):
     self.__message = message
     self.inner_exception = inner_exception
@@ -61,6 +58,7 @@ class ImpalaBeeswaxException(Exception):
   def __str__(self):
     return self.__message
 
+
 class ImpalaBeeswaxResult(object):
   def __init__(self, **kwargs):
     self.query = kwargs.get('query', None)
@@ -99,10 +97,10 @@ class ImpalaBeeswaxResult(object):
                'Took: %s(s)\n'
                'Data:\n%s\n'
                % (self.summary, self.success, self.time_taken,
-                  self.__format_data())
-              )
+                  self.__format_data()))
     return message
 
+
 # Interface to beeswax. Responsible for executing queries, fetching results.
 class ImpalaBeeswaxClient(object):
   # Regex applied to all tokens of a query to detect the query type.
@@ -243,121 +241,24 @@ class ImpalaBeeswaxClient(object):
     if summary is None or summary.nodes is None:
       return None
       # If exec summary is not implemented in Impala, this function returns, 
so we do not
-      # get the function __build_summary_table which requires TExecStats to be 
imported.
+      # get the function build_exec_summary_table which requires TExecStats to 
be
+      # imported.
 
-    output = []
-    self.__build_summary_table(summary, 0, False, 0, False, output)
+    output = list()
+    self.__build_summary_table(summary, output)
     return output
 
-  def __build_summary_table(self, summary, idx, is_fragment_root, indent_level,
-      new_indent_level, output):
-    """NOTE: This was taken from impala_shell.py. Changes made here must be 
made there as
-    well. TODO: This method will be a placed in a library that is shared 
between
-    impala_shell and this file. (IMPALA-5792)
-
-    Direct translation of Coordinator::PrintExecSummary() to recursively build 
a list
-    of rows of summary statistics, one per exec node
-
-    summary: the TExecSummary object that contains all the summary data
-
-    idx: the index of the node to print
-
-    is_fragment_root: true if the node to print is the root of a fragment (and 
therefore
-    feeds into an exchange)
-
-    indent_level: the number of spaces to print before writing the node's 
label, to give
-    the appearance of a tree. The 0th child of a node has the same 
indent_level as its
-    parent. All other children have an indent_level of one greater than their 
parent.
-
-    new_indent_level: If true, this indent level is different from the 
previous row's.
-
-    output: the list of rows into which to append the rows produced for this 
node and its
-    children.
-
-    Returns the index of the next exec node in summary.exec_nodes that should 
be
-    processed, used internally to this method only.
-    """
-    attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
-
-    # Initialise aggregate and maximum stats
-    agg_stats, max_stats = TExecStats(), TExecStats()
-    for attr in attrs:
-      setattr(agg_stats, attr, 0)
-      setattr(max_stats, attr, 0)
-
-    row = {}
-    node = summary.nodes[idx]
-    # exec_stats may not be set even if the query is FINISHED if there are 
fragments that
-    # are still executing or that were cancelled before sending a status 
report.
-    if node.exec_stats is not None:
-      for stats in node.exec_stats:
-        for attr in attrs:
-          val = getattr(stats, attr)
-          if val is not None:
-            setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
-            setattr(max_stats, attr, max(getattr(max_stats, attr), val))
-
-      if len(node.exec_stats) > 0:
-        avg_time = agg_stats.latency_ns // len(node.exec_stats)
-      else:
-        avg_time = 0
-
-      row["num_instances"] = len(node.exec_stats)
-      row["num_hosts"] = node.num_hosts
-      row["avg_time"] = avg_time
-
-    is_sink = node.node_id == -1
-    # If the node is a broadcast-receiving exchange node, the cardinality of 
rows produced
-    # is the max over all instances (which should all have received the same 
number of
-    # rows). Otherwise, the cardinality is the sum over all instances which 
process
-    # disjoint partitions.
-    if is_sink:
-      cardinality = -1
-    elif node.is_broadcast:
-      cardinality = max_stats.cardinality
-    else:
-      cardinality = agg_stats.cardinality
-
-    est_stats = node.estimated_stats
-
-    label_prefix = ""
-    if indent_level > 0:
-      label_prefix = "|"
-      label_prefix += "  |" * (indent_level - 1)
-      if new_indent_level:
-        label_prefix += "--"
-      else:
-        label_prefix += "  "
-
-    row["prefix"] = label_prefix
-    row["operator"] = node.label
-    row["max_time"] = max_stats.latency_ns
-    row["num_rows"] = cardinality
-    row["est_num_rows"] = est_stats.cardinality
-    row["peak_mem"] = max_stats.memory_used
-    row["est_peak_mem"] = est_stats.memory_used
-    row["detail"] = node.label_detail
-    output.append(row)
-
-    if summary.exch_to_sender_map is not None and idx in 
summary.exch_to_sender_map:
-      sender_idx = summary.exch_to_sender_map[idx]
-      # This is an exchange node, so the sender is a fragment root, and should 
be printed
-      # next.
-      self.__build_summary_table(summary, sender_idx, True, indent_level, 
False, output)
-
-    idx += 1
-    if node.num_children > 0:
-      first_child_output = []
-      idx = \
-        self.__build_summary_table(
-            summary, idx, False, indent_level, False, first_child_output)
-      for child_idx in range(1, node.num_children):
-        # All other children are indented (we only have 0, 1 or 2 children for 
every exec
-        # node at the moment)
-        idx = self.__build_summary_table(
-            summary, idx, False, indent_level + 1, True, output)
-      output += first_child_output
-    return idx
+  def __build_summary_table(self, summary, output):
+    from shell.impala_client import build_exec_summary_table
+    result = list()
+    build_exec_summary_table(summary, 0, 0, False, result, 
is_prettyprint=False,
+                             separate_prefix_column=True)
+    keys = ['prefix', 'operator', 'num_hosts', 'num_instances', 'avg_time', 
'max_time',
+            'num_rows', 'est_num_rows', 'peak_mem', 'est_peak_mem', 'detail']
+    for row in result:
+      assert len(keys) == len(row)
+      summ_map = dict(zip(keys, row))
+      output.append(summ_map)
 
   def get_runtime_profile(self, handle):
     return self.__do_rpc(lambda: self.imp_service.GetRuntimeProfile(handle))
@@ -468,7 +369,7 @@ class ImpalaBeeswaxClient(object):
   def get_log(self, query_handle):
     return self.__do_rpc(lambda: self.imp_service.get_log(query_handle))
 
-  def fetch_results(self, query_string, query_handle, max_rows = -1):
+  def fetch_results(self, query_string, query_handle, max_rows=-1):
     """Fetches query results given a handle and query type (insert, use, 
other)"""
     query_type = self.__get_query_type(query_string)
     if query_type == 'use':
@@ -486,7 +387,7 @@ class ImpalaBeeswaxClient(object):
     exec_result.query = query_string
     return exec_result
 
-  def __fetch_results(self, handle, max_rows = -1):
+  def __fetch_results(self, handle, max_rows=-1):
     """Handles query results, returns a ImpalaBeeswaxResult object"""
     schema = self.__do_rpc(lambda: 
self.imp_service.get_results_metadata(handle)).schema
     # The query has finished, we can fetch the results


Reply via email to