This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b500a55c IMPALA-13313: Fix ExpireQueries deadlock
4b500a55c is described below

commit 4b500a55cbfcdd311a1c766e33849f7ae05a1a8e
Author: Michael Smith <[email protected]>
AuthorDate: Tue Aug 20 15:44:29 2024 -0700

    IMPALA-13313: Fix ExpireQueries deadlock
    
    IMPALA-12602 introduced registering idle queries with a session so that
    we can expire queries while still making their status available, and
    clean up the idle query status when sessions are closed. That happens in
    ImpalaServer::ExpireQueries, where it needs to acquire
    query_expiration_lock_ and then a session_state->lock.
    
    However, that violated the lock order documented in impala-server.h
    (SessionState::lock before query_expiration_lock_) and led to a deadlock
    when a query is expired at the same time as another query is registering
    expiration timers (which follows the documented order). When the
    deadlock occurs, SetQueryInflight holds a session_state->lock and tries
    to acquire query_expiration_lock_, while ExpireQueries holds
    query_expiration_lock_ and tries to acquire that same session_state->lock.
    
    The prior order between query_expiration_lock_ and session_state->lock
    was largely arbitrary. query_expiration_lock_ operations don't
    inherently require holding the session_state->lock. However, expiration
    operations work on a queue of ClientRequestStates that map to different
    session states, so when we need to operate on a session state as part of
    expiration, we effectively have to take query_expiration_lock_ first.
    
    Updates the lock order to take query_expiration_lock_ before
    session_state->lock, and modifies SetQueryInflight to release
    session_state->lock before registering expiration timers. The expiration
    timers aren't related to the session, and the query's lifetime is
    maintained by the QueryHandle reference.
    
    Adds a custom cluster test that uses debug actions
    (EXPIRE_INACTIVE_QUERY and SET_QUERY_INFLIGHT_EXPIRATION) to reproduce
    the deadlock scenario.
    
    Change-Id: I6fce4103f6eeb7e9a4320ba1da817cab81071ba3
    Reviewed-on: http://gerrit.cloudera.org:8080/21699
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Michael Smith <[email protected]>
---
 be/src/service/impala-server.cc               | 53 +++++++++++++++------------
 be/src/service/impala-server.h                |  4 +-
 tests/custom_cluster/test_query_expiration.py | 28 ++++++++++++++
 3 files changed, 59 insertions(+), 26 deletions(-)

diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 99c528ed1..4a77680a3 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1455,30 +1455,33 @@ Status ImpalaServer::SetQueryInflight(
     shared_ptr<SessionState> session_state, const QueryHandle& query_handle) {
   DebugActionNoFail(query_handle->query_options(), "SET_QUERY_INFLIGHT");
   const TUniqueId& query_id = query_handle->query_id();
-  lock_guard<mutex> l(session_state->lock);
-  // The session wasn't expired at the time it was checked out and it isn't 
allowed to
-  // expire while checked out, so it must not be expired.
-  DCHECK_GT(session_state->ref_count, 0);
-  DCHECK(!session_state->expired);
-  // The session may have been closed after it was checked out.
-  if (session_state->closed) {
-    VLOG(1) << "Session closed: cannot set " << PrintId(query_id) << " 
in-flight";
-    return Status::Expected("Session closed");
-  }
-
-  // Acknowledge the query by incrementing total_queries.
-  ++session_state->total_queries;
-  // If the query was already closed - only possible by query retry logic - 
skip
-  // scheduling it to be unregistered with the session and adding timeouts 
checks.
-  if (session_state->prestopped_queries.erase(query_id) > 0) {
-    VLOG_QUERY << "Query " << PrintId(query_id) << " closed, skipping 
in-flight.";
-    return Status::OK();
-  }
-  // Add query to the set that will be unregistered if session is closed.
-  auto inflight_it = session_state->inflight_queries.insert(query_id);
-  if (UNLIKELY(!inflight_it.second)) {
-    LOG(WARNING) << "Query " << PrintId(query_id) << " is already in-flight.";
-    DCHECK(false) << "SetQueryInflight called twice for query_id=" << 
PrintId(query_id);
+  {
+    // Take session state lock while operating on the state.
+    lock_guard<mutex> l(session_state->lock);
+    // The session wasn't expired at the time it was checked out and it isn't 
allowed to
+    // expire while checked out, so it must not be expired.
+    DCHECK_GT(session_state->ref_count, 0);
+    DCHECK(!session_state->expired);
+    // The session may have been closed after it was checked out.
+    if (session_state->closed) {
+      VLOG(1) << "Session closed: cannot set " << PrintId(query_id) << " 
in-flight";
+      return Status::Expected("Session closed");
+    }
+
+    // Acknowledge the query by incrementing total_queries.
+    ++session_state->total_queries;
+    // If the query was already closed - only possible by query retry logic - 
skip
+    // scheduling it to be unregistered with the session and adding timeouts 
checks.
+    if (session_state->prestopped_queries.erase(query_id) > 0) {
+      VLOG_QUERY << "Query " << PrintId(query_id) << " closed, skipping 
in-flight.";
+      return Status::OK();
+    }
+    // Add query to the set that will be unregistered if session is closed.
+    auto inflight_it = session_state->inflight_queries.insert(query_id);
+    if (UNLIKELY(!inflight_it.second)) {
+      LOG(WARNING) << "Query " << PrintId(query_id) << " is already 
in-flight.";
+      DCHECK(false) << "SetQueryInflight called twice for query_id=" << 
PrintId(query_id);
+    }
   }
 
   // If the query has a timeout or time limit, schedule checks.
@@ -1490,6 +1493,7 @@ Status ImpalaServer::SetQueryInflight(
       query_handle->query_options().join_rows_produced_limit;
   if (idle_timeout_s > 0 || exec_time_limit_s > 0 || cpu_limit_s > 0
       || scan_bytes_limit > 0 || join_rows_produced_limit > 0) {
+    DebugActionNoFail(query_handle->query_options(), 
"SET_QUERY_INFLIGHT_EXPIRATION");
     lock_guard<mutex> l2(query_expiration_lock_);
     int64_t now = UnixMillis();
     if (idle_timeout_s > 0) {
@@ -2867,6 +2871,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t 
session_timeout) {
           const Status status = 
Status::Expected(TErrorCode::INACTIVE_QUERY_EXPIRED,
               PrintId(expiration_event->query_id),
               PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
+          DebugActionNoFail(crs->query_options(), "EXPIRE_INACTIVE_QUERY");
 
           // Save status so we can report it for unregistered queries.
           Status preserved_status;
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 1d936dc4b..0dba61c5c 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -184,8 +184,8 @@ class TQueryExecRequest;
 /// locks are acquired, lower-numbered locks must be acquired before 
higher-numbered
 /// locks:
 /// 1. session_state_map_lock_
-/// 2. SessionState::lock
-/// 3. query_expiration_lock_
+/// 2. query_expiration_lock_
+/// 3. SessionState::lock
 /// 4. idle_query_statuses_lock_
 /// 5. ClientRequestState::fetch_rows_lock
 /// 6. ClientRequestState::lock
diff --git a/tests/custom_cluster/test_query_expiration.py 
b/tests/custom_cluster/test_query_expiration.py
index a1d8dd9f4..6bd62461a 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -302,3 +302,31 @@ class TestQueryExpiration(CustomClusterTestSuite):
     for t in non_expiring_time_limit_threads:
       assert t.success
       assert t.data[0] == '7300' # Number of rows in alltypes.
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args()
+  def test_query_expiration_no_deadlock(self):
+    """Regression test for IMPALA-13313: Confirm that queries do not deadlock 
when one is
+    expiring while another starts."""
+    impalad = self.cluster.get_first_impalad()
+    num_expired = 
impalad.service.get_metric_value('impala-server.num-queries-expired')
+
+    handle = self.execute_query_async("SELECT SLEEP(1000000)",
+        {'query_timeout_s': '1', 'debug_action': 
'EXPIRE_INACTIVE_QUERY:SLEEP@3000'})
+
+    before = time()
+
+    # Prior to fixing IMPALA-13313, this query would enter a deadlock with the 
expiration
+    # for the previous query, and both would be unable to finish.
+    result = self.execute_query("SELECT 1", {
+        'query_timeout_s': '10',
+        'debug_action': 'SET_QUERY_INFLIGHT_EXPIRATION:SLEEP@5000'})
+    assert result.success
+
+    impalad.service.wait_for_metric_value('impala-server.num-queries-expired',
+                                          num_expired + 1)
+
+    assert time() - before < 10
+
+    self.__expect_client_state(self.client, handle,
+                               self.client.QUERY_STATES['EXCEPTION'])

Reply via email to