This is an automated email from the ASF dual-hosted git repository. asherman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit f2f6b4b5804df036a5a7dc8ff23f8a0537b5bf97 Author: jasonmfehr <[email protected]> AuthorDate: Wed Dec 14 00:37:12 2022 +0000 IMPALA-11375 Impala shell outputs details of each RPC When the Impala shell is using the hs2 protocol, it makes multiple RPCs to the Impala daemon. These calls pass Thrift objects back and forth. This change adds the '--rpc_stdout' flag, which outputs the details of the RPCs to stdout, and the '--rpc_file' flag, which outputs the RPC details to the specified file path. RPC details include: - operation name - request attempt count - Impala session/query IDs (if applicable) - call duration - call status (success/failure) - request Thrift objects - response Thrift objects Certain information is not included in the RPC details: - Thrift object attributes named 'secret' or 'password' are redacted. - Thrift objects with a type of TRowSet or TGetRuntimeProfileResp are not included, as the information contained within them is already available in the standard output from the Impala shell. Testing: - Added new tests in the end-to-end test suite. 
Change-Id: I36f8dbc96726aa2a573133acbe8a558299381f8b Reviewed-on: http://gerrit.cloudera.org:8080/19388 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- bin/impala-shell.sh | 11 +- shell/impala_client.py | 433 +++++++++++++++++++++------------ shell/impala_shell.py | 14 +- shell/impala_shell_config_defaults.py | 2 + shell/make_shell_tarball.sh | 1 + shell/option_parser.py | 8 + shell/packaging/make_python_package.sh | 1 + shell/thrift_printer.py | 153 ++++++++++++ tests/shell/test_shell_commandline.py | 103 ++++++++ 9 files changed, 572 insertions(+), 154 deletions(-) diff --git a/bin/impala-shell.sh b/bin/impala-shell.sh index d70709df1..a495d9ca7 100755 --- a/bin/impala-shell.sh +++ b/bin/impala-shell.sh @@ -50,6 +50,15 @@ done # Note that this uses the external system python executable PYTHONPATH=${PYTHONPATH} python "${IMPALA_PY_DIR}/bootstrap_virtualenv.py" +# Enable remote debugging if port was specified via environment variable +if [[ ${IMPALA_SHELL_DEBUG_PORT:-0} -ne 0 ]]; then + echo "installing debugpy if needed" + ${IMPALA_PY_ENV_DIR}/bin/pip install debugpy + echo "impala python shell waiting for remote debugging connection on port" \ + "${IMPALA_SHELL_DEBUG_PORT}" + EXTRA_ARGS=" -m debugpy --listen ${IMPALA_SHELL_DEBUG_PORT} --wait-for-client" +fi + # This uses the python executable in the impala python env PYTHONIOENCODING='utf-8' PYTHONPATH=${PYTHONPATH} \ - exec "${IMPALA_PYTHON_EXECUTABLE}" ${SHELL_HOME}/impala_shell.py "$@" + exec "${IMPALA_PYTHON_EXECUTABLE}" ${EXTRA_ARGS:-} ${SHELL_HOME}/impala_shell.py "$@" diff --git a/shell/impala_client.py b/shell/impala_client.py index 36fa4a6b8..e2a058a47 100755 --- a/shell/impala_client.py +++ b/shell/impala_client.py @@ -29,6 +29,7 @@ import socket import ssl import sys import time +from datetime import datetime from beeswaxd import BeeswaxService from beeswaxd.BeeswaxService import QueryState @@ -52,6 +53,7 @@ from shell_exceptions import 
(RPCException, QueryStateException, DisconnectedExc QueryCancelledByShellException, MissingThriftMethodException, HttpError) from value_converter import HS2ValueConverter +from thrift_printer import ThriftPrettyPrinter # Getters to extract HS2's representation of values to the display version. # An entry must be added to this map for each supported type. HS2's TColumn has many @@ -131,7 +133,7 @@ class ImpalaClient(object): ldap_password=None, use_ldap=False, client_connect_timeout_ms=60000, verbose=True, use_http_base_transport=False, http_path=None, http_cookie_names=None, http_socket_timeout_s=None, value_converter=None, - connect_max_tries=4): + connect_max_tries=4, rpc_stdout=False, rpc_file=None): self.connected = False self.impalad_host = impalad[0] self.impalad_port = int(impalad[1]) @@ -162,6 +164,8 @@ class ImpalaClient(object): # we parse the retried query id. self.webserver_address = None self.value_converter = value_converter + self.rpc_stdout = rpc_stdout + self.rpc_file = rpc_file def connect(self): """Creates a connection to an Impalad instance. Returns a tuple with the impala @@ -639,7 +643,6 @@ class ImpalaClient(object): if not self.connected: raise DisconnectedException("Not connected (use CONNECT to establish a connection)") - class ImpalaHS2Client(ImpalaClient): """Impala client. Uses the HS2 protocol plus Impala-specific extensions.""" def __init__(self, *args, **kwargs): @@ -647,6 +650,7 @@ class ImpalaHS2Client(ImpalaClient): self.FINISHED_STATE = TOperationState._NAMES_TO_VALUES["FINISHED_STATE"] self.ERROR_STATE = TOperationState._NAMES_TO_VALUES["ERROR_STATE"] self.CANCELED_STATE = TOperationState._NAMES_TO_VALUES["CANCELED_STATE"] + self._clear_current_query_handle() # If connected, this is the handle returned by the OpenSession RPC that needs # to be passed into most HS2 RPCs. 
@@ -665,6 +669,9 @@ class ImpalaHS2Client(ImpalaClient): if self.value_converter is None: self.value_converter = HS2ValueConverter() + if self.rpc_stdout or self.rpc_stdout is not None: + self.thrift_printer = ThriftPrettyPrinter() + def _get_thrift_client(self, protocol): return ImpalaHiveServer2Service.Client(protocol) @@ -674,15 +681,14 @@ class ImpalaHS2Client(ImpalaClient): return self.min_sleep_interval * (num_tries - 1) def _open_session(self): - open_session_req = TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6, - username=self.user) - - def OpenSession(): - return self.imp_service.OpenSession(open_session_req) + def OpenSession(req): + return self.imp_service.OpenSession(req) # OpenSession rpcs are idempotent and so ok to retry. If the client gets disconnected # and the server successfully opened a session, the client will retry and rely on # server to clean up the session. - resp = self._do_hs2_rpc(OpenSession, retry_on_error=True) + req = TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6, + username=self.user) + resp = self._do_hs2_rpc(OpenSession, req, retry_on_error=True) self._check_hs2_rpc_status(resp.status) assert (resp.serverProtocolVersion == TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6), resp.serverProtocolVersion @@ -697,13 +703,12 @@ class ImpalaHS2Client(ImpalaClient): # doing so. We still need to close the transport and we can rely on the # server to clean up the session. try: - req = TCloseSessionReq(self.session_handle) - - def CloseSession(): + def CloseSession(req): return self.imp_service.CloseSession(req) # CloseSession rpcs don't need retries since we catch all exceptions and close # transport. 
- resp = self._do_hs2_rpc(CloseSession) + req = TCloseSessionReq(self.session_handle) + resp = self._do_hs2_rpc(CloseSession, req) self._check_hs2_rpc_status(resp.status) except Exception as e: print("Warning: close session RPC failed: {0}, {1}".format(str(e), type(e))) @@ -754,12 +759,11 @@ class ImpalaHS2Client(ImpalaClient): num_tries += 1 def _ping_impala_service(self): - req = TPingImpalaHS2ServiceReq(self.session_handle) - - def PingImpalaHS2Service(): + def PingImpalaHS2Service(req): return self.imp_service.PingImpalaHS2Service(req) # PingImpalaHS2Service rpc is idempotent and so safe to retry. - resp = self._do_hs2_rpc(PingImpalaHS2Service, retry_on_error=True) + req = TPingImpalaHS2ServiceReq(self.session_handle) + resp = self._do_hs2_rpc(PingImpalaHS2Service, req, retry_on_error=True) self._check_hs2_rpc_status(resp.status) self.webserver_address = resp.webserver_address return (resp.version, resp.webserver_address) @@ -781,42 +785,66 @@ class ImpalaHS2Client(ImpalaClient): """Execute the query 'query_str' asynchronously on the server with options dictionary 'set_query_options' and return a query handle that can be used for subsequent ImpalaClient method calls for the query.""" - query = self._create_query_req(query_str, set_query_options) + self._clear_current_query_handle() self.is_query_cancelled = False - def ExecuteStatement(): - return self.imp_service.ExecuteStatement(query) + def ExecuteStatement(req): + return self.imp_service.ExecuteStatement(req) # Read queries should be idempotent but most dml queries are not. Also retrying # query execution from client could be expensive and so likely makes sense to do # it if server is also aware of the retries. 
- resp = self._do_hs2_rpc(ExecuteStatement) + req = self._create_query_req(query_str, set_query_options) + resp = self._do_hs2_rpc(ExecuteStatement, req) if resp.status.statusCode != TStatusCode.SUCCESS_STATUS: msg = utf8_decode_if_needed(resp.status.errorMessage) raise QueryStateException("ERROR: {0}".format(msg)) handle = resp.operationHandle - if handle.hasResultSet: - req = TGetResultSetMetadataReq(handle) - def GetResultSetMetadata(): - return self.imp_service.GetResultSetMetadata(req) - # GetResultSetMetadata rpc is idempotent and should be safe to retry. - resp = self._do_hs2_rpc(GetResultSetMetadata, retry_on_error=True) - self._check_hs2_rpc_status(resp.status) - assert resp.schema is not None, resp - # Attach the schema to the handle for convenience. - handle.schema = resp.schema - handle.is_closed = False - return handle + try: + self._set_current_query_handle(handle) + if handle.hasResultSet: + def GetResultSetMetadata(req): + return self.imp_service.GetResultSetMetadata(req) + # GetResultSetMetadata rpc is idempotent and should be safe to retry. + req = TGetResultSetMetadataReq(handle) + resp = self._do_hs2_rpc(GetResultSetMetadata, req, retry_on_error=True) + self._check_hs2_rpc_status(resp.status) + assert resp.schema is not None, resp + # Attach the schema to the handle for convenience. 
+ handle.schema = resp.schema + handle.is_closed = False + self._clear_current_query_handle() + return handle + finally: + self._clear_current_query_handle() def get_query_id_str(self, last_query_handle): + if last_query_handle is None: + return None + + guid_bytes = last_query_handle.operationId.guid + return self._convert_id_to_str(guid_bytes) + + def _set_current_query_handle(self, query_handle): + self._current_query_handle = query_handle + + def _clear_current_query_handle(self): + self._current_query_handle = None + + def get_session_id(self): + if self.session_handle is None: + return None + + return self._convert_id_to_str(self.session_handle.sessionId.guid) + + def _convert_id_to_str(self, id_bytes): # The binary representation is present in the query handle but we need to # massage it into the expected string representation. C++ and Java code # treats the low and high half as two 64-bit little-endian integers and # as a result prints the hex representation in the reverse order to how # bytes are laid out in guid. 
- guid_bytes = last_query_handle.operationId.guid - low_bytes_reversed = guid_bytes[7::-1] - high_bytes_reversed = guid_bytes[16:7:-1] + low_bytes_reversed = id_bytes[7::-1] + high_bytes_reversed = id_bytes[16:7:-1] if sys.version_info.major < 3: low_hex = low_bytes_reversed.encode('hex_codec') @@ -828,31 +856,35 @@ class ImpalaHS2Client(ImpalaClient): return "{low}:{high}".format(low=low_hex, high=high_hex) def fetch(self, query_handle): - assert query_handle.hasResultSet - prim_types = [column.typeDesc.types[0].primitiveEntry.type - for column in query_handle.schema.columns] - column_value_getters = [HS2_VALUE_GETTERS[prim_type] - for prim_type in prim_types] - column_value_converters = [self.value_converter.get_converter(prim_type) - for prim_type in prim_types] - while True: - req = TFetchResultsReq(query_handle, TFetchOrientation.FETCH_NEXT, - self.fetch_size) - - def FetchResults(): - return self.imp_service.FetchResults(req) - # FetchResults rpc is not idempotent unless the client and server communicate and - # results are kept around for retry to be successful. - resp = self._do_hs2_rpc(FetchResults) - self._check_hs2_rpc_status(resp.status) + try: + self._set_current_query_handle(query_handle) + assert query_handle.hasResultSet + prim_types = [column.typeDesc.types[0].primitiveEntry.type + for column in query_handle.schema.columns] + column_value_getters = [HS2_VALUE_GETTERS[prim_type] + for prim_type in prim_types] + column_value_converters = [self.value_converter.get_converter(prim_type) + for prim_type in prim_types] + while True: + def FetchResults(req): + return self.imp_service.FetchResults(req) + # FetchResults rpc is not idempotent unless the client and server communicate and + # results are kept around for retry to be successful. 
+ req = TFetchResultsReq(query_handle, + TFetchOrientation.FETCH_NEXT, + self.fetch_size) + resp = self._do_hs2_rpc(FetchResults, req) + self._check_hs2_rpc_status(resp.status) - # Transpose the columns into a row-based format for more convenient processing - # for the display code. This is somewhat inefficient, but performance is comparable - # to the old Beeswax code. - yield self._transpose(column_value_getters, column_value_converters, - resp.results.columns) - if not self._hasMoreRows(resp, column_value_getters): - return + # Transpose the columns into a row-based format for more convenient processing + # for the display code. This is somewhat inefficient, but performance is + # comparable to the old Beeswax code. + yield self._transpose(column_value_getters, column_value_converters, + resp.results.columns) + if not self._hasMoreRows(resp, column_value_getters): + return + finally: + self._clear_current_query_handle() def _hasMoreRows(self, resp, column_value_getters): return resp.hasMoreRows @@ -889,83 +921,109 @@ class ImpalaHS2Client(ImpalaClient): return rows def close_dml(self, last_query_handle): - req = TCloseImpalaOperationReq(last_query_handle) + try: + self._set_current_query_handle(last_query_handle) - def CloseImpalaOperation(): - return self.imp_service.CloseImpalaOperation(req) - # CloseImpalaOperation rpc is not idempotent for dmls. - resp = self._do_hs2_rpc(CloseImpalaOperation) - self._check_hs2_rpc_status(resp.status) - if not resp.dml_result: - raise RPCException("Impala DML operation did not return DML statistics.") + def CloseImpalaOperation(req): + return self.imp_service.CloseImpalaOperation(req) + # CloseImpalaOperation rpc is not idempotent for dmls. 
+ req = TCloseImpalaOperationReq(last_query_handle) + resp = self._do_hs2_rpc(CloseImpalaOperation, req) + self._check_hs2_rpc_status(resp.status) + if not resp.dml_result: + raise RPCException("Impala DML operation did not return DML statistics.") - num_rows = sum([int(k) for k in resp.dml_result.rows_modified.values()]) - last_query_handle.is_closed = True - return (num_rows, resp.dml_result.num_row_errors) + num_rows = sum([int(k) for k in resp.dml_result.rows_modified.values()]) + last_query_handle.is_closed = True + return (num_rows, resp.dml_result.num_row_errors) + finally: + self._clear_current_query_handle() def close_query(self, last_query_handle): - # Set a member in the handle to make sure that it is idempotent - if last_query_handle.is_closed: - return True - req = TCloseImpalaOperationReq(last_query_handle) - - def CloseImpalaOperation(): - return self.imp_service.CloseImpalaOperation(req) - # CloseImpalaOperation rpc is idempotent for non dml queries and so safe to retry. - resp = self._do_hs2_rpc(CloseImpalaOperation, retry_on_error=True) - last_query_handle.is_closed = True - return self._is_hs2_nonerror_status(resp.status.statusCode) + try: + self._set_current_query_handle(last_query_handle) + # Set a member in the handle to make sure that it is idempotent + if last_query_handle.is_closed: + return True + + def CloseImpalaOperation(req): + return self.imp_service.CloseImpalaOperation(req) + # CloseImpalaOperation rpc is idempotent for non dml queries and so safe to retry. + req = TCloseImpalaOperationReq(last_query_handle) + resp = self._do_hs2_rpc(CloseImpalaOperation, req, retry_on_error=True) + last_query_handle.is_closed = True + return self._is_hs2_nonerror_status(resp.status.statusCode) + finally: + self._clear_current_query_handle() def cancel_query(self, last_query_handle): # Cancel sets query_state to ERROR_STATE before calling cancel() in the # co-ordinator, so we don't need to wait. 
- if last_query_handle.is_closed: - return True - req = TCancelOperationReq(last_query_handle) - - def CancelOperation(): - return self.imp_service.CancelOperation(req) - # CancelOperation rpc is idempotent and so safe to retry. - resp = self._do_hs2_rpc(CancelOperation, retry_on_error=True) - return self._is_hs2_nonerror_status(resp.status.statusCode) + try: + self._set_current_query_handle(last_query_handle) + if last_query_handle.is_closed: + return True + + def CancelOperation(req): + return self.imp_service.CancelOperation(req) + # CancelOperation rpc is idempotent and so safe to retry. + req = TCancelOperationReq(last_query_handle) + resp = self._do_hs2_rpc(CancelOperation, req, retry_on_error=True) + return self._is_hs2_nonerror_status(resp.status.statusCode) + finally: + self._clear_current_query_handle() def get_query_state(self, last_query_handle): - req = TGetOperationStatusReq(last_query_handle) + try: + self._set_current_query_handle(last_query_handle) - def GetOperationStatus(): - return self.imp_service.GetOperationStatus(req) - # GetOperationStatus rpc is idempotent and so safe to retry. - resp = self._do_hs2_rpc(GetOperationStatus, retry_on_error=True) - self._check_hs2_rpc_status(resp.status) - return resp.operationState + def GetOperationStatus(req): + return self.imp_service.GetOperationStatus(req) + # GetOperationStatus rpc is idempotent and so safe to retry. + req = TGetOperationStatusReq(last_query_handle) + resp = self._do_hs2_rpc(GetOperationStatus, req, retry_on_error=True) + self._check_hs2_rpc_status(resp.status) + return resp.operationState + finally: + self._clear_current_query_handle() def get_runtime_profile(self, last_query_handle): - req = TGetRuntimeProfileReq(last_query_handle, self.session_handle, - include_query_attempts=True) - - def GetRuntimeProfile(): - return self.imp_service.GetRuntimeProfile(req) - # GetRuntimeProfile rpc is idempotent and so safe to retry. 
- resp = self._do_hs2_rpc(GetRuntimeProfile, retry_on_error=True) - self._check_hs2_rpc_status(resp.status) - failed_profile = None - if resp.failed_profiles and len(resp.failed_profiles) >= 1: - failed_profile = resp.failed_profiles[0] - return resp.profile, failed_profile + try: + self._set_current_query_handle(last_query_handle) + + def GetRuntimeProfile(req): + return self.imp_service.GetRuntimeProfile(req) + # GetRuntimeProfile rpc is idempotent and so safe to retry. + profile_req = TGetRuntimeProfileReq(last_query_handle, + self.session_handle, + include_query_attempts=True) + resp = self._do_hs2_rpc(GetRuntimeProfile, profile_req, retry_on_error=True) + self._check_hs2_rpc_status(resp.status) + failed_profile = None + if resp.failed_profiles and len(resp.failed_profiles) >= 1: + failed_profile = resp.failed_profiles[0] + return resp.profile, failed_profile + finally: + self._clear_current_query_handle() def get_summary(self, last_query_handle): - req = TGetExecSummaryReq(last_query_handle, self.session_handle, - include_query_attempts=True) - - def GetExecSummary(): - return self.imp_service.GetExecSummary(req) - # GetExecSummary rpc is idempotent and so safe to retry. - resp = self._do_hs2_rpc(GetExecSummary, retry_on_error=True) - self._check_hs2_rpc_status(resp.status) - failed_summary = None - if resp.failed_summaries and len(resp.failed_summaries) >= 1: - failed_summary = resp.failed_summaries[0] - return resp.summary, failed_summary + try: + self._set_current_query_handle(last_query_handle) + + def GetExecSummary(req): + return self.imp_service.GetExecSummary(req) + # GetExecSummary rpc is idempotent and so safe to retry. 
+ req = TGetExecSummaryReq(last_query_handle, + self.session_handle, + include_query_attempts=True) + resp = self._do_hs2_rpc(GetExecSummary, req, retry_on_error=True) + self._check_hs2_rpc_status(resp.status) + failed_summary = None + if resp.failed_summaries and len(resp.failed_summaries) >= 1: + failed_summary = resp.failed_summaries[0] + return resp.summary, failed_summary + finally: + self._clear_current_query_handle() def get_column_names(self, last_query_handle): # The handle has the schema embedded in it. @@ -980,28 +1038,33 @@ class ImpalaHS2Client(ImpalaClient): """Returns all messages from the error log prepended with 'WARNINGS:' or 'ERROR:' for last_query_handle, depending on whether warn is True or False. Note that the error log may contain messages that are not errors (e.g. warnings).""" - if last_query_handle is None: - return "Query could not be executed" - req = TGetLogReq(last_query_handle) - - def GetLog(): - return self.imp_service.GetLog(req) - # GetLog rpc is idempotent and so safe to retry. - resp = self._do_hs2_rpc(GetLog, retry_on_error=True) - self._check_hs2_rpc_status(resp.status) - - log = utf8_decode_if_needed(resp.log) - - # Strip progress message out of HS2 log. - log = HS2_LOG_PROGRESS_REGEX.sub("", log) - if log and log.strip(): - log = self._append_retried_query_link(log) - type_str = "WARNINGS" if warn is True else "ERROR" - return "%s: %s" % (type_str, log) - return "" + try: + self._set_current_query_handle(last_query_handle) + if last_query_handle is None: + return "Query could not be executed" + + def GetLog(req): + return self.imp_service.GetLog(req) + # GetLog rpc is idempotent and so safe to retry. 
+ req = TGetLogReq(last_query_handle) + resp = self._do_hs2_rpc(GetLog, req, retry_on_error=True) + self._check_hs2_rpc_status(resp.status) - def _do_hs2_rpc(self, rpc, suppress_error_on_cancel=True, retry_on_error=False): - """Executes the provided 'rpc' callable and tranlates any exceptions in the + log = utf8_decode_if_needed(resp.log) + + # Strip progress message out of HS2 log. + log = HS2_LOG_PROGRESS_REGEX.sub("", log) + if log and log.strip(): + log = self._append_retried_query_link(log) + type_str = "WARNINGS" if warn is True else "ERROR" + return "%s: %s" % (type_str, log) + return "" + finally: + self._clear_current_query_handle() + + def _do_hs2_rpc(self, rpc, rpc_input, + suppress_error_on_cancel=True, retry_on_error=False): + """Executes the provided 'rpc' callable and translates any exceptions in the appropriate exception for the shell. The input 'rpc' must be a python function with the __name__ attribute and not a lambda function. Exceptions raised include: * DisconnectedException if the client cannot communicate with the server. @@ -1018,6 +1081,7 @@ class ImpalaHS2Client(ImpalaClient): if retry_on_error: max_tries = self.max_tries while num_tries <= max_tries: + start_time = self._print_rpc_start(rpc, rpc_input, num_tries) raise_error = (num_tries == max_tries) # Generate a retry message, only if retries and supported. will_retry = False @@ -1029,7 +1093,9 @@ class ImpalaHS2Client(ImpalaClient): else: retry_msg = '' try: - return rpc() + rpc_output = rpc(rpc_input) + self._print_rpc_end(rpc, rpc_output, start_time, "SUCCESS") + return rpc_output except TTransportException as e: # Unwrap socket.error so we can handle it directly. if isinstance(e.inner, socket.error): @@ -1037,11 +1103,13 @@ class ImpalaHS2Client(ImpalaClient): # issue with the connection with the impalad print('Caught exception {0}, type={1} in {2}. 
{3}' .format(str(e), type(e), rpc.__name__, retry_msg), file=sys.stderr) + self._print_rpc_end(rpc, None, start_time, "Error - TTransportException") if raise_error: if isinstance(e, TTransportException): raise DisconnectedException("Error communicating with impalad: %s" % e) raise e except TApplicationException as t: + self._print_rpc_end(rpc, None, start_time, "Error - TApplicationException") # Suppress the errors from cancelling a query that is in waiting_to_finish state if suppress_error_on_cancel and self.is_query_cancelled: raise QueryCancelledByShellException() @@ -1064,11 +1132,13 @@ class ImpalaHS2Client(ImpalaClient): else: print('Caught exception {0}, type={1} in {2}. {3}' .format(str(h), type(h), rpc.__name__, retry_msg), file=sys.stderr) + self._print_rpc_end(rpc, None, start_time, "Error - HttpError") if raise_error: raise except Exception as e: print('Caught exception {0}, type={1} in {2}. {3}' .format(str(e), type(e), rpc.__name__, retry_msg), file=sys.stderr) + self._print_rpc_end(rpc, None, start_time, "Error") if raise_error: raise if retry_secs: @@ -1104,6 +1174,66 @@ class ImpalaHS2Client(ImpalaClient): TStatusCode.SUCCESS_WITH_INFO_STATUS, TStatusCode.STILL_EXECUTING_STATUS) + def _print_line_separator(self, fh): + """Prints out a visible separator suitable for outputting debug and + trace information to the screen or a file""" + fh.write("------------------------------------------------") + fh.write("------------------------------------------------\n") + + def _print_rpc_start(self, rpc_func, rpc_input, num_tries): + """Prints out a nicely formatted detailed breakdown of the request to + a rpc call. 
Handles both the 'rpc_stdout' and 'rpc_file' command line arguments.""" + if self.rpc_stdout or self.rpc_file is not None: + start_time = datetime.now() + + def print_start_to_file(fh): + self._print_line_separator(fh) + fh.write("[{0}] RPC CALL STARTED:\n".format(start_time)) + fh.write("OPERATION: {0}\nDETAILS:\n".format(rpc_func.__name__)) + fh.write(" * Impala Session Id: {0}\n".format(self.get_session_id())) + fh.write(" * Impala Query Id: {0}\n" + .format(self.get_query_id_str(self._current_query_handle))) + fh.write(" * Attempt Count: {0}\n".format(num_tries)) + fh.write("\nRPC REQUEST:\n") + self.thrift_printer.print_obj(rpc_input, fh) + self._print_line_separator(fh) + + if self.rpc_stdout: + print_start_to_file(sys.stdout) + + if self.rpc_file: + with open(self.rpc_file, "a") as f: + print_start_to_file(f) + + return start_time + + return None + + def _print_rpc_end(self, rpc_func, rpc_output, start_time, result): + """Prints out a nicely formatted detailed breakdown of the response from + a rpc call. 
Handles both the 'rpc_stdout' and 'rpc_file' command line arguments.""" + if self.rpc_stdout or self.rpc_file is not None: + end_time = datetime.now() + duration = end_time - start_time + + def print_end_to_file(fh): + self._print_line_separator(fh) + fh.write("[{0}] RPC CALL FINISHED:\n".format(datetime.now())) + fh.write("OPERATION: {0}\nDETAILS:\n".format(rpc_func.__name__)) + fh.write(" * Time: {0}ms\n".format(duration.microseconds / 1000)) + fh.write(" * Result: {0}\n".format(result)) + if rpc_output is not None: + fh.write("\nRPC RESPONSE:\n") + self.thrift_printer.print_obj(rpc_output, fh) + self._print_line_separator(fh) + + if self.rpc_stdout: + print_end_to_file(sys.stdout) + + if self.rpc_file: + with open(self.rpc_file, "a") as f: + print_end_to_file(f) + class RpcStatus: """Convenience enum used in ImpalaBeeswaxClient to describe Rpc return statuses""" @@ -1125,13 +1255,18 @@ class StrictHS2Client(ImpalaHS2Client): # Set a member in the handle to make sure that it is idempotent if last_query_handle.is_closed: return True - req = TCloseOperationReq(last_query_handle) - def CloseOperation(): - return self.imp_service.CloseOperation(req) - resp = self._do_hs2_rpc(CloseOperation, retry_on_error=False) - last_query_handle.is_closed = True - return self._is_hs2_nonerror_status(resp.status.statusCode) + try: + self._set_current_query_handle(last_query_handle) + + def CloseOperation(req): + return self.imp_service.CloseOperation(req) + req = TCloseOperationReq(last_query_handle) + resp = self._do_hs2_rpc(CloseOperation, req, retry_on_error=False) + last_query_handle.is_closed = True + return self._is_hs2_nonerror_status(resp.status.statusCode) + finally: + self._clear_current_query_handle() def _ping_impala_service(self): return ("N/A", "N/A") diff --git a/shell/impala_shell.py b/shell/impala_shell.py index 0ba0e62da..5101a065e 100755 --- a/shell/impala_shell.py +++ b/shell/impala_shell.py @@ -216,6 +216,8 @@ class ImpalaShell(cmd.Cmd, object): 
self.cached_prompt = str() self.show_profiles = options.show_profiles + self.rpc_stdout = options.rpc_stdout + self.rpc_file = options.rpc_file # Output formatting flags/options self.output_file = options.output_file @@ -607,7 +609,8 @@ class ImpalaShell(cmd.Cmd, object): self.ca_cert, self.user, self.ldap_password, True, self.client_connect_timeout_ms, self.verbose, use_http_base_transport=False, http_path=self.http_path, - http_cookie_names=None, value_converter=value_converter) + http_cookie_names=None, value_converter=value_converter, + rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file) elif protocol == 'hs2-http': return StrictHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, @@ -615,14 +618,16 @@ class ImpalaShell(cmd.Cmd, object): self.client_connect_timeout_ms, self.verbose, use_http_base_transport=True, http_path=self.http_path, http_cookie_names=self.http_cookie_names, - value_converter=value_converter) + value_converter=value_converter, rpc_stdout=self.rpc_stdout, + rpc_file=self.rpc_file) if protocol == 'hs2': return ImpalaHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, self.ca_cert, self.user, self.ldap_password, self.use_ldap, self.client_connect_timeout_ms, self.verbose, use_http_base_transport=False, http_path=self.http_path, - http_cookie_names=None, value_converter=value_converter) + http_cookie_names=None, value_converter=value_converter, + rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file) elif protocol == 'hs2-http': return ImpalaHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, @@ -632,7 +637,8 @@ class ImpalaShell(cmd.Cmd, object): http_cookie_names=self.http_cookie_names, http_socket_timeout_s=self.http_socket_timeout_s, value_converter=value_converter, - connect_max_tries=self.connect_max_tries) + 
connect_max_tries=self.connect_max_tries, + rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file) elif protocol == 'beeswax': return ImpalaBeeswaxClient(self.impalad, self.fetch_size, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, diff --git a/shell/impala_shell_config_defaults.py b/shell/impala_shell_config_defaults.py index c4caed252..59f6cd043 100644 --- a/shell/impala_shell_config_defaults.py +++ b/shell/impala_shell_config_defaults.py @@ -47,6 +47,8 @@ impala_shell_defaults = { 'query': None, 'query_file': None, 'show_profiles': False, + 'rpc_stdout': False, + 'rpc_file': None, 'ssl': False, 'use_kerberos': False, 'use_ldap': False, diff --git a/shell/make_shell_tarball.sh b/shell/make_shell_tarball.sh index b3ae5e16e..e50931000 100755 --- a/shell/make_shell_tarball.sh +++ b/shell/make_shell_tarball.sh @@ -156,6 +156,7 @@ cp ${SHELL_HOME}/value_converter.py ${TARBALL_ROOT}/lib cp ${SHELL_HOME}/impala-shell ${TARBALL_ROOT} cp ${SHELL_HOME}/impala_shell.py ${TARBALL_ROOT} cp ${SHELL_HOME}/compatibility.py ${TARBALL_ROOT} +cp ${SHELL_HOME}/thrift_printer.py ${TARBALL_ROOT} pushd ${BUILD_DIR} > /dev/null echo "Making tarball in ${BUILD_DIR}" diff --git a/shell/option_parser.py b/shell/option_parser.py index c43e92dc0..8d28a7404 100755 --- a/shell/option_parser.py +++ b/shell/option_parser.py @@ -206,6 +206,14 @@ def get_option_parser(defaults): parser.add_option("-p", "--show_profiles", dest="show_profiles", action="store_true", help="Always display query profiles after execution") + parser.add_option("--rpc_stdout", dest="rpc_stdout", + action="store_true", + help="Output hs2 rpc details to stdout. " + "Ignored if protocol is beeswax.") + parser.add_option("--rpc_file", dest="rpc_file", + help="Write hs2 rpc call details to the given file. " + "If the file exists, rpc call details will be appended to the " + "file. 
Ignored if protocol is beeswax.") parser.add_option("--quiet", dest="verbose", action="store_false", help="Disable verbose output") diff --git a/shell/packaging/make_python_package.sh b/shell/packaging/make_python_package.sh index 954f7eae5..0f1abafd3 100755 --- a/shell/packaging/make_python_package.sh +++ b/shell/packaging/make_python_package.sh @@ -61,6 +61,7 @@ assemble_package_files() { cp "${SHELL_HOME}/shell_exceptions.py" "${MODULE_LIB_DIR}" cp "${SHELL_HOME}/cookie_util.py" "${MODULE_LIB_DIR}" cp "${SHELL_HOME}/value_converter.py" "${MODULE_LIB_DIR}" + cp "${SHELL_HOME}/thrift_printer.py" "${MODULE_LIB_DIR}" cp "${SHELL_HOME}/packaging/README.md" "${PACKAGE_DIR}" cp "${SHELL_HOME}/packaging/MANIFEST.in" "${PACKAGE_DIR}" diff --git a/shell/thrift_printer.py b/shell/thrift_printer.py new file mode 100644 index 000000000..fcd32b1db --- /dev/null +++ b/shell/thrift_printer.py @@ -0,0 +1,153 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import re +from sys import stdout + + +class ThriftPrettyPrinter(object): + """Implements a pretty printer for Thrift objects. + Generates string representations. 
Does not output + to stdout, stderr, or any other file handles.""" + + # Inputs: + # redacted_fields - list of names of object attributes whose + # values will not be printed out + # indent - string containing only spaces, used as the + # base indentation where all other indentations + # will be multiples of this string + # objects_to_skip - list of names of objects attributes that + # will not be printed out, useful to ignore + # duplicate information such as query results + def __init__(self, + redacted_fields=("secret", "password"), + indent=" ", + objects_to_skip=("TRowSet", "TGetRuntimeProfileResp")): + if redacted_fields is not None: + assert type(redacted_fields) is list or \ + type(redacted_fields) is tuple, \ + "redacted_fields must be either a list or a tuple" + self.base_indent = indent + self.redacted_fields = redacted_fields + self.objects_to_skip = objects_to_skip + + self._objname_re = re.compile("^.*?'(.*?)'.*?$") + + def print_obj(self, thrift_obj, file_handle=stdout): + """Prints the provided 'thrift_obj' to the provided + file handle. If no file handle is specified, then + stdout will be used. The 'file_handle' object must + have a write(string) method. + + While this class is specifically targeted to printing + Thrift objects, there is no technical limitation preventing + any other type of object from being printed. 
However, the + output of non-Thrift objects may not be as nicely formatted.""" + + # Inputs: + # thrift_obj - the object to print out, its attributes will + # be walked recursively through the entire + # object structure and printed out + # file_handle - where the object will be written, defaults to stdout + # but can be any object with a write(str) method + self._internal_print(thrift_obj, self.base_indent, file_handle) + + def _internal_print(self, thrift_obj, indent, file_handle): + """Recursive function that does the work of walking and printing + an object.""" + # parse out the type name of the thrift object + obj_name = self._objname_re.match(str(type(thrift_obj))) \ + .group(1).split(".")[-1] + file_handle.write("<{0}>".format(obj_name)) + + if self.objects_to_skip.count(obj_name): + file_handle.write(" - <skipping>\n") + return + + indent = "{0}{1}".format(indent, self.base_indent) + file_handle.write("\n") + + if obj_name == "list" or obj_name == "tuple": + # lists and tuples have to be handled differently + # because the vars function does not operate on them + for attr_val in thrift_obj: + file_handle.write(indent) + self._internal_print(attr_val, indent, file_handle) + else: + # print out simple types first before printing out objects + # this ensures the simple types are easier to see + child_simple_attrs = {} + child_objs = {} + for attr_name in vars(thrift_obj): + attr_val = getattr(thrift_obj, attr_name) + if (hasattr(attr_val, '__dict__') + or attr_val is list + or attr_val is tuple): + child_objs[attr_name] = attr_val + else: + child_simple_attrs[attr_name] = attr_val + + # print out child attributes in alphabetical order + for child_attr_name in sorted(child_simple_attrs): + self._print_attr(child_attr_name, + child_simple_attrs[child_attr_name], + indent, + file_handle) + + # print out complex types objects, lists, or tuples + # in alphabetical order + for attr_name in sorted(child_objs): + self._print_attr(attr_name, + child_objs[attr_name], + 
indent, + file_handle) + + def _print_attr(self, attr_name, attr_val, indent, file_handle): + """Handles a single object attribute by either printing out + its name/value (for simple types) or recursing down into the + object (for objects/lists/tuples).""" + file_handle.write(indent) + + if attr_val is not None and self.redacted_fields.count(attr_name) > 0: + file_handle.write("- {0}: *******\n".format(attr_name)) + elif attr_val is None: + file_handle.write("- {0}: <None>\n".format(attr_name)) + elif type(attr_val) is list or type(attr_val) is tuple: + file_handle.write("[") + self._internal_print(attr_val, indent, file_handle) + file_handle.write("{0}]\n".format(indent)) + elif hasattr(attr_val, '__dict__'): + indent += "{0:{1}} {2}".format("", len(attr_name), self.base_indent) + file_handle.write("- {0}: ".format(attr_name)) + self._internal_print(attr_val, indent, file_handle) + else: + file_handle.write("- {0}: ".format(attr_name)) + try: + str(attr_val).decode("ascii") + file_handle.write("{0}".format(attr_val)) + except UnicodeDecodeError: + # python2 - string contains binary data + file_handle.write("<binary data>") + except AttributeError: + # python3 - does not require decoding strings and thus falls into this code + if isinstance(attr_val, bytes): + file_handle.write("<binary data>") + else: + file_handle.write("{0}".format(attr_val)) + file_handle.write("\n") diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py index 0c867236e..c91c31797 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -1408,3 +1408,106 @@ class TestImpalaShell(ImpalaTestSuite): result = run_impala_shell_cmd(vector, args) assert expected_py2 in result.stdout or expected_py3 in result.stdout + + def test_output_rpc_to_screen_and_file(self, vector, populated_table, tmp_file): + """Tests the flags that output hs2 rpc call details to both stdout + and a file. 
Asserts the expected text is written.""" + self.skip_if_protocol_is_beeswax(vector) + + args = ['--rpc_stdout', '--rpc_file', tmp_file, + '-q', 'select * from {0}'.format(populated_table)] + result = run_impala_shell_cmd(vector, args) + + stdout_data = result.stdout.strip() + rpc_file_data = open(tmp_file, "r").read().strip() + + def check_multiline(check_desc, regex_lines): + """Build and runs a multi-line regular expression against both the + shell's stdout and rpc file contents to ensure the expected data about + the hs2 rpc calls was outputted.""" + the_re = re.compile("^" + "\n".join(regex_lines) + "$", re.MULTILINE) + assert the_re.search(stdout_data), \ + "'{0}' assert failed in stdout: \n{1}" \ + .format(check_desc, stdout_data) + assert the_re.search(rpc_file_data), \ + "'{0}' assert failed in the rpc file contents: \n{1}" \ + .format(check_desc, rpc_file_data) + + check_multiline("Open Session Request", + [ + "{0}{1}".format( + "\\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{6}\\] ", + "RPC CALL STARTED:"), + "OPERATION: OpenSession", + "DETAILS:", + " \\* Impala Session Id: None", + " \\* Impala Query Id: None", + " \\* Attempt Count: 1", + "", + "RPC REQUEST:", + "\\<TOpenSessionReq\\>", + " - client_protocol: 5", + " - configuration: <None>", + " - password: <None>", + " - username: .*?", + ]) + + check_multiline("Open Session Response", + [ + "{0}{1}".format( + "\\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{6}\\] ", + "RPC CALL FINISHED:"), + "OPERATION: OpenSession", + "DETAILS:", + " \\* Time: \\d+(\\.\\d+)?ms", + " \\* Result: SUCCESS", + "", + "RPC RESPONSE:", + "\\<TOpenSessionResp\\>", + " - configuration: .*?", + " - serverProtocolVersion: .*?", + " - sessionHandle: \\<TSessionHandle\\>", + " - sessionId: \\<THandleIdentifier\\>", + " - guid: \\<binary data\\>", + " - secret: " + "\\*\\*\\*\\*\\*\\*\\*", + " - status: \\<TStatus\\>", + " - errorCode: \\<None\\>", + " - errorMessage: \\<None\\>", + " - infoMessages: \\<None\\>", + " - 
sqlState: \\<None\\>", + " - statusCode: 0", + ]) + + check_multiline("Session and Query Id", + [ + "{0}{1}".format( + "\\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{6}\\] ", + "RPC CALL STARTED:"), + "OPERATION: FetchResults", + "DETAILS:", + " \\* Impala Session Id: [a-z0-9]*:[a-z0-9]*", + " \\* Impala Query Id: [a-z0-9]*:[a-z0-9]*", + ]) + + check_multiline("TRowSet Skipped", + [ + "{0}{1}".format( + "\\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{6}\\] ", + "RPC CALL FINISHED:"), + "OPERATION: FetchResults", + "DETAILS:", + " \\* Time: \\d+(\\.\\d+)?ms", + " \\* Result: SUCCESS", + "", + "RPC RESPONSE:", + "<TFetchResultsResp>", + " - hasMoreRows: False", + " - results: <TRowSet> - <skipping>", + " - status: <TStatus>", + " - errorCode: <None>", + " - errorMessage: <None>", + " - infoMessages: <None>", + " - sqlState: <None>", + " - statusCode: 0", + ])
