This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new f05eac647 IMPALA-12602: Unregister queries on idle timeout
f05eac647 is described below

commit f05eac647647b5e03c3aafc35f785c73d07e2658
Author: Michael Smith <[email protected]>
AuthorDate: Fri Feb 23 15:37:23 2024 -0800

    IMPALA-12602: Unregister queries on idle timeout
    
    Queries cancelled due to idle_query_timeout/QUERY_TIMEOUT_S are now also
    unregistered to free any remaining memory, as you cannot fetch results
    from a cancelled query.
    
    Adds a new structure - idle_query_statuses_ - to retain Status messages
    for queries closed this way so that we can continue to return a clear
    error message if the client returns and requests query status or
    attempts to fetch results. This structure must be global because HS2
    server can only identify a session ID from a query handle, and the query
    handle no longer exists. SessionState tracks queries added to
    idle_query_statuses_ so they can be cleared when the session is closed.
    
    Also ensures MarkInactive is called in ClientRequestState when Wait()
    completes. Previously WaitInternal would only MarkInactive on success,
    leaving any failed requests in an active state until explicitly closed
    or the session ended.
    
    The beeswax get_log RPC will not return the preserved error message or
    any warnings for these queries. It's also possible the summary and
    profile are rotated out of query log as the query is no longer inflight.
    This is an acceptable outcome as a client will likely not look for a
    log/summary/profile after it times out.
    
    Testing:
    - updates test_query_expiration to verify number of waiting queries is
      only non-zero for queries cancelled by EXEC_TIME_LIMIT_S and not yet
      closed as an idle query
    - modified test_retry_query_timeout to use exec_time_limit_s because
      queries closed by idle_timeout_s don't work with get_exec_summary
    
    Change-Id: Iacfc285ed3587892c7ec6f7df3b5f71c9e41baf0
    Reviewed-on: http://gerrit.cloudera.org:8080/21074
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/client-request-state.cc        |  9 ++-
 be/src/service/impala-beeswax-server.cc       |  9 ++-
 be/src/service/impala-server.cc               | 88 +++++++++++++++++++--------
 be/src/service/impala-server.h                | 24 ++++++--
 docs/topics/impala_timeouts.xml               |  6 +-
 tests/custom_cluster/test_query_expiration.py | 52 ++++++++++++----
 tests/custom_cluster/test_query_retries.py    |  4 +-
 7 files changed, 140 insertions(+), 52 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 44aa3b824..99c70e088 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1155,6 +1155,10 @@ bool ClientRequestState::BlockOnWait(int64_t timeout_us, int64_t* block_on_wait_
 void ClientRequestState::Wait() {
   // block until results are ready
   Status status = WaitInternal();
+  // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
+  // how long Impala waits for the client to fetch rows. For other statements, track the
+  // time until a Close() is received.
+  MarkInactive();
   {
     lock_guard<mutex> l(lock_);
     if (returns_result_set()) {
@@ -1191,7 +1195,6 @@ void ClientRequestState::Wait() {
 Status ClientRequestState::WaitInternal() {
   // Explain requests have already populated the result set. Nothing to do here.
   if (exec_request().stmt_type == TStmtType::EXPLAIN) {
-    MarkInactive();
     return Status::OK();
   }
 
@@ -1240,10 +1243,6 @@ Status ClientRequestState::WaitInternal() {
   } else if (isCTAS) {
     SetCreateTableAsSelectResultSet();
   }
-  // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
-  // how long Impala waits for the client to fetch rows. For other statements, track the
-  // time until a Close() is received.
-  MarkInactive();
   return Status::OK();
 }
 
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 2b10e8c17..5bf658188 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -292,7 +292,14 @@ beeswax::QueryState::type ImpalaServer::get_state(
   VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
 
   QueryHandle query_handle;
-  RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
+  Status status = GetActiveQueryHandle(query_id, &query_handle);
+  // GetActiveQueryHandle may return the query's status from being cancelled. If not an
+  // invalid query handle, we can assume that error statuses reflect a query in the
+  // EXCEPTION state.
+  if (!status.ok() && status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+    return beeswax::QueryState::EXCEPTION;
+  }
+  RAISE_IF_ERROR(status, SQLSTATE_GENERAL_ERROR);
 
   // Validate that query can be accessed by user.
   RAISE_IF_ERROR(CheckClientRequestSession(session.get(), query_handle->effective_user(),
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 963960661..029ae61dc 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1428,6 +1428,16 @@ Status ImpalaServer::RegisterQuery(const TUniqueId& query_id,
   return Status::OK();
 }
 
+static inline int32_t GetIdleTimeout(const TQueryOptions& query_options) {
+  int32_t idle_timeout_s = query_options.query_timeout_s;
+  if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
+    return min(FLAGS_idle_query_timeout, idle_timeout_s);
+  } else {
+    // Use a non-zero timeout, if one exists
+    return max(FLAGS_idle_query_timeout, idle_timeout_s);
+  }
+}
+
 Status ImpalaServer::SetQueryInflight(
     shared_ptr<SessionState> session_state, const QueryHandle& query_handle) {
   DebugActionNoFail(query_handle->query_options(), "SET_QUERY_INFLIGHT");
@@ -1459,13 +1469,7 @@ Status ImpalaServer::SetQueryInflight(
   }
 
   // If the query has a timeout or time limit, schedule checks.
-  int32_t idle_timeout_s = query_handle->query_options().query_timeout_s;
-  if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
-    idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
-  } else {
-    // Use a non-zero timeout, if one exists
-    idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
-  }
+  int32_t idle_timeout_s = GetIdleTimeout(query_handle->query_options());
   int32_t exec_time_limit_s = query_handle->query_options().exec_time_limit_s;
   int64_t cpu_limit_s = query_handle->query_options().cpu_limit_s;
   int64_t scan_bytes_limit = query_handle->query_options().scan_bytes_limit;
@@ -1701,6 +1705,14 @@ Status ImpalaServer::GetActiveQueryHandle(
   DCHECK(query_handle != nullptr);
   shared_ptr<QueryDriver> query_driver = GetQueryDriver(query_id);
   if (UNLIKELY(query_driver == nullptr)) {
+    {
+      lock_guard<mutex> l(idle_query_statuses_lock_);
+      auto it = idle_query_statuses_.find(query_id);
+      if (it != idle_query_statuses_.end()) {
+        return it->second;
+      }
+    }
+
     Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
     VLOG(1) << err.GetDetail();
     return err;
@@ -1797,6 +1809,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     DecrementSessionCount(session_state->connected_user);
   }
   unordered_set<TUniqueId> inflight_queries;
+  vector<TUniqueId> idled_queries;
   {
     lock_guard<mutex> l(session_state->lock);
     DCHECK(!session_state->closed);
@@ -1804,6 +1817,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     // Since closed is true, no more queries will be added to the inflight list.
     inflight_queries.insert(session_state->inflight_queries.begin(),
         session_state->inflight_queries.end());
+    idled_queries.swap(session_state->idled_queries);
   }
   // Unregister all open queries from this session.
   Status status = Status::Expected("Session closed");
@@ -1811,6 +1825,12 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     // TODO: deal with an error status
     UnregisterQueryDiscardResult(query_id, false, &status);
   }
+  {
+    lock_guard<mutex> l(idle_query_statuses_lock_);
+    for (const TUniqueId& query_id: idled_queries) {
+      idle_query_statuses_.erase(query_id);
+    }
+  }
   // Reconfigure the poll period of session_maintenance_thread_ if necessary.
   UnregisterSessionTimeout(session_state->session_timeout);
   VLOG_QUERY << "Closed session: " << PrintId(session_id)
@@ -2774,8 +2794,9 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
           continue;
         }
         ClientRequestState* crs = query_driver->GetActiveClientRequestState();
-        if (crs->is_expired()) {
-          // Query was expired already from a previous expiration event.
+        if (crs->is_expired() && expiration_event->kind != ExpirationKind::IDLE_TIMEOUT) {
+          // Query was expired already from a previous expiration event. Keep idle
+          // timeouts as they will additionally unregister the query.
           expiration_event = queries_by_timestamp_.erase(expiration_event);
           continue;
         }
@@ -2812,13 +2833,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
         // Now check to see if the idle timeout has expired. We must check the actual
         // expiration time in case the query has updated 'last_active_ms' since the last
         // time we looked.
-        int32_t idle_timeout_s = crs->query_options().query_timeout_s;
-        if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
-          idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
-        } else {
-          // Use a non-zero timeout, if one exists
-          idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
-        }
+        int32_t idle_timeout_s = GetIdleTimeout(crs->query_options());
         int64_t expiration = crs->last_active_ms() + (idle_timeout_s * 1000L);
         if (now < expiration) {
           // If the real expiration date is in the future we may need to re-insert the
@@ -2840,10 +2855,29 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
           VLOG_QUERY << "Expiring query due to client inactivity: "
                      << PrintId(expiration_event->query_id) << ", last activity was at: "
                      << ToStringFromUnixMillis(crs->last_active_ms());
-          ExpireQuery(crs,
-              Status::Expected(TErrorCode::INACTIVE_QUERY_EXPIRED,
-                  PrintId(expiration_event->query_id),
-                  PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S)));
+          const Status status = Status::Expected(TErrorCode::INACTIVE_QUERY_EXPIRED,
+              PrintId(expiration_event->query_id),
+              PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
+
+          // Save status so we can report it for unregistered queries.
+          Status preserved_status;
+          {
+            lock_guard<mutex> l(*crs->lock());
+            preserved_status = crs->query_status();
+          }
+          preserved_status.MergeStatus(status);
+          {
+            shared_ptr<SessionState> session = crs->session();
+            lock_guard<mutex> l(session->lock);
+            if (!session->closed) {
+              lock_guard<mutex> l(idle_query_statuses_lock_);
+              idle_query_statuses_.emplace(
+                  expiration_event->query_id, move(preserved_status));
+              session->idled_queries.emplace_back(expiration_event->query_id);
+            }
+          }
+
+          ExpireQuery(crs, status, true);
           expiration_event = queries_by_timestamp_.erase(expiration_event);
         } else {
           // Iterator is moved on in every other branch.
@@ -2972,12 +3006,18 @@ Status ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
   return Status::OK();
 }
 
-void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
+void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status,
+    bool unregister) {
   DCHECK(!status.ok());
   cancellation_thread_pool_->Offer(
-      CancellationWork::TerminatedByServer(crs->query_id(), status, false));
-  ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
-  crs->set_expired();
+      CancellationWork::TerminatedByServer(crs->query_id(), status, unregister));
+  if (crs->is_expired()) {
+    // Should only be re-entrant if we're now unregistering the query.
+    DCHECK(unregister);
+  } else {
+    ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
+    crs->set_expired();
+  }
 }
 
 Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 8024a6133..e3562959d 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -185,10 +185,11 @@ class TQueryExecRequest;
 /// 1. session_state_map_lock_
 /// 2. SessionState::lock
 /// 3. query_expiration_lock_
-/// 4. ClientRequestState::fetch_rows_lock
-/// 5. ClientRequestState::lock
-/// 6. ClientRequestState::expiration_data_lock_
-/// 7. Coordinator::exec_summary_lock
+/// 4. idle_query_statuses_lock_
+/// 5. ClientRequestState::fetch_rows_lock
+/// 6. ClientRequestState::lock
+/// 7. ClientRequestState::expiration_data_lock_
+/// 8. Coordinator::exec_summary_lock
 ///
 /// The following locks are not held in conjunction with other locks:
 /// * query_log_lock_
@@ -657,6 +658,9 @@ class ImpalaServer : public ImpalaServiceIf,
     /// inflight_queries. In that case we add it to prestopped_queries instead.
     std::set<TUniqueId> prestopped_queries;
 
+    /// Unregistered queries we need to clear from idle_query_statuses_ on closure.
+    std::vector<TUniqueId> idled_queries;
+
     /// Total number of queries run as part of this session.
     int64_t total_queries;
 
@@ -1153,8 +1157,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// check should be rescheduled for a later time.
   Status CheckResourceLimits(ClientRequestState* crs);
 
-  /// Expire 'crs' and cancel it with status 'status'.
-  void ExpireQuery(ClientRequestState* crs, const Status& status);
+  /// Expire 'crs' and cancel it with status 'status'. Optionally unregisters the query.
+  void ExpireQuery(ClientRequestState* crs, const Status& status,
+      bool unregister = false);
 
   typedef boost::unordered_map<std::string, boost::unordered_set<std::string>>
       AuthorizedProxyMap;
@@ -1423,6 +1428,13 @@ class ImpalaServer : public ImpalaServiceIf,
   typedef boost::unordered_map<TUniqueId, std::shared_ptr<SessionState>> SessionStateMap;
   SessionStateMap session_state_map_;
 
+  /// Protects idle_query_statuses_;
+  std::mutex idle_query_statuses_lock_;
+
+  /// A map of queries that were stopped due to idle timeout and the status they had when
+  /// unregistered. Used to return a more useful error when looking up unregistered IDs.
+  std::map<TUniqueId, Status> idle_query_statuses_;
+
   /// Protects connection_to_sessions_map_. See "Locking" in the class comment for lock
   /// acquisition order.
   std::mutex connection_to_sessions_map_lock_;
diff --git a/docs/topics/impala_timeouts.xml b/docs/topics/impala_timeouts.xml
index 4ee95e64d..713827138 100644
--- a/docs/topics/impala_timeouts.xml
+++ b/docs/topics/impala_timeouts.xml
@@ -126,8 +126,10 @@ Trying to re-register with state-store</codeblock>
               <codeph>--idle_query_timeout</codeph> disables query timeouts.
           </p>
           <p>
-            Cancelled queries remain in the open state but use only the
-            minimal resources.
+            Cancelled queries are closed, but the client can still fetch their exception
+            details. If the query was in a good state when cancelled, it will present an
+            error like "Query 6f49e509bfa5b347:207d8ef900000000 expired due to client
+            inactivity", otherwise it will show the relevant error.
           </p>
         </li>
 
diff --git a/tests/custom_cluster/test_query_expiration.py b/tests/custom_cluster/test_query_expiration.py
index 7e08dc81c..a1d8dd9f4 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -29,18 +29,21 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 class TestQueryExpiration(CustomClusterTestSuite):
   """Tests query expiration logic"""
 
-  def _check_num_executing(self, impalad, expected):
+  def _check_num_executing(self, impalad, expected, expect_waiting=0):
     in_flight_queries = impalad.service.get_in_flight_queries()
     # Guard against too few in-flight queries.
     assert expected <= len(in_flight_queries)
-    actual = 0
+    actual = waiting = 0
     for query in in_flight_queries:
       if query["executing"]:
         actual += 1
       else:
         assert query["waiting"]
-    assert actual == expected, '%s out of %s queries with expected (%s) status' \
+        waiting += 1
+    assert actual == expected, '%s out of %s queries executing (expected %s)' \
         % (actual, len(in_flight_queries), expected)
+    assert waiting == expect_waiting, '%s out of %s queries waiting (expected %s)' \
+        % (waiting, len(in_flight_queries), expect_waiting)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_query_timeout=8 --logbuflevel=-1")
@@ -58,7 +61,7 @@ class TestQueryExpiration(CustomClusterTestSuite):
 
     # This query will hit a lower time limit.
     client.execute("SET EXEC_TIME_LIMIT_S=3")
-    time_limit_expire_handle = client.execute_async(query1);
+    time_limit_expire_handle = client.execute_async(query1)
     handles.append(time_limit_expire_handle)
 
     # This query will hit a lower idle timeout instead of the default timeout or time
@@ -75,30 +78,46 @@ class TestQueryExpiration(CustomClusterTestSuite):
     handles.append(default_timeout_expire_handle2)
     self._check_num_executing(impalad, len(handles))
 
+    # Run a query that fails, and will timeout due to client inactivity.
+    client.execute("SET QUERY_TIMEOUT_S=1")
+    client.execute('SET MEM_LIMIT=1')
+    exception_handle = client.execute_async("select count(*) from functional.alltypes")
+    client.execute('SET MEM_LIMIT=1g')
+    handles.append(exception_handle)
+
     before = time()
     sleep(4)
 
-    # Queries with timeout or time limit of 1 should have expired, other queries should
+    # Queries with timeout or time limit < 4 should have expired, other queries should
     # still be running.
-    assert num_expired + 2 == impalad.service.get_metric_value(
+    assert num_expired + 3 == impalad.service.get_metric_value(
       'impala-server.num-queries-expired')
     assert (client.get_state(short_timeout_expire_handle) ==
             client.QUERY_STATES['EXCEPTION'])
     assert (client.get_state(time_limit_expire_handle) ==
             client.QUERY_STATES['EXCEPTION'])
+    assert (client.get_state(exception_handle) == client.QUERY_STATES['EXCEPTION'])
     assert (client.get_state(default_timeout_expire_handle) ==
             client.QUERY_STATES['FINISHED'])
     assert (client.get_state(default_timeout_expire_handle2) ==
             client.QUERY_STATES['FINISHED'])
+    # The query cancelled by exec_time_limit_s should be waiting to be closed.
+    self._check_num_executing(impalad, 2, 1)
     self.__expect_expired(client, query1, short_timeout_expire_handle,
-        "Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 3s000ms\)")
+        r"Query [0-9a-f]+:[0-9a-f]+ expired due to "
+        + r"client inactivity \(timeout is 3s000ms\)")
     self.__expect_expired(client, query1, time_limit_expire_handle,
-        "Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 3s000ms")
+        r"Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 3s000ms")
+    self.__expect_expired(client, query1, exception_handle,
+        r"minimum memory reservation is greater than memory available.*\nQuery "
+        + r"[0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 1s000ms\)")
     self._check_num_executing(impalad, 2)
+    # Both queries with query_timeout_s < 4 should generate this message.
     self.assert_impalad_log_contains('INFO', "Expiring query due to client inactivity: "
-        "[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d")
+        r"[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d",
+        2)
     self.assert_impalad_log_contains('INFO',
-        "Expiring query [0-9a-f]+:[0-9a-f]+ due to execution time limit of 3s")
+        r"Expiring query [0-9a-f]+:[0-9a-f]+ due to execution time limit of 3s")
 
     # Wait until the remaining queries expire. The time limit query will have hit
     # expirations but only one should be counted.
@@ -124,14 +143,23 @@ class TestQueryExpiration(CustomClusterTestSuite):
 
     # Confirm that no extra expirations happened
     assert impalad.service.get_metric_value('impala-server.num-queries-expired') \
-        == len(handles)
+        == num_expired + len(handles)
     self._check_num_executing(impalad, 0)
     for handle in handles:
       try:
         client.close_query(handle)
+        assert False, "Close should always throw an exception"
       except Exception as e:
         # We fetched from some cancelled handles above, which unregistered the queries.
-        assert 'Invalid or unknown query handle' in str(e)
+        # Expired queries always return their exception, so will not be invalid/unknown.
+        if handle is time_limit_expire_handle:
+          assert 'Invalid or unknown query handle' in str(e)
+        else:
+          if handle is exception_handle:
+            # Error should return original failure and timeout message
+            assert 'minimum memory reservation is greater than memory available' in str(e)
+          assert re.search(
+              r'Query [0-9a-f]{16}:[0-9a-f]{16} expired due to client inactivity', str(e))
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_query_timeout=0")
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index c6fe3e74d..8d0629f0c 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -886,7 +886,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.cluster.impalads[1].kill()
     query = self._count_query
     handle = self.execute_query_async(query,
-        query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
+        query_options={'retry_failed_queries': 'true', 'exec_time_limit_s': '1'})
     self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
 
     # Validate the live exec summary.
@@ -908,7 +908,7 @@ class TestQueryRetries(CustomClusterTestSuite):
       self.client.fetch(query, handle)
       assert False
     except Exception as e:
-        assert "expired due to client inactivity" in str(e)
+        assert "expired due to execution time limit of 1s000ms" in str(e)
 
     # Assert that the impalad metrics show one expired query.
     assert impalad_service.get_metric_value('impala-server.num-queries-expired') == 1

Reply via email to