This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5a62ded2db573e34de0c05f62b2c3caa6b797c3c
Author: Michael Smith <[email protected]>
AuthorDate: Tue Oct 17 11:59:05 2023 -0700

    IMPALA-12493: Allow retry after failed cancel_query
    
    Previously if /cancel_query failed with "Query not yet running",
    subsequent attempts to cancel the query would always fail and the query
    would never be unregistered (so it would stay in "queries in flight" or
    "waiting to be closed" until the coordinator was restarted) because the
    QueryDriver thought the query was finalized. Check whether the query is
    inflight before setting finalized_ to allow retries.
    
    Cancelling a request can no longer fail, so checks for whether the query
    is inflight are moved to the callers.
    
    Two ClientRequestState::Finalize calls that previously passed
    check_inflight=true no longer check inflight. Retried queries should
    always reach inflight before they need to be finalized. The checks were
    added in IMPALA-10413 and IMPALA-10414 with no explanation of why they
    were needed - presumably because the retried query was marked inflight
    earlier in the function - and their tests still pass without the checks.
    Added DCHECKs since the query should be inflight until closed.
    
    Testing:
    - adds a test that canceling a query in CREATED state fails but does not
      prevent canceling the query later
    - adds a test that canceling a query that later fails results in that
      query completing rather than going to "waiting to be closed"
    
    Change-Id: I6e2c8872e91a7ff078cedb13e0771bcfaae6ee24
    Reviewed-on: http://gerrit.cloudera.org:8080/20584
    Tested-by: Michael Smith <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
---
 be/src/runtime/query-driver.cc                     |  30 +++---
 be/src/runtime/query-driver.h                      |  10 +-
 be/src/service/client-request-state.cc             |  20 +---
 be/src/service/client-request-state.h              |  21 ++--
 be/src/service/impala-server.cc                    |  14 ++-
 .../java/org/apache/impala/service/Frontend.java   |   6 ++
 .../java/org/apache/impala/util/DebugUtils.java    |   3 +
 tests/webserver/test_web_pages.py                  | 119 +++++++++++++++++++++
 8 files changed, 173 insertions(+), 50 deletions(-)

diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 161d0dab2..6bc677dea 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -242,15 +242,10 @@ void QueryDriver::RetryQueryFromThread(
 
   shared_ptr<ImpalaServer::SessionState> session = request_state->session();
 
-  // Cancel the query. 'check_inflight' should be false because (1) a retry 
can be
-  // triggered when the query is in the INITIALIZED state, and (2) the user 
could have
+  // Cancel the query. We don't worry about whether it's inflight because (1) 
a retry can
+  // be triggered when the query is in the INITIALIZED state, and (2) the user 
could have
   // already cancelled the query.
-  Status status = request_state->Cancel(false, nullptr);
-  if (!status.ok()) {
-    status.AddDetail(Substitute("Failed to retry query $0", 
PrintId(query_id)));
-    discard_result(request_state->UpdateQueryStatus(status));
-    return;
-  }
+  request_state->Cancel(nullptr);
 
   unique_ptr<ClientRequestState> retry_request_state = nullptr;
   CreateRetriedClientRequestState(request_state, &retry_request_state, 
&session);
@@ -275,7 +270,8 @@ void QueryDriver::RetryQueryFromThread(
   retry_query_handle.SetHandle(query_driver, retry_request_state.get());
 
   // Register the new query.
-  status = parent_server_->RegisterQuery(retry_query_id, session, 
&retry_query_handle);
+  Status status =
+      parent_server_->RegisterQuery(retry_query_id, session, 
&retry_query_handle);
   if (!status.ok()) {
     string error_msg = Substitute(
         "RegisterQuery for new query with id $0 failed", 
PrintId(retry_query_id));
@@ -330,7 +326,7 @@ void QueryDriver::RetryQueryFromThread(
       // the retried query, it actually finalizes the original 
ClientRequestState. So we
       // have to explicitly finalize 'retry_request_state', otherwise we'll 
hit some
       // illegal states in destroying it.
-      RETURN_VOID_IF_ERROR(retry_request_state->Finalize(false, nullptr));
+      retry_request_state->Finalize(nullptr);
       return;
     }
   }
@@ -344,7 +340,7 @@ void QueryDriver::RetryQueryFromThread(
     string error_msg = Substitute(
         "SetQueryInFlight for new query with id $0 failed", 
PrintId(retry_query_id));
     HandleRetryFailure(&status, &error_msg, request_state, retry_query_id);
-    RETURN_VOID_IF_ERROR(retry_request_state->Finalize(false, nullptr));
+    retry_request_state->Finalize(nullptr);
     return;
   }
 
@@ -359,7 +355,8 @@ void QueryDriver::RetryQueryFromThread(
           "Failed to retry query since the original query was unregistered");
       string error_msg = Substitute("Query was unregistered");
       HandleRetryFailure(&status, &error_msg, request_state, retry_query_id);
-      RETURN_VOID_IF_ERROR(retry_request_state->Finalize(true, nullptr));
+      DCHECK(retry_request_state->is_inflight());
+      retry_request_state->Finalize(nullptr);
       return;
     }
     lock_guard<SpinLock> l(client_request_state_lock_);
@@ -376,7 +373,8 @@ void QueryDriver::RetryQueryFromThread(
   query_handle.SetHandle(query_driver, request_state);
   // Do the work of close that needs to be done synchronously, otherwise we'll
   // hit some illegal states in destroying the request_state.
-  RETURN_VOID_IF_ERROR(query_handle->Finalize(true, nullptr));
+  DCHECK(query_handle->is_inflight());
+  query_handle->Finalize(nullptr);
   parent_server_->CloseClientRequestState(query_handle);
   parent_server_->MarkSessionInactive(session);
 }
@@ -430,6 +428,10 @@ void QueryDriver::HandleRetryFailure(Status* status, 
string* error_msg,
 
 Status QueryDriver::Finalize(
     QueryHandle* query_handle, bool check_inflight, const Status* cause) {
+  if (check_inflight && !(*query_handle)->is_inflight()) {
+    return Status("Query not yet running");
+  }
+
   if (!finalized_.CompareAndSwap(false, true)) {
     // Return error as-if the query was already unregistered, so that it 
appears to the
     // client as-if unregistration already happened. We don't need a distinct
@@ -438,7 +440,7 @@ Status QueryDriver::Finalize(
     return Status::Expected(
         TErrorCode::INVALID_QUERY_HANDLE, 
PrintId(client_request_state_->query_id()));
   }
-  RETURN_IF_ERROR((*query_handle)->Finalize(check_inflight, cause));
+  (*query_handle)->Finalize(cause);
   return Status::OK();
 }
 
diff --git a/be/src/runtime/query-driver.h b/be/src/runtime/query-driver.h
index 3ca375a8d..382aab9f4 100644
--- a/be/src/runtime/query-driver.h
+++ b/be/src/runtime/query-driver.h
@@ -180,14 +180,14 @@ class QueryDriver {
   /// This indicates that the query should no longer be considered registered 
from the
   /// client's point of view. Returns an INVALID_QUERY_HANDLE error if 
finalization
   /// already started. After this method has been called, finalized() will 
return true.
-  /// 'check_inflight' and 'cause' are passed to
-  /// ClientRequestState::Finalize(bool, Status).
+  /// If 'check_inflight' is true and the query is not yet inflight, Finalize 
will error.
+  /// 'cause' is passed to ClientRequestState::Finalize(Status).
   Status Finalize(QueryHandle* query_handle, bool check_inflight, const 
Status* cause);
 
   /// Delete this query from the given QueryDriverMap.
   Status Unregister(ImpalaServer::QueryDriverMap* query_driver_map) 
WARN_UNUSED_RESULT;
 
-  /// True if Finalize() was called.
+  /// True if Finalize() was called while the query was inflight.
   bool finalized() const { return finalized_.Load(); }
 
   /// Creates a new QueryDriver instance using the given ImpalaServer. Creates 
the
@@ -264,8 +264,8 @@ class QueryDriver {
   /// query_driver_map.
   TUniqueId registered_retry_query_id_;
 
-  /// True if a thread has called Finalize(). Threads calling Finalize() do a
-  /// compare-and-swap on this so that only one thread can proceed.
+  /// True if a thread has called Finalize() and the query is inflight. 
Threads calling
+  /// Finalize() do a compare-and-swap on this so that only one thread can 
proceed.
   AtomicBool finalized_{false};
 };
 }
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 4e5e01e19..a4141a44f 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1041,9 +1041,9 @@ Status ClientRequestState::ExecShutdownRequest() {
   return Status::OK();
 }
 
-Status ClientRequestState::Finalize(bool check_inflight, const Status* cause) {
+void ClientRequestState::Finalize(const Status* cause) {
   UnRegisterCompletedRPCs();
-  RETURN_IF_ERROR(Cancel(check_inflight, cause, 
/*wait_until_finalized=*/true));
+  Cancel(cause, /*wait_until_finalized=*/true);
   MarkActive();
   // Make sure we join on wait_thread_ before we finish (and especially before 
this object
   // is destroyed).
@@ -1097,7 +1097,6 @@ Status ClientRequestState::Finalize(bool check_inflight, 
const Status* cause) {
   // Update the timeline here so that all of the above work is captured in the 
timeline.
   query_events_->MarkEvent("Unregister query");
   UnRegisterRemainingRPCs();
-  return Status::OK();
 }
 
 Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
@@ -1472,19 +1471,7 @@ Status ClientRequestState::FetchRowsInternal(const 
int32_t max_rows,
   return Status::OK();
 }
 
-Status ClientRequestState::Cancel(
-    bool check_inflight, const Status* cause, bool wait_until_finalized) {
-  if (check_inflight) {
-    // If the query is in 'inflight_queries' it means that the query has 
actually started
-    // executing. It is ok if the query is removed from 'inflight_queries' 
during
-    // cancellation, so we can release the session lock before starting the 
cancellation
-    // work.
-    lock_guard<mutex> session_lock(session_->lock);
-    if (session_->inflight_queries.find(query_id()) == 
session_->inflight_queries.end()) {
-      return Status("Query not yet running");
-    }
-  }
-
+void ClientRequestState::Cancel(const Status* cause, bool 
wait_until_finalized) {
   {
     lock_guard<mutex> lock(lock_);
     // If the query has reached a terminal state, no need to update the state.
@@ -1514,7 +1501,6 @@ Status ClientRequestState::Cancel(
   // started, cancellation is handled by the 'async-exec-thread' thread). 
'lock_' should
   // not be held because cancellation involves RPCs and can block for a long 
time.
   if (GetCoordinator() != nullptr) 
GetCoordinator()->Cancel(wait_until_finalized);
-  return Status::OK();
 }
 
 Status ClientRequestState::UpdateCatalog() {
diff --git a/be/src/service/client-request-state.h 
b/be/src/service/client-request-state.h
index 8e5cb4bed..a94304872 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -181,19 +181,13 @@ class ClientRequestState {
   /// until cancellaton of 'coord_' finishes and it is finalized.
   /// 'wait_until_finalized' should only used by the single thread finalizing 
the query,
   /// to avoid many threads piling up waiting for query cancellation.
-  ///
-  /// Only returns an error if 'check_inflight' is true and the query is not 
yet
-  /// in-flight. Otherwise, proceed and return Status::OK() even if the query 
isn't
-  /// in-flight (for cleaning up after an error on the query issuing path).
-  Status Cancel(bool check_inflight, const Status* cause, bool 
wait_until_finalized=false);
+  void Cancel(const Status* cause, bool wait_until_finalized=false);
 
   /// This is called when the query is done (finished, cancelled, or failed). 
This runs
   /// synchronously within the last client RPC and does any work that is 
required before
   /// the query is finished from the client's point of view, including 
cancelling the
-  /// query with 'cause'. Returns an error if 'check_inflight' is true and the 
query is
-  /// not yet in-flight, or if another thread has already started finalizing 
the query.
-  /// Takes lock_: callers must not hold lock() before calling.
-  Status Finalize(bool check_inflight, const Status* cause);
+  /// query with 'cause'. Takes lock_: callers must not hold lock() before 
calling.
+  void Finalize(const Status* cause);
 
   /// Sets the API-specific (Beeswax, HS2) result cache and its size bound.
   /// The given cache is owned by this client request state, even if an error 
is returned.
@@ -329,6 +323,15 @@ class ClientRequestState {
     return ref_count_ > 0;
   }
 
+  inline bool is_inflight() const {
+    // If the query is in 'inflight_queries' it means that the query has 
actually started
+    // executing. It is ok if the query is removed from 'inflight_queries' 
later, so we
+    // can release the session lock before returning.
+    std::lock_guard<std::mutex> session_lock(session_->lock);
+    return session_->inflight_queries.find(query_id())
+        != session_->inflight_queries.end();
+  }
+
   bool is_expired() const {
     std::lock_guard<std::mutex> l(expiration_data_lock_);
     return is_expired_;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index aaf4b6f12..ece04cd05 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1749,8 +1749,11 @@ Status ImpalaServer::CancelInternal(const TUniqueId& 
query_id) {
   VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
   QueryHandle query_handle;
   RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));
-  RETURN_IF_ERROR(
-      query_handle->Cancel(/*check_inflight=*/ true, /*cause=*/ nullptr));
+  if (!query_handle->is_inflight()) {
+    // Error if the query is not yet inflight as we have no way to cleanly 
cancel it.
+    return Status("Query not yet running");
+  }
+  query_handle->Cancel(/*cause=*/ nullptr);
   return Status::OK();
 }
 
@@ -2027,10 +2030,11 @@ void ImpalaServer::CancelFromThreadPool(const 
CancellationWork& cancellation_wor
     // If the query could not be retried, then cancel the query.
     if (!was_retried) {
       VLOG_QUERY << "CancelFromThreadPool(): cancelling query_id=" << 
PrintId(query_id);
-      Status status = query_handle->Cancel(true, &error);
-      if (!status.ok()) {
+      if (query_handle->is_inflight()) {
+        query_handle->Cancel(&error);
+      } else {
         VLOG_QUERY << "Query cancellation (" << 
PrintId(cancellation_work.query_id())
-                   << ") did not succeed: " << status.GetDetail();
+                   << ") skipped as query was not running.";
       }
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 6bb6b7015..8e77a0bb3 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -207,6 +207,7 @@ import org.apache.impala.thrift.TUnit;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
 import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.DebugUtils;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.impala.util.IcebergUtil;
@@ -2330,6 +2331,11 @@ public class Frontend {
         this, queryCtx.session.database, timeline, user, 
queryCtx.getQuery_id());
     //TODO (IMPALA-8788): should load table write ids in transaction context.
     StmtTableCache stmtTableCache = metadataLoader.loadTables(stmt);
+    if (queryCtx.client_request.query_options.isSetDebug_action()) {
+        DebugUtils.executeDebugAction(
+            queryCtx.client_request.query_options.getDebug_action(),
+            DebugUtils.LOAD_TABLES_DELAY);
+    }
 
     // Add referenced tables to frontend profile
     FrontendProfile.getCurrent().addInfoString("Referenced Tables",
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index b265f46ff..7f9d1b623 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -67,6 +67,9 @@ public class DebugUtils {
   public static final String UPDATE_CATALOG_ABORT_INSERT_TXN =
       "catalogd_update_catalog_abort_txn";
 
+  // debug action label for introducing delay in loading table metadata.
+  public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
+
   /**
    * Returns true if the label of action is set in the debugActions
    */
diff --git a/tests/webserver/test_web_pages.py 
b/tests/webserver/test_web_pages.py
index 11ff2100a..15f52ba71 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -22,6 +22,8 @@ from tests.common.skip import SkipIfBuildType, 
SkipIfDockerizedCluster
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from datetime import datetime
+from multiprocessing import Process, Queue
+from time import sleep, time
 import itertools
 import json
 import os
@@ -898,6 +900,123 @@ class TestWebPage(ImpalaTestSuite):
       if query in json_part['stmt']:
         assert json_part["query_progress"] == "0 / 4 ( 0%)"
 
+  def try_until(self, desc, run, check, timeout=10, interval=0.1):
+    start_time = time()
+    while (time() - start_time < timeout):
+      result = run()
+      if check(result):
+        return result
+      sleep(interval)
+    assert False, "Timed out waiting for " + desc
+
+  def get_queries(self):
+    responses = self.get_and_check_status(
+      self.QUERIES_URL + "?json", ports_to_test=[25000])
+    assert len(responses) == 1
+    response_json = json.loads(responses[0].text)
+    return response_json
+
+  @pytest.mark.execute_serially
+  def test_query_cancel_created(self):
+    """Tests that if we cancel a query in the CREATED state, it still finishes 
and we can
+    cancel it."""
+    query = "select count(*) from functional_parquet.alltypes"
+    delay_created_action = "impalad_load_tables_delay:SLEEP@1000"
+
+    response_json = self.get_queries()
+    assert response_json['num_in_flight_queries'] == 0
+
+    # Start the query completely async. The server doesn't return a response 
until
+    # the query has exited the CREATED state, so we need to get the query ID 
another way.
+    self.client.set_configuration(dict(debug_action=delay_created_action))
+    proc = Process(target=lambda cli, q: cli.execute_async(q), 
args=(self.client, query))
+    proc.start()
+
+    response_json = self.try_until("query creation", self.get_queries,
+        lambda resp: resp['num_in_flight_queries'] > 0)
+    assert len(response_json['in_flight_queries']) == 1
+    assert response_json['in_flight_queries'][0]['state'] == 'CREATED'
+    query_id = response_json['in_flight_queries'][0]['query_id']
+
+    cancel_query_url = 
"{0}cancel_query?json&query_id={1}".format(self.ROOT_URL.format
+      ("25000"), query_id)
+    response = requests.get(cancel_query_url)
+    assert response.status_code == requests.codes.ok
+    response_json = json.loads(response.text)
+    assert response_json['error'] == "Query not yet running\n"
+
+    # Wait for query to start running. It should finish soon after.
+    proc.join()
+    response_json = self.try_until("query finished", self.get_queries,
+        lambda resp: resp['in_flight_queries'][0]['state'] == 'FINISHED')
+    assert response_json['num_in_flight_queries'] == 1
+
+    # We never fetch results for the async query, so it stays in-flight until 
cancelled.
+    response = requests.get(cancel_query_url)
+    assert response.status_code == requests.codes.ok
+    response_json = json.loads(response.text)
+    assert response_json['contents'] == "Query cancellation successful"
+
+    response_json = self.get_queries()
+    assert response_json['num_in_flight_queries'] == 0
+    assert response_json['num_waiting_queries'] == 0
+
+    expected_queries = [q for q in response_json['completed_queries']
+                        if q['query_id'] == query_id]
+    assert len(expected_queries) == 1
+
+  @pytest.mark.execute_serially
+  def test_query_cancel_exception(self):
+    """Tests that if we cancel a query in the CREATED state and it has an 
exception, we
+    can cancel it."""
+    # Trigger UDF ERROR: Cannot divide decimal by zero
+    query = "select *, 1.0/0 from functional_parquet.alltypes limit 10"
+    delay_created_action = "impalad_load_tables_delay:SLEEP@1000"
+
+    response_json = self.get_queries()
+    assert response_json['num_in_flight_queries'] == 0
+
+    def run(queue, client, query):
+      queue.put(client.execute_async(query))
+
+    # Start the query completely async. The server doesn't return a response 
until
+    # the query has exited the CREATED state, so we need to get the query ID 
another way.
+    self.client.set_configuration(dict(debug_action=delay_created_action))
+    queue = Queue()
+    proc = Process(target=run, args=(queue, self.client, query))
+    proc.start()
+
+    response_json = self.try_until("query creation", self.get_queries,
+        lambda resp: resp['num_in_flight_queries'] > 0)
+    assert len(response_json['in_flight_queries']) == 1
+    assert response_json['in_flight_queries'][0]['state'] == 'CREATED'
+    query_id = response_json['in_flight_queries'][0]['query_id']
+
+    cancel_query_url = 
"{0}cancel_query?json&query_id={1}".format(self.ROOT_URL.format
+      ("25000"), query_id)
+    response = requests.get(cancel_query_url)
+    assert response.status_code == requests.codes.ok
+    response_json = json.loads(response.text)
+    assert response_json['error'] == "Query not yet running\n"
+
+    # Fetch query results.
+    query_handle = queue.get()
+    proc.join()
+    assert query_handle
+    try:
+      self.client.fetch(query, query_handle)
+    except Exception as e:
+      re.match("UDF ERROR: Cannot divide decimal by zero", str(e))
+
+    # Failed query should be completed.
+    response_json = self.get_queries()
+    assert response_json['num_in_flight_queries'] == 0
+    assert response_json['num_waiting_queries'] == 0
+
+    expected_queries = [q for q in response_json['completed_queries']
+                        if q['query_id'] == query_id]
+    assert len(expected_queries) == 1
+
 
 class TestWebPageAndCloseSession(ImpalaTestSuite):
   ROOT_URL = "http://localhost:{0}/";

Reply via email to