This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f18cfaf0db4c5141d6fc05f7b3feb687ea7eaf8b
Author: Riza Suminto <[email protected]>
AuthorDate: Fri May 2 22:19:56 2025 -0700

    IMPALA-14028: Refactor cancel_query_and_validate_state with HS2
    
    cancel_query_and_validate_state is a helper method used to test query
    cancellation with concurrent fetch. It is still use beeswax client by
    default.
    
    This patch change the test method to use HS2 protocol by default. The
    changes include following:
    1. Set TGetOperationStatusResp.operationState to
       TOperationState::ERROR_STATE if returning abnormally.
    2. Use separate MinimalHS2Client for
       (execute_async, fetch, get_runtime_profile) vs cancel vs close.
       Cancellation through KILL QUERY still instantiate new
       ImpylaHS2Connection client.
    3. Implement required missing methods in MinimalHS2Client.
    4. Change MinimalHS2Client logging pattern to match with other clients.
    
    Testing:
    Pass test_cancellation.py and TestResultSpoolingCancellation in core
    exploration mode. Also fix default_test_protocol to HS2 for these tests.
    
    Change-Id: I626a1a06eb3d5dc9737c7d4289720e1f52d2a984
    Reviewed-on: http://gerrit.cloudera.org:8080/22853
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Riza Suminto <[email protected]>
---
 be/src/service/impala-hs2-server.cc      |  28 +++--
 tests/common/impala_connection.py        |  75 +++++++++----
 tests/query_test/test_cancellation.py    |  34 +++++-
 tests/query_test/test_result_spooling.py |  13 ++-
 tests/util/cancel_util.py                | 175 ++++++++++++++++++++++---------
 5 files changed, 235 insertions(+), 90 deletions(-)

diff --git a/be/src/service/impala-hs2-server.cc 
b/be/src/service/impala-hs2-server.cc
index 33366195a..ba35ca395 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -890,25 +890,33 @@ void 
ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
   // Secret is inherited from session.
   TUniqueId query_id;
   TUniqueId op_secret;
-  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.operationHandle.operationId, &query_id, &op_secret),
-      SQLSTATE_GENERAL_ERROR);
+  Status status = THandleIdentifierToTUniqueId(
+      request.operationHandle.operationId, &query_id, &op_secret);
+  if (!status.ok()) {
+    return_val.__set_operationState(TOperationState::ERROR_STATE);
+    HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
+  }
   VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id);
 
-  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  // Make query id available to the following HS2_RETURN_ERROR().
   ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
 
   QueryHandle query_handle;
-  HS2_RETURN_IF_ERROR(
-      return_val, GetActiveQueryHandle(query_id, &query_handle), 
SQLSTATE_GENERAL_ERROR);
+  status = GetActiveQueryHandle(query_id, &query_handle);
+  if (!status.ok()) {
+    return_val.__set_operationState(TOperationState::ERROR_STATE);
+    HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
+  }
 
   ScopedSessionState session_handle(this);
   const TUniqueId session_id = query_handle->session_id();
   shared_ptr<SessionState> session;
-  HS2_RETURN_IF_ERROR(return_val,
-      session_handle.WithSession(
-          session_id, SecretArg::Operation(op_secret, query_id), &session),
-      SQLSTATE_GENERAL_ERROR);
+  status = session_handle.WithSession(
+      session_id, SecretArg::Operation(op_secret, query_id), &session);
+  if (!status.ok()) {
+    return_val.__set_operationState(TOperationState::ERROR_STATE);
+    HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
+  }
 
   // When using long polling, this waits up to long_polling_time_ms 
milliseconds for
   // query completion.polling
diff --git a/tests/common/impala_connection.py 
b/tests/common/impala_connection.py
index 09568bcad..3475df911 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -563,10 +563,6 @@ class BeeswaxConnection(ImpalaConnection):
     query_id = operation_handle.get_handle().id
     return query_id if query_id else str(operation_handle)
 
-  def log_handle(self, operation_handle, message):
-    handle_id = self.handle_id(operation_handle)
-    LOG.info(u"{0}: {1}".format(handle_id, message))
-
   def get_query_id(self, operation_handle):
     return operation_handle.get_handle().id
 
@@ -579,7 +575,7 @@ class ImpylaHS2Connection(ImpalaConnection):
   """
 
   # ClientRequestState::TOperationState()
-  __OPERATION_STATE_TO_EXEC_STATE = {
+  OPERATION_STATE_TO_EXEC_STATE = {
     'INITIALIZED_STATE': INITIALIZED,
     'PENDING_STATE': PENDING,
     'RUNNING_STATE': RUNNING,
@@ -825,7 +821,7 @@ class ImpylaHS2Connection(ImpalaConnection):
 
   def get_impala_exec_state(self, operation_handle):
     try:
-      return 
self.__OPERATION_STATE_TO_EXEC_STATE[self.get_state(operation_handle)]
+      return 
self.OPERATION_STATE_TO_EXEC_STATE[self.get_state(operation_handle)]
     except impyla_error.Error:
       return ERROR
     except Exception as e:
@@ -1051,26 +1047,39 @@ class MinimalHS2Connection(ImpalaConnection):
     self.__conn = hs2.connect(host, port, auth_mechanism='NOSASL')
     self.__user = user if user is not None else getpass.getuser()
     self.__session = self.__conn.open_session(self.__user)
+    self.__query_options = dict()
 
   def connect(self):
     pass  # Do nothing
 
   def close(self):
-    LOG.info("-- closing connection to: %s" % self.__host_port)
+    self.log_client("closing connection to: %s" % self.__host_port)
     try:
       self.__session.close()
     finally:
       self.__conn.close()
 
+  def __log_execute(self, sql_stmt):
+    session_id = session_handle_to_session_id(self.__session.handle)
+    self.log_client(
+      u"executing at {0}. session: {1} user: {2}\n{3}".format(
+        self.__host_port, session_id, self.__user, 
format_sql_for_logging(sql_stmt))
+    )
+
+  def log_client(self, message):
+    """Log 'message' at INFO level, prefixed wih the protocol name of this 
connection."""
+    LOG.info(u"minimal_{0}: {1}".format(self.get_test_protocol(), message))
+
   def execute(self, sql_stmt, user=None, fetch_profile_after_close=False,  # 
noqa: U100
               fetch_exec_summary=False,  # noqa: U100
               profile_format=TRuntimeProfileFormat.STRING):  # noqa: U100
     raise NotImplementedError()
 
   def execute_async(self, sql_stmt):
-    hs2_operation = self.__session.execute(sql_stmt)
+    self.__log_execute(sql_stmt)
+    hs2_operation = self.__session.execute(sql_stmt, 
configuration=self.__query_options)
     operation_handle = MinimalHS2OperationHandle(hs2_operation.handle, 
sql_stmt)
-    LOG.info("Started query {0}".format(operation_handle))
+    self.log_handle(operation_handle, "query started")
     return operation_handle
 
   def __get_operation(self, operation_handle):
@@ -1081,7 +1090,7 @@ class MinimalHS2Connection(ImpalaConnection):
     Fetch the results of the query. It will block the current connection if 
the results
     are not available yet.
     """
-    LOG.info("-- fetching results from: {0}".format(operation_handle))
+    self.log_handle(operation_handle, "fetching results")
     return self.__get_operation(operation_handle).fetch(max_rows=max_rows)
 
   def fetch_error(self, operation_handle):
@@ -1111,11 +1120,11 @@ class MinimalHS2Connection(ImpalaConnection):
       time.sleep(0.1)
 
   def cancel(self, operation_handle):
-    LOG.info("-- canceling operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, "canceling operation")
     return self.__get_operation(operation_handle).cancel()
 
   def close_query(self, operation_handle):
-    LOG.info("-- closing query for operation handle: 
{0}".format(operation_handle))
+    self.log_handle(operation_handle, "closing query for operation")
     return self.__get_operation(operation_handle).close()
 
   def state_is_finished(self, operation_handle):  # noqa: U100
@@ -1124,11 +1133,22 @@ class MinimalHS2Connection(ImpalaConnection):
   def get_log(self, operation_handle):
     return self.__get_operation(operation_handle).get_log()
 
-  def set_configuration_option(self, name, value):  # noqa: U100
-    raise NotImplementedError()
+  def set_configuration_option(self, name, value, is_log_sql=True):
+    # Only set the option if it's not already set to the same value.
+    # value must be parsed to string.
+    name = name.lower()
+    value = str(value)
+    if self.__query_options.get(name, "") != value:
+      self.__query_options[name] = value
+      if is_log_sql:
+        self.log_client("\n\nset {0}={1};\n".format(name, value))
+      return True
+    return False
 
   def clear_configuration(self):
-    raise NotImplementedError()
+    self.__query_options.clear()
+    if hasattr(tests.common, "current_node"):
+      self.set_configuration_option("client_identifier", 
tests.common.current_node)
 
   def get_host_port(self):
     return self.__host_port
@@ -1136,21 +1156,36 @@ class MinimalHS2Connection(ImpalaConnection):
   def get_test_protocol(self):
     return HS2
 
-  def handle_id(self, operation_handle):  # noqa: U100
+  def handle_id(self, operation_handle):
     return str(operation_handle)
 
   def get_admission_result(self, operation_handle):  # noqa: U100
     raise NotImplementedError()
 
-  def get_impala_exec_state(self, operation_handle):  # noqa: U100
-    raise NotImplementedError()
+  def get_impala_exec_state(self, operation_handle):
+    try:
+      return ImpylaHS2Connection.OPERATION_STATE_TO_EXEC_STATE[
+        self.get_state(operation_handle)]
+    except impyla_error.Error:
+      return ERROR
+    except Exception as e:
+      raise e
 
   def get_runtime_profile(self, operation_handle,
                           profile_format=TRuntimeProfileFormat.STRING):
     return self.__get_operation(operation_handle).get_profile(profile_format)
 
-  def wait_for_admission_control(self, operation_handle, timeout_s=60):  # 
noqa: U100
-    raise NotImplementedError()
+  def wait_for_admission_control(self, operation_handle, timeout_s=60):
+    self.log_handle(operation_handle, 'waiting for completion of the admission 
control')
+    start_time = time.time()
+    while time.time() - start_time < timeout_s:
+      start_rpc_time = time.time()
+      if self.is_admitted(operation_handle):
+        return True
+      rpc_time = time.time() - start_rpc_time
+      if rpc_time < DEFAULT_SLEEP_INTERVAL:
+        time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time)
+    return False
 
   def get_exec_summary(self, operation_handle):  # noqa: U100
     raise NotImplementedError()
diff --git a/tests/query_test/test_cancellation.py 
b/tests/query_test/test_cancellation.py
index efda79e8b..ac2a9f9d4 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -29,7 +29,7 @@ from impala_thrift_gen.RuntimeProfile.ttypes import 
TRuntimeProfileFormat
 from tests.common.impala_connection import MinimalHS2Connection
 from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, 
ImpalaTestSuite
 from tests.common.test_dimensions import add_mandatory_exec_option
-from tests.common.test_vector import ImpalaTestDimension
+from tests.common.test_vector import HS2, ImpalaTestDimension
 from tests.util.cancel_util import cancel_query_and_validate_state
 from tests.verifiers.metric_verifier import MetricVerifier
 
@@ -88,6 +88,11 @@ USE_KILL_QUERY_STATEMENT = [False, True]
 
 
 class TestCancellation(ImpalaTestSuite):
+
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
+
   @classmethod
   def get_workload(self):
     return 'tpch'
@@ -179,7 +184,8 @@ class TestCancellation(ImpalaTestSuite):
 
     # Execute the query multiple times, cancelling it each time.
     for i in range(vector.get_value('num_cancellation_iterations')):
-      cancel_query_and_validate_state(self.client, query,
+      cancel_query_and_validate_state(
+          query,
           vector.get_value('exec_option'), vector.get_value('table_format'),
           vector.get_value('cancel_delay'), 
vector.get_value('join_before_close'),
           
use_kill_query_statement=vector.get_value('use_kill_query_statement'))
@@ -246,7 +252,6 @@ class TestCancellation(ImpalaTestSuite):
       assert "Invalid or unknown query handle" in str(
           cancel_client.fetch_error(query_handle))
 
-
   def teardown_method(self, method):
     # For some reason it takes a little while for the query to get completely 
torn down
     # when the debug action is WAIT, causing 
TestValidateMetrics.test_metrics_are_zero to
@@ -257,6 +262,11 @@ class TestCancellation(ImpalaTestSuite):
 
 
 class TestCancellationParallel(TestCancellation):
+
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestCancellationParallel, cls).add_test_dimensions()
@@ -267,6 +277,11 @@ class TestCancellationParallel(TestCancellation):
 
 
 class TestCancellationSerial(TestCancellation):
+
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestCancellationSerial, cls).add_test_dimensions()
@@ -299,6 +314,11 @@ class TestCancellationSerial(TestCancellation):
 
 
 class TestCancellationFullSort(TestCancellation):
+
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestCancellationFullSort, cls).add_test_dimensions()
@@ -322,6 +342,11 @@ class TestCancellationFullSort(TestCancellation):
 
 
 class TestCancellationFinalizeDelayed(ImpalaTestSuite):
+
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
+
   @classmethod
   def get_workload(self):
     return 'tpch'
@@ -344,5 +369,6 @@ class TestCancellationFinalizeDelayed(ImpalaTestSuite):
   def test_cancellation(self, vector):
     query = "select l_returnflag from tpch_parquet.lineitem"
     cancel_delay = 0
-    cancel_query_and_validate_state(self.client, query,
+    cancel_query_and_validate_state(
+        query,
         vector.get_value('exec_option'), vector.get_value('table_format'), 
cancel_delay)
diff --git a/tests/query_test/test_result_spooling.py 
b/tests/query_test/test_result_spooling.py
index ba1c9ba77..52ffdc06e 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -26,7 +26,7 @@ from tests.common.errors import Timeout
 from tests.common.impala_connection import FINISHED
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import create_exec_option_dimension
-from tests.common.test_vector import ImpalaTestDimension
+from tests.common.test_vector import HS2, ImpalaTestDimension
 from tests.util.cancel_util import cancel_query_and_validate_state
 from tests.util.failpoints_util import 
execute_query_expect_debug_action_failure
 
@@ -325,6 +325,10 @@ class TestResultSpoolingCancellation(ImpalaTestSuite):
   # Time to sleep between issuing query and canceling.
   _cancel_delay_in_seconds = [0, 0.01, 0.1, 1, 4]
 
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
+
   @classmethod
   def get_workload(cls):
     return 'tpch'
@@ -345,7 +349,8 @@ class TestResultSpoolingCancellation(ImpalaTestSuite):
 
   def test_cancellation(self, vector):
     vector.get_value('exec_option')['spool_query_results'] = 'true'
-    cancel_query_and_validate_state(self.client, vector.get_value('query'),
+    cancel_query_and_validate_state(
+        vector.get_value('query'),
         vector.get_value('exec_option'), vector.get_value('table_format'),
         vector.get_value('cancel_delay'))
 
@@ -359,9 +364,7 @@ class TestResultSpoolingCancellation(ImpalaTestSuite):
       handle = self.execute_query_async(vector.get_value('query'),
           vector.get_value('exec_option'))
       sleep(vector.get_value('cancel_delay'))
-      cancel_result = self.client.cancel(handle)
-      assert cancel_result.status_code == 0,\
-          "Unexpected status code from cancel request: 
{0}".format(cancel_result)
+      self.client.cancel(handle)
     finally:
       if handle: self.client.close_query(handle)
 
diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py
index d72d3fc8e..c98ffc013 100644
--- a/tests/util/cancel_util.py
+++ b/tests/util/cancel_util.py
@@ -18,8 +18,16 @@
 from __future__ import absolute_import, division, print_function
 import threading
 from time import sleep
-from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION, 
create_connection
-from tests.common.impala_test_suite import ImpalaTestSuite
+import traceback
+
+from impala_thrift_gen.TCLIService import TCLIService
+from tests.common.impala_connection import (
+    create_connection,
+    ERROR,
+    IMPALA_CONNECTION_EXCEPTION,
+    MinimalHS2Connection,
+)
+from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, 
ImpalaTestSuite
 from tests.common.test_result_verifier import error_msg_startswith
 
 
@@ -88,8 +96,90 @@ def assert_kill_error(client, error_msg, query_id=None, 
sql=None, user=None):
     assert error_msg_startswith(str(exc), error_msg)
 
 
-def cancel_query_and_validate_state(client, query, exec_option, table_format,
-    cancel_delay, join_before_close=False, use_kill_query_statement=False):
+class FetchingThread(threading.Thread):
+  """Thread that does rows fetching that is subject to cancellation from main 
thread.
+  execute_async() must be called before starting the thread.
+  """
+
+  def __init__(self, client, query, exec_option, table_format,
+               use_kill_query_statement=False):
+    super(FetchingThread, self).__init__(name='FetchingThread')
+    self.client = client
+    self.query = query
+    self.exec_option = exec_option
+    self.table_format = table_format
+    self.db_name = ImpalaTestSuite.get_db_name_from_format(table_format)
+    self.use_kill_query_statement = use_kill_query_statement
+    self.fetch_results_error = None
+    self.query_profile = None
+    self.handle = None
+    self.is_error = False
+
+  def execute_async(self):
+    # change database.
+    use_handle = self.client.execute_async('use ' + self.db_name)
+    self.client.wait_for(use_handle)
+    self.client.close_query(use_handle)
+    self.client.set_configuration(self.exec_option)
+    # execute the query.
+    self.handle = self.client.execute_async(self.query)
+    self.is_error = self.client.get_impala_exec_state(self.handle) == ERROR
+
+  def run(self):
+    if self.is_error:
+      self.client.log_client(
+          'Not starting fetch_results because query failed to start')
+      return
+    self.client.log_client('Start fetching results')
+    try:
+      result = True
+      while result:
+        result = self.client.fetch(self.query, self.handle)
+    except IMPALA_CONNECTION_EXCEPTION as e:
+      self.fetch_results_error = e
+    except Exception as e:
+      stack_trace_string = traceback.format_exc()
+      msg = "Exception in fetch_results: {}\n{}".format(
+          str(e), stack_trace_string)
+      self.fetch_results_error = Exception(msg)
+    finally:
+      self.client.log_client('Stop fetching results')
+
+  def cancel_query(self):
+    assert self.handle is not None, \
+      "handle is None. Did the query fail to start?"
+    if self.use_kill_query_statement:
+      # Run kill query using ImpylaHS2Connection.
+      with create_connection(
+        host_port=self.client.get_host_port(),
+        protocol=self.client.get_test_protocol(),
+      ) as kill_client:
+        kill_client.connect()
+        if self.exec_option:
+          kill_client.set_configuration(self.exec_option)
+        assert_kill_ok(kill_client, self.client.handle_id(self.handle))
+    else:
+      # Run cancellation using separate client/connection.
+      with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as cancel_client:
+        cancel_resp = cancel_client.cancel(self.handle)
+        assert cancel_resp.status.statusCode == 
TCLIService.TStatusCode.SUCCESS_STATUS
+
+  def close_query(self):
+    with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as close_client:
+      close_client.close_query(self.handle)
+
+  def get_runtime_profile(self):
+    """Get query profile.
+    Might hit exception if QueryState has been released."""
+    try:
+      return self.client.get_runtime_profile(self.handle)
+    except Exception:
+      return None
+
+
+def cancel_query_and_validate_state(
+    query, exec_option, table_format, cancel_delay, join_before_close=False,
+    use_kill_query_statement=False):
   """Runs the given query asynchronously and then cancels it after the 
specified delay.
   The query is run with the given 'exec_options' against the specified 
'table_format'. A
   separate async thread is launched to fetch the results of the query. The 
method
@@ -102,48 +192,34 @@ def cancel_query_and_validate_state(client, query, 
exec_option, table_format,
   RPCs directly.
   """
   assert table_format is not None
-  with ImpalaTestSuite.change_database(client, table_format):
-    __run_cancel_query_and_validate_state(
-      client, query, exec_option, cancel_delay, join_before_close,
-      use_kill_query_statement)
-
-
-def __run_cancel_query_and_validate_state(client, query, exec_option,
-    cancel_delay, join_before_close=False, use_kill_query_statement=False):
   assert not (join_before_close and use_kill_query_statement)
+  with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as fetch_client:
+    thread = FetchingThread(fetch_client, query, exec_option, table_format,
+        use_kill_query_statement=use_kill_query_statement)
+    __run_cancel_query_and_validate_state(
+      thread, cancel_delay, join_before_close, use_kill_query_statement)
 
-  if exec_option: client.set_configuration(exec_option)
-  handle = client.execute_async(query)
 
-  thread = threading.Thread(target=__fetch_results, args=(query, handle))
+def __run_cancel_query_and_validate_state(
+    thread, cancel_delay, join_before_close=False, 
use_kill_query_statement=False):
+  thread.execute_async()
   thread.start()
 
   sleep(cancel_delay)
-  if client.is_error(handle):
+  if thread.is_error:
       # If some error occurred before trying to cancel the query then we put 
an error
       # message together and fail the test.
       thread.join()
-      error_msg = "The following query returned an error: %s\n" % query
+      error_msg = "The following query returned an error: 
{}\n".format(thread.query)
       if thread.fetch_results_error is not None:
           error_msg += str(thread.fetch_results_error) + "\n"
-      profile_lines = client.get_runtime_profile(handle).splitlines()
+      profile_lines = thread.get_runtime_profile().splitlines()
+      thread.close_query()
       for line in profile_lines:
           if "Query Status:" in line:
               error_msg += line
       assert False, error_msg
-  if use_kill_query_statement:
-    with create_connection(
-        host_port=client.get_host_port(),
-        protocol=client.get_test_protocol(),
-    ) as kill_client:
-      kill_client.connect()
-      if exec_option:
-        kill_client.set_configuration(exec_option)
-      assert_kill_ok(kill_client, client.handle_id(handle))
-  else:
-    cancel_result = client.cancel(handle)
-    assert cancel_result.status_code == 0, \
-        'Unexpected status code from cancel request: %s' % cancel_result
+  thread.cancel_query()
 
   if join_before_close:
     thread.join()
@@ -152,7 +228,7 @@ def __run_cancel_query_and_validate_state(client, query, 
exec_option,
   # The KILL QUERY statement will also close the query.
   if not use_kill_query_statement:
     try:
-      client.close_query(handle)
+      thread.close_query()
     except IMPALA_CONNECTION_EXCEPTION as e:
       close_error = e
 
@@ -166,13 +242,17 @@ def __run_cancel_query_and_validate_state(client, query, 
exec_option,
   # need to do this after both close_query() and fetch() have returned to 
ensure
   # that the synchronous phase of query unregistration has finished and the 
profile
   # is final.
-  profile = client.get_runtime_profile(handle)
-  if ("- Completed admission: " in profile
-      and ("- First row fetched:" in profile or "- Request finished:" in 
profile)):
+  profile = thread.get_runtime_profile()
+  if profile and (
+          "- Completed admission: " in profile
+          and ("- First row fetched:" in profile or "- Request finished:" in 
profile)):
     # TotalBytesRead is a sentinel that will only be created if 
ComputeQuerySummary()
     # has been run by the cancelling thread.
     assert "- TotalBytesRead:" in profile, profile
 
+  str_close_error = str(close_error) if close_error else ''
+  str_fetch_error = (str(thread.fetch_results_error) if 
thread.fetch_results_error
+                     else '')
   if thread.fetch_results_error is None:
     # If the fetch rpc didn't result in CANCELLED (and auto-close the query) 
then
     # the close rpc should have succeeded.
@@ -182,24 +262,17 @@ def __run_cancel_query_and_validate_state(client, query, 
exec_option,
     # failed with 'Cancelled' or failed with 'Invalid or unknown query handle'
     # (if the close rpc occured before the fetch rpc).
     if thread.fetch_results_error is not None:
-      assert 'Cancelled' in str(thread.fetch_results_error) or \
-        ('Invalid or unknown query handle' in str(thread.fetch_results_error)
-         and not join_before_close), str(thread.fetch_results_error)
+      assert 'Cancelled' in str_fetch_error or \
+        ('Invalid or unknown query handle' in str_fetch_error
+         and not join_before_close), str_fetch_error
   else:
     # If the close rpc encountered an exception, then it must be due to fetch
-    # noticing the cancellation and doing the auto-close.
-    assert 'Invalid or unknown query handle' in str(close_error)
-    assert 'Cancelled' in str(thread.fetch_results_error)
+    # noticing the cancellation and doing the auto-close, or cancellation went 
through
+    # before fetch.
+    assert ('Cancelled' in str_close_error
+            or 'Invalid or unknown query handle' in str_close_error)
+    assert ('Cancelled' in str_fetch_error
+            or 'Invalid or unknown query handle' in str_fetch_error)
 
   # TODO: Add some additional verification to check to make sure the query was
   # actually canceled
-
-
-def __fetch_results(query, handle):
-  threading.current_thread().fetch_results_error = None
-  threading.current_thread().query_profile = None
-  try:
-    new_client = ImpalaTestSuite.create_impala_client()
-    new_client.fetch(query, handle)
-  except IMPALA_CONNECTION_EXCEPTION as e:
-    threading.current_thread().fetch_results_error = e

Reply via email to