This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 1f35747ea IMPALA-5792: Eliminate duplicate beeswax python code
1f35747ea is described below
commit 1f35747ea3677bd004f8ed7b1cc94d6ef2f70c8e
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Nov 12 13:12:55 2024 -0800
IMPALA-5792: Eliminate duplicate beeswax python code
This patch unifies duplicated exec summary code used by python beeswax
clients: one used by the shell in impala_shell.py and one used by tests
in impala_beeswax.py. The code that has progressed furthest is the one in
shell/impala_client.py, which is the one that can print the correct exec
summary table for MT_DOP>0 queries. It is made into a dedicated
build_exec_summary_table function in impala_client.py, and then
impala_beeswax.py imports it from impala_client.py.
This patch also fixes several flake8 issues around the modified files.
Testing:
- Manually run TPC-DS Q74 in impala-shell and then type "summary"
command. Confirm that plan tree is displayed properly.
- Run single_node_perf_run.py over branches that produce different
TPC-DS Q74 plan trees. Confirm that the plan trees are displayed
correctly in performance_result.txt
Change-Id: Ica57c90dd571d9ac74d76d9830da26c7fe20c74f
Reviewed-on: http://gerrit.cloudera.org:8080/22060
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Daniel Becker <[email protected]>
---
shell/impala_client.py | 295 ++++++++++++++++++++++------------------
shell/impala_shell.py | 67 +++++----
tests/beeswax/impala_beeswax.py | 145 ++++----------------
3 files changed, 219 insertions(+), 288 deletions(-)
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 284d1923d..4c79a9dbf 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -98,9 +98,10 @@ def utf8_encode_if_needed(val):
val = val.encode('utf-8', errors='replace')
return val
+
# Regular expression that matches the progress line added to HS2 logs by
# the Impala server.
-HS2_LOG_PROGRESS_REGEX = re.compile("Query.*Complete \([0-9]* out of
[0-9]*\)\n")
+HS2_LOG_PROGRESS_REGEX = re.compile(r"Query.*Complete \([0-9]* out of
[0-9]*\)\n")
# Exception types to differentiate between the different RPCExceptions.
# RPCException raised when TApplicationException is caught.
@@ -108,6 +109,158 @@ RPC_EXCEPTION_TAPPLICATION = "TAPPLICATION_EXCEPTION"
# RPCException raised when impala server sends a TStatusCode.ERROR_STATUS
status code.
RPC_EXCEPTION_SERVER = "SERVER_ERROR"
+
+def build_exec_summary_table(summary, idx, indent_level, new_indent_level,
output,
+ is_prettyprint=True,
separate_prefix_column=False):
+ """Direct translation of Coordinator::PrintExecSummary() to recursively
build a list
+ of rows of summary statistics, one per exec node
+
+ summary: the TExecSummary object that contains all the summary data
+
+ idx: the index of the node to print
+
+ indent_level: the number of spaces to print before writing the node's label,
to give
+ the appearance of a tree. The 0th child of a node has the same indent_level
as its
+ parent. All other children have an indent_level of one greater than their
parent.
+
+ new_indent_level: If true, this indent level is different from the previous
row's.
+
+ output: the list of rows into which to append the rows produced for this
node and its
+ children.
+
+ is_prettyprint: Optional. If True, print time, units, and bytes columns in
pretty
+ printed format.
+
+ separate_prefix_column: Optional. If True, the prefix and operator name will
be
+ returned as separate column. Otherwise, prefix and operater name will be
concatenated
+ into single column.
+
+ Returns the index of the next exec node in summary.exec_nodes that should be
+ processed, used internally to this method only.
+ """
+ attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
+
+ # Initialise aggregate and maximum stats
+ agg_stats, max_stats = TExecStats(), TExecStats()
+ for attr in attrs:
+ setattr(agg_stats, attr, 0)
+ setattr(max_stats, attr, 0)
+
+ node = summary.nodes[idx]
+ if node.exec_stats is not None:
+ for stats in node.exec_stats:
+ for attr in attrs:
+ val = getattr(stats, attr)
+ if val is not None:
+ setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
+ setattr(max_stats, attr, max(getattr(max_stats, attr), val))
+
+ if node.exec_stats is not None and node.exec_stats:
+ avg_time = agg_stats.latency_ns / len(node.exec_stats)
+ else:
+ avg_time = 0
+
+ is_sink = node.node_id == -1
+ # If the node is a broadcast-receiving exchange node, the cardinality of
rows produced
+ # is the max over all instances (which should all have received the same
number of
+ # rows). Otherwise, the cardinality is the sum over all instances which
process
+ # disjoint partitions.
+ if is_sink:
+ cardinality = -1
+ elif node.is_broadcast:
+ cardinality = max_stats.cardinality
+ else:
+ cardinality = agg_stats.cardinality
+
+ est_stats = node.estimated_stats
+ label_prefix = ""
+ if indent_level > 0:
+ label_prefix = "|"
+ label_prefix += " |" * (indent_level - 1)
+ if new_indent_level:
+ label_prefix += "--"
+ else:
+ label_prefix += " "
+
+ def prettyprint(val, units, divisor):
+ for unit in units:
+ if val < divisor:
+ if unit == units[0]:
+ return "%d%s" % (val, unit)
+ else:
+ return "%3.2f%s" % (val, unit)
+ val /= divisor
+
+ def prettyprint_bytes(byte_val):
+ return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)
+
+ def prettyprint_units(unit_val):
+ return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)
+
+ def prettyprint_time(time_val):
+ return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
+
+ instances = 0
+ if node.exec_stats is not None:
+ instances = len(node.exec_stats)
+ latency = max_stats.latency_ns
+ cardinality_est = est_stats.cardinality
+ memory_used = max_stats.memory_used
+ memory_est = est_stats.memory_used
+ if (is_prettyprint):
+ avg_time = prettyprint_time(avg_time)
+ latency = prettyprint_time(latency)
+ cardinality = "" if is_sink else prettyprint_units(cardinality)
+ cardinality_est = "" if is_sink else prettyprint_units(cardinality_est)
+ memory_used = prettyprint_bytes(memory_used)
+ memory_est = prettyprint_bytes(memory_est)
+
+ row = list()
+ if separate_prefix_column:
+ row.append(label_prefix)
+ row.append(node.label)
+ else:
+ row.append(label_prefix + node.label)
+
+ row.extend([
+ node.num_hosts,
+ instances,
+ avg_time,
+ latency,
+ cardinality,
+ cardinality_est,
+ memory_used,
+ memory_est,
+ node.label_detail])
+
+ output.append(row)
+ try:
+ sender_idx = summary.exch_to_sender_map[idx]
+ # This is an exchange node or a join node with a separate builder, so the
source
+ # is a fragment root, and should be printed next.
+ sender_indent_level = indent_level + node.num_children
+ sender_new_indent_level = node.num_children > 0
+ build_exec_summary_table(summary, sender_idx, sender_indent_level,
+ sender_new_indent_level, output, is_prettyprint,
+ separate_prefix_column)
+ except (KeyError, TypeError):
+ # Fall through if idx not in map, or if exch_to_sender_map itself is not
set
+ pass
+
+ idx += 1
+ if node.num_children > 0:
+ first_child_output = []
+ idx = build_exec_summary_table(summary, idx, indent_level, False,
first_child_output,
+ is_prettyprint, separate_prefix_column)
+ for child_idx in xrange(1, node.num_children):
+ # All other children are indented (we only have 0, 1 or 2 children for
every exec
+ # node at the moment)
+ idx = build_exec_summary_table(summary, idx, indent_level + 1, True,
output,
+ is_prettyprint, separate_prefix_column)
+ output += first_child_output
+ return idx
+
+
class QueryOptionLevels:
"""These are the levels used when displaying query options.
The values correspond to the ones in TQueryOptionLevel"""
@@ -196,7 +349,7 @@ class ImpalaClient(object):
try:
self._open_session()
return self._ping_impala_service()
- except:
+ except Exception:
# Ensure we are in a disconnected state if we failed above.
self.close_connection()
raise
@@ -525,130 +678,9 @@ class ImpalaClient(object):
sock.setTimeout(None)
return transport
- def build_summary_table(self, summary, idx, is_fragment_root, indent_level,
- new_indent_level, output):
- """Direct translation of Coordinator::PrintExecSummary() to recursively
build a list
- of rows of summary statistics, one per exec node
-
- summary: the TExecSummary object that contains all the summary data
-
- idx: the index of the node to print
-
- is_fragment_root: true if the node to print is the root of a fragment (and
therefore
- feeds into an exchange)
-
- indent_level: the number of spaces to print before writing the node's
label, to give
- the appearance of a tree. The 0th child of a node has the same
indent_level as its
- parent. All other children have an indent_level of one greater than their
parent.
-
- output: the list of rows into which to append the rows produced for this
node and its
- children.
-
- Returns the index of the next exec node in summary.exec_nodes that should
be
- processed, used internally to this method only.
-
- NOTE: This is duplicated in impala_beeswax.py, and changes made here
should also be
- made there. TODO: refactor into a shared library. (IMPALA-5792)
- """
- attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
-
- # Initialise aggregate and maximum stats
- agg_stats, max_stats = TExecStats(), TExecStats()
- for attr in attrs:
- setattr(agg_stats, attr, 0)
- setattr(max_stats, attr, 0)
-
- node = summary.nodes[idx]
- if node.exec_stats is not None:
- for stats in node.exec_stats:
- for attr in attrs:
- val = getattr(stats, attr)
- if val is not None:
- setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
- setattr(max_stats, attr, max(getattr(max_stats, attr), val))
-
- if node.exec_stats is not None and node.exec_stats:
- avg_time = agg_stats.latency_ns / len(node.exec_stats)
- else:
- avg_time = 0
-
- # If the node is a broadcast-receiving exchange node, the cardinality of
rows produced
- # is the max over all instances (which should all have received the same
number of
- # rows). Otherwise, the cardinality is the sum over all instances which
process
- # disjoint partitions.
- if node.is_broadcast:
- cardinality = max_stats.cardinality
- else:
- cardinality = agg_stats.cardinality
-
- est_stats = node.estimated_stats
- label_prefix = ""
- if indent_level > 0:
- label_prefix = "|"
- label_prefix += " |" * (indent_level - 1)
- if new_indent_level:
- label_prefix += "--"
- else:
- label_prefix += " "
-
- def prettyprint(val, units, divisor):
- for unit in units:
- if val < divisor:
- if unit == units[0]:
- return "%d%s" % (val, unit)
- else:
- return "%3.2f%s" % (val, unit)
- val /= divisor
-
- def prettyprint_bytes(byte_val):
- return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)
-
- def prettyprint_units(unit_val):
- return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)
-
- def prettyprint_time(time_val):
- return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
-
- instances = 0
- if node.exec_stats is not None:
- instances = len(node.exec_stats)
- is_sink = node.node_id == -1
- row = [ label_prefix + node.label,
- node.num_hosts, instances,
- prettyprint_time(avg_time),
- prettyprint_time(max_stats.latency_ns),
- "" if is_sink else prettyprint_units(cardinality),
- "" if is_sink else prettyprint_units(est_stats.cardinality),
- prettyprint_bytes(max_stats.memory_used),
- prettyprint_bytes(est_stats.memory_used),
- node.label_detail ]
-
- output.append(row)
- try:
- sender_idx = summary.exch_to_sender_map[idx]
- # This is an exchange node or a join node with a separate builder, so
the source
- # is a fragment root, and should be printed next.
- sender_indent_level = indent_level + node.num_children
- sender_new_indent_level = node.num_children > 0
- self.build_summary_table(
- summary, sender_idx, True, sender_indent_level,
sender_new_indent_level, output)
- except (KeyError, TypeError):
- # Fall through if idx not in map, or if exch_to_sender_map itself is not
set
- pass
-
- idx += 1
- if node.num_children > 0:
- first_child_output = []
- idx = \
- self.build_summary_table(
- summary, idx, False, indent_level, False, first_child_output)
- for child_idx in xrange(1, node.num_children):
- # All other children are indented (we only have 0, 1 or 2 children for
every exec
- # node at the moment)
- idx = self.build_summary_table(
- summary, idx, False, indent_level + 1, True, output)
- output += first_child_output
- return idx
+ def build_summary_table(self, summary, output):
+ build_exec_summary_table(summary, 0, 0, False, output, is_prettyprint=True,
+ separate_prefix_column=False)
def _get_sleep_interval(self, start_time):
"""Returns a step function of time to sleep in seconds before polling
@@ -728,8 +760,8 @@ class ImpalaHS2Client(ImpalaClient):
username=self.user)
resp = self._do_hs2_rpc(OpenSession, req, retry_on_error=True)
self._check_hs2_rpc_status(resp.status)
- assert (resp.serverProtocolVersion ==
- TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6),
resp.serverProtocolVersion
+ assert (resp.serverProtocolVersion
+ == TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6),
resp.serverProtocolVersion
# TODO: ensure it's closed if needed
self.session_handle = resp.sessionHandle
@@ -755,7 +787,6 @@ class ImpalaHS2Client(ImpalaClient):
return headers
-
def close_connection(self):
if self.session_handle is not None:
# Attempt to close session explicitly. Do not fail if there is an error
@@ -799,8 +830,8 @@ class ImpalaHS2Client(ImpalaClient):
QueryStateException):
raise
except RPCException as r:
- if (r.exception_type == RPC_EXCEPTION_TAPPLICATION or
- r.exception_type == RPC_EXCEPTION_SERVER):
+ if (r.exception_type == RPC_EXCEPTION_TAPPLICATION
+ or r.exception_type == RPC_EXCEPTION_SERVER):
raise
log_exception_with_timestamp(r, "Exception",
"type={0} when listing query options. {1}".format(type(r),
retry_msg))
@@ -1465,7 +1496,7 @@ class ImpalaBeeswaxClient(ImpalaClient):
dml_result, rpc_status = self._do_beeswax_rpc(
lambda: self.imp_service.CloseInsert(last_query_handle))
if rpc_status != RpcStatus.OK:
- raise RPCException()
+ raise RPCException()
last_query_handle.is_closed = True
return self._process_dml_result(dml_result)
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index b7a491d1f..afea9e3d2 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -74,6 +74,7 @@ DEFAULT_HS2_HTTP_PORT = 28000
DEFAULT_STRICT_HS2_PORT = 11050
DEFAULT_STRICT_HS2_HTTP_PORT = 10001
+
def strip_comments(sql):
"""sqlparse default implementation of strip comments has a bad performance
when parsing
very large SQL due to the grouping. This is because the default
implementation tries to
@@ -166,7 +167,7 @@ class ImpalaShell(cmd.Cmd, object):
COMMENTS_BEFORE_SET_PATTERN = r'^(\s*/\*(.|\n)*?\*/|\s*--.*\n)*\s*((un)?set)'
COMMENTS_BEFORE_SET_REPLACEMENT = r'\3'
# Variable names are prefixed with the following string
- VAR_PREFIXES = [ 'VAR', 'HIVEVAR' ]
+ VAR_PREFIXES = ['VAR', 'HIVEVAR']
DEFAULT_DB = 'default'
# Regex applied to all tokens of a query to detect DML statements.
DML_REGEX = re.compile("^(insert|upsert|update|delete)$", re.I)
@@ -180,10 +181,10 @@ class ImpalaShell(cmd.Cmd, object):
VALID_SHELL_OPTIONS = {
'LIVE_PROGRESS': (lambda x: x in ImpalaShell.TRUE_STRINGS,
"live_progress"),
'LIVE_SUMMARY': (lambda x: x in ImpalaShell.TRUE_STRINGS, "live_summary"),
- 'WRITE_DELIMITED' : (lambda x: x in ImpalaShell.TRUE_STRINGS,
"write_delimited"),
- 'VERBOSE' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "verbose"),
- 'DELIMITER' : (lambda x: " " if x == '\\s' else x, "output_delimiter"),
- 'OUTPUT_FILE' : (lambda x: None if x == '' else x, "output_file"),
+ 'WRITE_DELIMITED': (lambda x: x in ImpalaShell.TRUE_STRINGS,
"write_delimited"),
+ 'VERBOSE': (lambda x: x in ImpalaShell.TRUE_STRINGS, "verbose"),
+ 'DELIMITER': (lambda x: " " if x == '\\s' else x, "output_delimiter"),
+ 'OUTPUT_FILE': (lambda x: None if x == '' else x, "output_file"),
'VERTICAL': (lambda x: x in ImpalaShell.TRUE_STRINGS, "vertical"),
}
@@ -217,8 +218,8 @@ class ImpalaShell(cmd.Cmd, object):
(self.strict_hs2_protocol and not self.use_kerberos and not
self.use_jwt)
self.client_connect_timeout_ms = options.client_connect_timeout_ms
self.http_socket_timeout_s = None
- if (options.http_socket_timeout_s != 'None' and
- options.http_socket_timeout_s is not None):
+ if (options.http_socket_timeout_s != 'None'
+ and options.http_socket_timeout_s is not None):
self.http_socket_timeout_s = float(options.http_socket_timeout_s)
self.connect_max_tries = options.connect_max_tries
self.verbose = options.verbose
@@ -415,8 +416,8 @@ class ImpalaShell(cmd.Cmd, object):
default values.
query_options parameter is a subset of the default_query_options map"""
for option in sorted(query_options):
- if (option in self.set_query_options and
- self.set_query_options[option] != query_options[option]): # noqa
+ if (option in self.set_query_options
+ and self.set_query_options[option] != query_options[option]): # noqa
print('\n'.join(["\t%s: %s" % (option,
self.set_query_options[option])]))
else:
print('\n'.join(["\t%s: [%s]" % (option, query_options[option])]))
@@ -765,7 +766,7 @@ class ImpalaShell(cmd.Cmd, object):
# Cmd is an old-style class, hence we need to call the method directly
# instead of using super()
# TODO: This may have to be changed to a super() call once we move to
Python 3
- if line == None:
+ if line is None:
return CmdStatus.ERROR
else:
# This code is based on the code from the standard Python library
package cmd.py:
@@ -848,15 +849,15 @@ class ImpalaShell(cmd.Cmd, object):
arg_mode = str(arg_mode).lower()
if arg_mode not in [QueryAttemptDisplayModes.ALL,
QueryAttemptDisplayModes.LATEST, QueryAttemptDisplayModes.ORIGINAL]:
- print("Invalid value for query attempt display mode: \'" +
- arg_mode + "\'. Valid values are [ALL | LATEST | ORIGINAL]")
+ print("Invalid value for query attempt display mode: \'"
+ + arg_mode + "\'. Valid values are [ALL | LATEST | ORIGINAL]")
return None
return arg_mode
def print_exec_summary(self, summary):
output = []
table = self._default_summary_table()
- self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
+ self.imp_client.build_summary_table(summary, output)
formatter = PrettyOutputFormatter(table)
self.output_stream = OutputStream(formatter, filename=self.output_file)
self.output_stream.write(output)
@@ -1113,7 +1114,7 @@ class ImpalaShell(cmd.Cmd, object):
self.ldap_password.endswith('\n'):
print("Warning: LDAP password contains a trailing newline. "
"Did you use 'echo' instead of 'echo -n'?", file=sys.stderr)
- if self.use_ssl and sys.version_info < (2,7,9) \
+ if self.use_ssl and sys.version_info < (2, 7, 9) \
and "EOF occurred in violation of protocol" in str(e):
print("Warning: TLSv1.2 is not supported for Python < 2.7.9",
file=sys.stderr)
log_exception_with_timestamp(e, "Exception",
@@ -1190,7 +1191,7 @@ class ImpalaShell(cmd.Cmd, object):
arg = arg.replace('\n', '')
# Get the database and table name, using the current database if the table
name
# wasn't fully qualified.
- db_name, tbl_name = self.current_db, arg
+ db_name = self.current_db
if db_name is None:
db_name = ImpalaShell.DEFAULT_DB
db_table_name = arg.split('.')
@@ -1341,7 +1342,7 @@ class ImpalaShell(cmd.Cmd, object):
if self.live_summary:
table = self._default_summary_table()
output = []
- self.imp_client.build_summary_table(summary, 0, False, 0, False,
output)
+ self.imp_client.build_summary_table(summary, output)
formatter = PrettyOutputFormatter(table)
data += formatter.format(output) + "\n"
@@ -1408,8 +1409,8 @@ class ImpalaShell(cmd.Cmd, object):
"Query state can be monitored at: %s" %
self.imp_client.get_query_link(
self.imp_client.get_query_id_str(self.last_query_handle)))
- wait_to_finish = self.imp_client.wait_to_finish(self.last_query_handle,
- self._periodic_wait_callback)
+ self.imp_client.wait_to_finish(
+ self.last_query_handle, self._periodic_wait_callback)
# Reset the progress stream.
self.progress_stream.clear()
@@ -1517,7 +1518,6 @@ class ImpalaShell(cmd.Cmd, object):
self.prompt = ImpalaShell.DISCONNECTED_PROMPT
return CmdStatus.ERROR
-
def construct_table_with_header(self, column_names):
""" Constructs the table header for a given query handle.
@@ -1822,7 +1822,7 @@ class ImpalaShell(cmd.Cmd, object):
"""
cmd_names = [cmd for cmd in self.commands if cmd.startswith(text.lower())]
# If the user input is upper case, return commands in upper case.
- if text.isupper(): return [cmd_names.upper() for cmd_names in cmd_names]
+ if text.isupper(): return [cmd_name.upper() for cmd_name in cmd_names]
# If the user input is lower case or mixed case, return lower case
commands.
return cmd_names
@@ -1873,7 +1873,7 @@ HEADER_DIVIDER =\
def _format_tip(tip):
"""Takes a tip string and splits it on word boundaries so that it fits
neatly inside the
shell header."""
- return '\n'.join([l for l in textwrap.wrap(tip, len(HEADER_DIVIDER))])
+ return '\n'.join([line for line in textwrap.wrap(tip, len(HEADER_DIVIDER))])
WELCOME_STRING = """\
@@ -1916,8 +1916,8 @@ def parse_variables(keyvals):
for keyval in keyvals:
match = re.match(kv_pattern, keyval)
if not match:
- print('Error: Could not parse key-value "%s". ' % (keyval,) +
- 'It must follow the pattern "KEY=VALUE".', file=sys.stderr)
+ print('Error: Could not parse key-value "%s". ' % (keyval,)
+ + 'It must follow the pattern "KEY=VALUE".', file=sys.stderr)
parser.print_help()
raise FatalShellException()
else:
@@ -1943,8 +1943,8 @@ def replace_variables(set_variables, input_string):
# Check if syntax is correct
var_name = get_var_name(name)
if var_name is None:
- print('Error: Unknown substitution syntax (%s). ' % (name,) +
- 'Use ${VAR:var_name}.', file=sys.stderr)
+ print('Error: Unknown substitution syntax (%s). ' % (name,)
+ + 'Use ${VAR:var_name}.', file=sys.stderr)
errors = True
else:
# Replaces variable value
@@ -1998,8 +1998,8 @@ def execute_queries_non_interactive_mode(options,
query_options):
queries = parse_query_text(query_text)
with ImpalaShell(options, query_options) as shell:
- return (shell.execute_query_list(shell.cmdqueue) and
- shell.execute_query_list(queries))
+ return (shell.execute_query_list(shell.cmdqueue)
+ and shell.execute_query_list(queries))
def get_intro(options):
@@ -2062,7 +2062,6 @@ def read_password_cmd(password_cmd, auth_method_desc,
strip_newline=False):
raise FatalShellException()
-
def impala_shell_main():
"""
There are two types of options: shell options and query_options. Both can be
set on the
@@ -2165,14 +2164,14 @@ def impala_shell_main():
raise FatalShellException()
if not options.ssl and not options.creds_ok_in_clear and options.use_ldap:
- print("LDAP credentials may not be sent over insecure " +
- "connections. Enable SSL or set --auth_creds_ok_in_clear",
+ print(("LDAP credentials may not be sent over insecure "
+ "connections. Enable SSL or set --auth_creds_ok_in_clear"),
file=sys.stderr)
raise FatalShellException()
if not options.use_ldap and options.ldap_password_cmd:
- print("Option --ldap_password_cmd requires using LDAP authentication " +
- "mechanism (-l)", file=sys.stderr)
+ print(("Option --ldap_password_cmd requires using LDAP authentication "
+ "mechanism (-l)"), file=sys.stderr)
raise FatalShellException()
if options.use_jwt and options.protocol.lower() != 'hs2-http':
@@ -2273,8 +2272,8 @@ def impala_shell_main():
raise FatalShellException()
if options.http_socket_timeout_s is not None:
- if (options.http_socket_timeout_s != 'None' and
- float(options.http_socket_timeout_s) < 0):
+ if (options.http_socket_timeout_s != 'None'
+ and float(options.http_socket_timeout_s) < 0):
print("http_socket_timeout_s must be a nonnegative floating point
number"
" expressing seconds, or None", file=sys.stderr)
raise FatalShellException()
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index 528eaeec2..9b5cef23e 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -26,7 +26,7 @@
# result = client.execute(query_string)
# where result is an object of the class ImpalaBeeswaxResult.
from __future__ import absolute_import, division, print_function
-from builtins import filter, map, range
+from builtins import filter, map
import logging
import time
import shlex
@@ -36,11 +36,6 @@ import re
from beeswaxd import BeeswaxService
from beeswaxd.BeeswaxService import QueryState
from datetime import datetime
-try:
- # If Exec Summary is not implemented in Impala, this cannot be imported
- from ExecStats.ttypes import TExecStats
-except ImportError:
- pass
from ImpalaService import ImpalaService
from tests.util.thrift_util import create_transport
from thrift.transport.TTransport import TTransportException
@@ -49,11 +44,13 @@ from thrift.Thrift import TApplicationException
LOG = logging.getLogger('impala_beeswax')
+
# Custom exception wrapper.
# All exceptions coming from thrift/beeswax etc. go through this wrapper.
# TODO: Add the ability to print some of the stack.
class ImpalaBeeswaxException(Exception):
__name__ = "ImpalaBeeswaxException"
+
def __init__(self, message, inner_exception):
self.__message = message
self.inner_exception = inner_exception
@@ -61,6 +58,7 @@ class ImpalaBeeswaxException(Exception):
def __str__(self):
return self.__message
+
class ImpalaBeeswaxResult(object):
def __init__(self, **kwargs):
self.query = kwargs.get('query', None)
@@ -99,10 +97,10 @@ class ImpalaBeeswaxResult(object):
'Took: %s(s)\n'
'Data:\n%s\n'
% (self.summary, self.success, self.time_taken,
- self.__format_data())
- )
+ self.__format_data()))
return message
+
# Interface to beeswax. Responsible for executing queries, fetching results.
class ImpalaBeeswaxClient(object):
# Regex applied to all tokens of a query to detect the query type.
@@ -243,121 +241,24 @@ class ImpalaBeeswaxClient(object):
if summary is None or summary.nodes is None:
return None
# If exec summary is not implemented in Impala, this function returns,
so we do not
- # get the function __build_summary_table which requires TExecStats to be
imported.
+ # get the function build_exec_summary_table which requires TExecStats to
be
+ # imported.
- output = []
- self.__build_summary_table(summary, 0, False, 0, False, output)
+ output = list()
+ self.__build_summary_table(summary, output)
return output
- def __build_summary_table(self, summary, idx, is_fragment_root, indent_level,
- new_indent_level, output):
- """NOTE: This was taken from impala_shell.py. Changes made here must be
made there as
- well. TODO: This method will be a placed in a library that is shared
between
- impala_shell and this file. (IMPALA-5792)
-
- Direct translation of Coordinator::PrintExecSummary() to recursively build
a list
- of rows of summary statistics, one per exec node
-
- summary: the TExecSummary object that contains all the summary data
-
- idx: the index of the node to print
-
- is_fragment_root: true if the node to print is the root of a fragment (and
therefore
- feeds into an exchange)
-
- indent_level: the number of spaces to print before writing the node's
label, to give
- the appearance of a tree. The 0th child of a node has the same
indent_level as its
- parent. All other children have an indent_level of one greater than their
parent.
-
- new_indent_level: If true, this indent level is different from the
previous row's.
-
- output: the list of rows into which to append the rows produced for this
node and its
- children.
-
- Returns the index of the next exec node in summary.exec_nodes that should
be
- processed, used internally to this method only.
- """
- attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
-
- # Initialise aggregate and maximum stats
- agg_stats, max_stats = TExecStats(), TExecStats()
- for attr in attrs:
- setattr(agg_stats, attr, 0)
- setattr(max_stats, attr, 0)
-
- row = {}
- node = summary.nodes[idx]
- # exec_stats may not be set even if the query is FINISHED if there are
fragments that
- # are still executing or that were cancelled before sending a status
report.
- if node.exec_stats is not None:
- for stats in node.exec_stats:
- for attr in attrs:
- val = getattr(stats, attr)
- if val is not None:
- setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
- setattr(max_stats, attr, max(getattr(max_stats, attr), val))
-
- if len(node.exec_stats) > 0:
- avg_time = agg_stats.latency_ns // len(node.exec_stats)
- else:
- avg_time = 0
-
- row["num_instances"] = len(node.exec_stats)
- row["num_hosts"] = node.num_hosts
- row["avg_time"] = avg_time
-
- is_sink = node.node_id == -1
- # If the node is a broadcast-receiving exchange node, the cardinality of
rows produced
- # is the max over all instances (which should all have received the same
number of
- # rows). Otherwise, the cardinality is the sum over all instances which
process
- # disjoint partitions.
- if is_sink:
- cardinality = -1
- elif node.is_broadcast:
- cardinality = max_stats.cardinality
- else:
- cardinality = agg_stats.cardinality
-
- est_stats = node.estimated_stats
-
- label_prefix = ""
- if indent_level > 0:
- label_prefix = "|"
- label_prefix += " |" * (indent_level - 1)
- if new_indent_level:
- label_prefix += "--"
- else:
- label_prefix += " "
-
- row["prefix"] = label_prefix
- row["operator"] = node.label
- row["max_time"] = max_stats.latency_ns
- row["num_rows"] = cardinality
- row["est_num_rows"] = est_stats.cardinality
- row["peak_mem"] = max_stats.memory_used
- row["est_peak_mem"] = est_stats.memory_used
- row["detail"] = node.label_detail
- output.append(row)
-
- if summary.exch_to_sender_map is not None and idx in
summary.exch_to_sender_map:
- sender_idx = summary.exch_to_sender_map[idx]
- # This is an exchange node, so the sender is a fragment root, and should
be printed
- # next.
- self.__build_summary_table(summary, sender_idx, True, indent_level,
False, output)
-
- idx += 1
- if node.num_children > 0:
- first_child_output = []
- idx = \
- self.__build_summary_table(
- summary, idx, False, indent_level, False, first_child_output)
- for child_idx in range(1, node.num_children):
- # All other children are indented (we only have 0, 1 or 2 children for
every exec
- # node at the moment)
- idx = self.__build_summary_table(
- summary, idx, False, indent_level + 1, True, output)
- output += first_child_output
- return idx
+ def __build_summary_table(self, summary, output):
+ from shell.impala_client import build_exec_summary_table
+ result = list()
+ build_exec_summary_table(summary, 0, 0, False, result,
is_prettyprint=False,
+ separate_prefix_column=True)
+ keys = ['prefix', 'operator', 'num_hosts', 'num_instances', 'avg_time',
'max_time',
+ 'num_rows', 'est_num_rows', 'peak_mem', 'est_peak_mem', 'detail']
+ for row in result:
+ assert len(keys) == len(row)
+ summ_map = dict(zip(keys, row))
+ output.append(summ_map)
def get_runtime_profile(self, handle):
return self.__do_rpc(lambda: self.imp_service.GetRuntimeProfile(handle))
@@ -468,7 +369,7 @@ class ImpalaBeeswaxClient(object):
def get_log(self, query_handle):
return self.__do_rpc(lambda: self.imp_service.get_log(query_handle))
- def fetch_results(self, query_string, query_handle, max_rows = -1):
+ def fetch_results(self, query_string, query_handle, max_rows=-1):
"""Fetches query results given a handle and query type (insert, use,
other)"""
query_type = self.__get_query_type(query_string)
if query_type == 'use':
@@ -486,7 +387,7 @@ class ImpalaBeeswaxClient(object):
exec_result.query = query_string
return exec_result
- def __fetch_results(self, handle, max_rows = -1):
+ def __fetch_results(self, handle, max_rows=-1):
"""Handles query results, returns a ImpalaBeeswaxResult object"""
schema = self.__do_rpc(lambda:
self.imp_service.get_results_metadata(handle)).schema
# The query has finished, we can fetch the results