This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new f78b3c5cb IMPALA-13347: Fixes TSAN Thread Leak of Workload Management 
Thread
f78b3c5cb is described below

commit f78b3c5cb0f4672007efc618f0317ae7701e4866
Author: Jason Fehr <[email protected]>
AuthorDate: Tue Sep 3 12:38:50 2024 -0700

    IMPALA-13347: Fixes TSAN Thread Leak of Workload Management Thread
    
    The workload management processing runs in a separate thread declared
    in impala-server.h. This thread runs until a graceful shutdown is
    initiated. The last step of the Impala coordinator shutdown process
    is to drain the completed queries queue to the query log table thus
    ensuring completed queries do not get lost.
    
    This thread has to run to completion, but the coordinator shutdown
    process never joins that thread. This patch adds the joining of that
    thread during the coordinator shutdown process. If the workload
    management shutdown process exceeds the allotted time, the thread is
    detached.
    
    Info level logging was added to indicate which completed queries
    queue drain situation occurred - successful or timed out.
    
    A new custom cluster test was added to test the situation where the
    completed queries queue drain process times out.
    
    Change-Id: I1e95967bb6e04470a8900c9ba69080eea8aaa25e
    Reviewed-on: http://gerrit.cloudera.org:8080/21744
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/impala-server.cc            | 23 ++++++++++++++---
 be/src/service/impala-server.h             |  5 ++--
 be/src/service/workload-management-init.cc |  5 ----
 be/src/service/workload-management.cc      | 33 ++++++++++++++++++++++--
 be/src/service/workload-management.h       | 15 +++++++++++
 tests/custom_cluster/test_query_log.py     | 40 +++++++++++++++++++++++++++++-
 6 files changed, 106 insertions(+), 15 deletions(-)

diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 126f3535b..88ea712ab 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -134,6 +134,7 @@ DECLARE_string(debug_actions);
 DECLARE_bool(abort_on_config_error);
 DECLARE_bool(disk_spill_encryption);
 DECLARE_bool(enable_ldap_auth);
+DECLARE_bool(enable_workload_mgmt);
 DECLARE_bool(gen_experimental_profile);
 DECLARE_bool(use_local_catalog);
 
@@ -3217,9 +3218,21 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t 
hs2_port,
 
     internal_server_ = shared_from_this();
 
-    ABORT_IF_ERROR(Thread::Create("impala-server", "completed-queries",
-      bind<void>(&ImpalaServer::InitWorkloadManagement, this),
-    &workload_management_thread_));
+    // Initialize workload management (if enabled).
+    {
+      lock_guard<mutex> l(workload_mgmt_threadstate_mu_);
+
+      // Skip starting workload management if workload management is not 
enabled or the
+      // coordinator shutdown has run before this code runs.
+      if (FLAGS_enable_workload_mgmt && workload_mgmt_thread_state_ == 
NOT_STARTED) {
+
+        ABORT_IF_ERROR(Thread::Create("impala-server", "completed-queries",
+          bind<void>(&ImpalaServer::InitWorkloadManagement, this),
+        &workload_management_thread_));
+
+        workload_mgmt_thread_state_ = STARTED;
+      }
+    }
   }
   LOG(INFO) << "Initialized coordinator/executor Impala server on "
             << 
TNetworkAddressToString(exec_env_->configured_backend_address());
@@ -3403,7 +3416,9 @@ Status ImpalaServer::StartShutdown(
   }
 
   // Drain the completed queries queue to the query log table.
-  ShutdownWorkloadManagement();
+  if (FLAGS_enable_workload_mgmt) {
+    ShutdownWorkloadManagement();
+  }
 
   LOG(INFO) << "Shutdown complete, going down.";
   // Use _exit here instead since exit() does cleanup which interferes with 
the shutdown
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index f2b86c329..d382491de 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -1148,9 +1148,8 @@ class ImpalaServer : public ImpalaServiceIf,
   /// current query ids to the admissiond.
   [[noreturn]] void AdmissionHeartbeatThread();
 
-  /// Checks if workload management is enabled, and starts the init process if 
it is
-  /// enabled. Does not return until coordinator shutdown. Returns immediately 
if
-  /// workload management is not enabled.
+  /// Starts the workload management init process. Does not return until 
coordinator
+  /// shutdown.  DOES NOT check if workload management is enabled.
   /// (implemented in workload-management-init.cc)
   void InitWorkloadManagement();
 
diff --git a/be/src/service/workload-management-init.cc 
b/be/src/service/workload-management-init.cc
index c35052092..6e98ea99f 100644
--- a/be/src/service/workload-management-init.cc
+++ b/be/src/service/workload-management-init.cc
@@ -48,7 +48,6 @@ using boost::algorithm::trim_copy;
 using kudu::Version;
 using kudu::ParseVersion;
 
-DECLARE_bool(enable_workload_mgmt);
 DECLARE_int32(query_log_write_interval_s);
 DECLARE_int32(query_log_write_timeout_s);
 DECLARE_string(query_log_request_pool);
@@ -197,10 +196,6 @@ static void _errorIfDowngrade(const Version target_ver, 
const Version actual_ver
 } // _errorIfDowngrade
 
 void ImpalaServer::InitWorkloadManagement() {
-  if (!FLAGS_enable_workload_mgmt) {
-    return;
-  }
-
   // Fully qualified table name based on startup flags.
   const string log_table_name = StrCat(DB, ".", FLAGS_query_log_table_name);
 
diff --git a/be/src/service/workload-management.cc 
b/be/src/service/workload-management.cc
index f0b15c794..c290b92c0 100644
--- a/be/src/service/workload-management.cc
+++ b/be/src/service/workload-management.cc
@@ -53,11 +53,12 @@ using namespace std;
 
 DECLARE_bool(enable_workload_mgmt);
 DECLARE_int32(query_log_write_interval_s);
+DECLARE_int32(query_log_max_insert_attempts);
 DECLARE_int32(query_log_max_queued);
-DECLARE_string(workload_mgmt_user);
 DECLARE_int32(query_log_shutdown_timeout_s);
+DECLARE_string(debug_actions);
+DECLARE_string(workload_mgmt_user);
 DECLARE_string(cluster_id);
-DECLARE_int32(query_log_max_insert_attempts);
 
 namespace impala {
 
@@ -109,6 +110,16 @@ size_t ImpalaServer::NumLiveQueries() {
 
 void ImpalaServer::ShutdownWorkloadManagement() {
   unique_lock<mutex> l(workload_mgmt_threadstate_mu_);
+
+  // Handle the situation where this function runs before the workload 
management process
+  // has been started and thus workload_management_thread_ holds a nullptr.
+  if (workload_mgmt_thread_state_ == NOT_STARTED) {
+    workload_mgmt_thread_state_ = SHUTDOWN;
+    return;
+  }
+
+  DCHECK_NE(nullptr, workload_management_thread_.get());
+
   // If the completed queries thread is not yet running, then we don't need to 
give it a
   // chance to flush the in-memory queue to the completed queries table.
   if (workload_mgmt_thread_state_ == RUNNING) {
@@ -118,6 +129,22 @@ void ImpalaServer::ShutdownWorkloadManagement() {
         chrono::seconds(FLAGS_query_log_shutdown_timeout_s),
         [this]{ return workload_mgmt_thread_state_ == SHUTDOWN; });
   }
+
+  switch (workload_mgmt_thread_state_) {
+    case SHUTDOWN:
+      // Safe to join the thread here because the workload management 
processing loop
+      // sets the thread state to ThreadState::SHUTDOWN immediately before it 
returns.
+      LOG(INFO) << "Workload management shutdown successful";
+      workload_management_thread_->Join();
+      break;
+    default:
+      // The shutdown timeout expired without the completed queries queue 
draining.
+      LOG(INFO) << "Workload management shutdown timed out. Up to '"
+      << ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() << "' queries 
may have "
+      << "been lost";
+      workload_management_thread_->Detach();
+      break;
+  }
 } // ImpalaServer::ShutdownWorkloadManagement
 
 void ImpalaServer::EnqueueCompletedQuery(const QueryHandle& query_handle,
@@ -256,6 +283,8 @@ void ImpalaServer::WorkloadManagementWorker(
         });
     completed_queries_ticker_->ResetWakeupGuard();
 
+    DebugActionNoFail(FLAGS_debug_actions, "WM_SHUTDOWN_DELAY");
+
     if (completed_queries_.empty()) continue;
 
     if (MaxRecordsExceeded(completed_queries_.size())) {
diff --git a/be/src/service/workload-management.h 
b/be/src/service/workload-management.h
index 868cebebd..96040b38d 100644
--- a/be/src/service/workload-management.h
+++ b/be/src/service/workload-management.h
@@ -92,10 +92,25 @@ extern const std::array<FieldDefinition, 
NumQueryTableColumns> FIELD_DEFINITIONS
 /// the ThreadState variable must only happen after taking a lock on the 
associated mutex.
 /// Can be used to track the lifecycle of a thread.
 enum ThreadState {
+  // Workload management has not started.
   NOT_STARTED,
+
+  // Thread has started and initial checks in 
ImpalaServer::InitWorkloadManagement(),
+  // implemented in workload-management-init.cc, are running.
+  STARTED,
+
+  // Initial checks have passed, workload management initial setup can run.
   INITIALIZING,
+
+  // Initial setup of the workload management db tables is done, completed 
queries queue
+  // is now being processed.
   RUNNING,
+
+  // Coordinator graceful shutdown initiated, and all running queries have 
finished or
+  // been cancelled. The completed queries queue can now be drained.
   SHUTTING_DOWN,
+
+  // In-memory completed queries queue drained, coordinator shutdown can 
finish.
   SHUTDOWN
 };
 
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index ccd106ea8..8f829b289 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -690,7 +690,9 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=9999 "
                                                  "--shutdown_grace_period_s=0 "
-                                                 "--shutdown_deadline_s=15",
+                                                 "--shutdown_deadline_s=15 "
+                                                 "--debug_actions="
+                                                 
"WM_SHUTDOWN_DELAY:SLEEP@5000",
                                     catalogd_args="--enable_workload_mgmt")
   def test_flush_on_shutdown(self, vector):
     """Asserts that queries that have completed but are not yet written to the 
query
@@ -714,6 +716,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
         60)
 
     impalad.kill_and_wait_for_exit(SIGRTMIN)
+    self.assert_impalad_log_contains("INFO", r'Workload management shutdown 
successful',
+      timeout_s=60)
 
     client2 = self.create_client_for_nth_impalad(1, 
vector.get_value('protocol'))
 
@@ -733,6 +737,40 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
     finally:
       client2.close()
 
+  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+                                                 
"--query_log_write_interval_s=9999 "
+                                                 "--shutdown_grace_period_s=0 "
+                                                 
"--query_log_shutdown_timeout_s=3 "
+                                                 "--shutdown_deadline_s=15 "
+                                                 "--debug_actions="
+                                                 
"WM_SHUTDOWN_DELAY:SLEEP@10000",
+                                    catalogd_args="--enable_workload_mgmt")
+  def test_shutdown_flush_timed_out(self, vector):
+    """Asserts that queries that have completed but are not yet written to the 
query
+       log table are lost if the completed queries queue drain takes too long 
and that
+       the coordinator logs the estimated number of queries lost."""
+
+    impalad = self.cluster.get_first_impalad()
+    client = self.get_client(vector.get_value('protocol'))
+
+    # Execute sql statements to ensure all get written to the query log table.
+    sql1 = client.execute("select 1")
+    assert sql1.success
+
+    sql2 = client.execute("select 2")
+    assert sql2.success
+
+    sql3 = client.execute("select 3")
+    assert sql3.success
+
+    
impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 
3,
+        60)
+
+    impalad.kill_and_wait_for_exit(SIGRTMIN)
+    self.assert_impalad_log_contains("INFO", r"Workload management shutdown 
timed out. "
+      r"Up to '3' queries may have been lost",
+      timeout_s=60)
+
 
 class TestQueryLogTableAll(TestQueryLogTableBase):
   """Tests to assert the query log table is correctly populated when using all 
the

Reply via email to