This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new f78b3c5cb IMPALA-13347: Fixes TSAN Thread Leak of Workload Management
Thread
f78b3c5cb is described below
commit f78b3c5cb0f4672007efc618f0317ae7701e4866
Author: Jason Fehr <[email protected]>
AuthorDate: Tue Sep 3 12:38:50 2024 -0700
IMPALA-13347: Fixes TSAN Thread Leak of Workload Management Thread
The workload management processing runs in a separate thread declared
in impala-server.h. This thread runs until a graceful shutdown is
initiated. The last step of the Impala coordinator shutdown process
is to drain the completed queries queue to the query log table, thus
ensuring completed queries do not get lost.
This thread must run to completion, but the coordinator shutdown
process never joined it, which TSAN reports as a thread leak. This
patch joins the thread during the coordinator shutdown process. If the
workload management shutdown exceeds the allotted time
(query_log_shutdown_timeout_s), the thread is detached instead.
INFO-level logging was added to report the outcome of the completed
queries queue drain: either successful or timed out.
A new custom cluster test was added to test the situation where the
completed queries queue drain process times out.
Change-Id: I1e95967bb6e04470a8900c9ba69080eea8aaa25e
Reviewed-on: http://gerrit.cloudera.org:8080/21744
Reviewed-by: Riza Suminto <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/service/impala-server.cc | 23 ++++++++++++++---
be/src/service/impala-server.h | 5 ++--
be/src/service/workload-management-init.cc | 5 ----
be/src/service/workload-management.cc | 33 ++++++++++++++++++++++--
be/src/service/workload-management.h | 15 +++++++++++
tests/custom_cluster/test_query_log.py | 40 +++++++++++++++++++++++++++++-
6 files changed, 106 insertions(+), 15 deletions(-)
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 126f3535b..88ea712ab 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -134,6 +134,7 @@ DECLARE_string(debug_actions);
DECLARE_bool(abort_on_config_error);
DECLARE_bool(disk_spill_encryption);
DECLARE_bool(enable_ldap_auth);
+DECLARE_bool(enable_workload_mgmt);
DECLARE_bool(gen_experimental_profile);
DECLARE_bool(use_local_catalog);
@@ -3217,9 +3218,21 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t
hs2_port,
internal_server_ = shared_from_this();
- ABORT_IF_ERROR(Thread::Create("impala-server", "completed-queries",
- bind<void>(&ImpalaServer::InitWorkloadManagement, this),
- &workload_management_thread_));
+ // Initialize workload management (if enabled).
+ {
+ lock_guard<mutex> l(workload_mgmt_threadstate_mu_);
+
+ // Skip starting workload management if workload management is not
enabled or the
+ // coordinator shutdown has run before this code runs.
+ if (FLAGS_enable_workload_mgmt && workload_mgmt_thread_state_ ==
NOT_STARTED) {
+
+ ABORT_IF_ERROR(Thread::Create("impala-server", "completed-queries",
+ bind<void>(&ImpalaServer::InitWorkloadManagement, this),
+ &workload_management_thread_));
+
+ workload_mgmt_thread_state_ = STARTED;
+ }
+ }
}
LOG(INFO) << "Initialized coordinator/executor Impala server on "
<<
TNetworkAddressToString(exec_env_->configured_backend_address());
@@ -3403,7 +3416,9 @@ Status ImpalaServer::StartShutdown(
}
// Drain the completed queries queue to the query log table.
- ShutdownWorkloadManagement();
+ if (FLAGS_enable_workload_mgmt) {
+ ShutdownWorkloadManagement();
+ }
LOG(INFO) << "Shutdown complete, going down.";
// Use _exit here instead since exit() does cleanup which interferes with
the shutdown
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index f2b86c329..d382491de 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -1148,9 +1148,8 @@ class ImpalaServer : public ImpalaServiceIf,
/// current query ids to the admissiond.
[[noreturn]] void AdmissionHeartbeatThread();
- /// Checks if workload management is enabled, and starts the init process if
it is
- /// enabled. Does not return until coordinator shutdown. Returns immediately
if
- /// workload management is not enabled.
+ /// Starts the workload management init process. Does not return until
coordinator
+ /// shutdown. DOES NOT check if workload management is enabled.
/// (implemented in workload-management-init.cc)
void InitWorkloadManagement();
diff --git a/be/src/service/workload-management-init.cc
b/be/src/service/workload-management-init.cc
index c35052092..6e98ea99f 100644
--- a/be/src/service/workload-management-init.cc
+++ b/be/src/service/workload-management-init.cc
@@ -48,7 +48,6 @@ using boost::algorithm::trim_copy;
using kudu::Version;
using kudu::ParseVersion;
-DECLARE_bool(enable_workload_mgmt);
DECLARE_int32(query_log_write_interval_s);
DECLARE_int32(query_log_write_timeout_s);
DECLARE_string(query_log_request_pool);
@@ -197,10 +196,6 @@ static void _errorIfDowngrade(const Version target_ver,
const Version actual_ver
} // _errorIfDowngrade
void ImpalaServer::InitWorkloadManagement() {
- if (!FLAGS_enable_workload_mgmt) {
- return;
- }
-
// Fully qualified table name based on startup flags.
const string log_table_name = StrCat(DB, ".", FLAGS_query_log_table_name);
diff --git a/be/src/service/workload-management.cc
b/be/src/service/workload-management.cc
index f0b15c794..c290b92c0 100644
--- a/be/src/service/workload-management.cc
+++ b/be/src/service/workload-management.cc
@@ -53,11 +53,12 @@ using namespace std;
DECLARE_bool(enable_workload_mgmt);
DECLARE_int32(query_log_write_interval_s);
+DECLARE_int32(query_log_max_insert_attempts);
DECLARE_int32(query_log_max_queued);
-DECLARE_string(workload_mgmt_user);
DECLARE_int32(query_log_shutdown_timeout_s);
+DECLARE_string(debug_actions);
+DECLARE_string(workload_mgmt_user);
DECLARE_string(cluster_id);
-DECLARE_int32(query_log_max_insert_attempts);
namespace impala {
@@ -109,6 +110,16 @@ size_t ImpalaServer::NumLiveQueries() {
void ImpalaServer::ShutdownWorkloadManagement() {
unique_lock<mutex> l(workload_mgmt_threadstate_mu_);
+
+ // Handle the situation where this function runs before the workload
management process
+ // has been started and thus workload_management_thread_ holds a nullptr.
+ if (workload_mgmt_thread_state_ == NOT_STARTED) {
+ workload_mgmt_thread_state_ = SHUTDOWN;
+ return;
+ }
+
+ DCHECK_NE(nullptr, workload_management_thread_.get());
+
// If the completed queries thread is not yet running, then we don't need to
give it a
// chance to flush the in-memory queue to the completed queries table.
if (workload_mgmt_thread_state_ == RUNNING) {
@@ -118,6 +129,22 @@ void ImpalaServer::ShutdownWorkloadManagement() {
chrono::seconds(FLAGS_query_log_shutdown_timeout_s),
[this]{ return workload_mgmt_thread_state_ == SHUTDOWN; });
}
+
+ switch (workload_mgmt_thread_state_) {
+ case SHUTDOWN:
+ // Safe to join the thread here because the workload managmenent
processing loop
+ // sets the thread state to ThreadState::SHUTDOWN immediately before it
returns.
+ LOG(INFO) << "Workload management shutdown successful";
+ workload_management_thread_->Join();
+ break;
+ default:
+ // The shutdown timeout expired without the completed queries queue
draining.
+ LOG(INFO) << "Workload management shutdown timed out. Up to '"
+ << ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() << "' queries
may have "
+ << "been lost";
+ workload_management_thread_->Detach();
+ break;
+ }
} // ImpalaServer::ShutdownWorkloadManagement
void ImpalaServer::EnqueueCompletedQuery(const QueryHandle& query_handle,
@@ -256,6 +283,8 @@ void ImpalaServer::WorkloadManagementWorker(
});
completed_queries_ticker_->ResetWakeupGuard();
+ DebugActionNoFail(FLAGS_debug_actions, "WM_SHUTDOWN_DELAY");
+
if (completed_queries_.empty()) continue;
if (MaxRecordsExceeded(completed_queries_.size())) {
diff --git a/be/src/service/workload-management.h
b/be/src/service/workload-management.h
index 868cebebd..96040b38d 100644
--- a/be/src/service/workload-management.h
+++ b/be/src/service/workload-management.h
@@ -92,10 +92,25 @@ extern const std::array<FieldDefinition,
NumQueryTableColumns> FIELD_DEFINITIONS
/// the ThreadState variable must only happen after taking a lock on the
associated mutex.
/// Can be used to track the lifecycle of a thread.
enum ThreadState {
+ // Workload management has not started; workload_management_thread_ is null.
NOT_STARTED,
+
+ // Set by ImpalaServer::Start() once the thread is created; initial checks in
ImpalaServer::InitWorkloadManagement(),
+ // implemented in workload-management-init.cc, are running.
+ STARTED,
+
+ // Initial checks have passed, workload management initial setup can run.
INITIALIZING,
+
+ // Initial setup of the workload management db tables is done, completed
queries queue
+ // is now being processed.
RUNNING,
+
+ // Coordinator graceful shutdown initiated, and all running queries have
finished or
+ // been cancelled. The completed queries queue can now be drained.
SHUTTING_DOWN,
+
+ // Queue drained (or workload management never started); coordinator shutdown can
finish.
SHUTDOWN
};
diff --git a/tests/custom_cluster/test_query_log.py
b/tests/custom_cluster/test_query_log.py
index ccd106ea8..8f829b289 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -690,7 +690,9 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--query_log_write_interval_s=9999 "
"--shutdown_grace_period_s=0 "
- "--shutdown_deadline_s=15",
+ "--shutdown_deadline_s=15 "
+ "--debug_actions="
+
"WM_SHUTDOWN_DELAY:SLEEP@5000",
catalogd_args="--enable_workload_mgmt")
def test_flush_on_shutdown(self, vector):
"""Asserts that queries that have completed but are not yet written to the
query
@@ -714,6 +716,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
60)
impalad.kill_and_wait_for_exit(SIGRTMIN)
+ self.assert_impalad_log_contains("INFO", r'Workload management shutdown
successful',
+ timeout_s=60)
client2 = self.create_client_for_nth_impalad(1,
vector.get_value('protocol'))
@@ -733,6 +737,40 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
finally:
client2.close()
+ @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+
"--query_log_write_interval_s=9999 "
+ "--shutdown_grace_period_s=0 "
+
"--query_log_shutdown_timeout_s=3 "
+ "--shutdown_deadline_s=15 "
+ "--debug_actions="
+
"WM_SHUTDOWN_DELAY:SLEEP@10000",
+ catalogd_args="--enable_workload_mgmt")
+ def test_shutdown_flush_timed_out(self, vector):
+ """Asserts that queries that have completed but are not yet written to the
query
+ log table are lost if the completed queries queue drain takes too long
and that
+ the coordinator logs the estimated number of queries lost."""
+
+ impalad = self.cluster.get_first_impalad()
+ client = self.get_client(vector.get_value('protocol'))
+
+ # Queue queries that are never flushed: 9999s write interval, 10s drain delay > 3s timeout.
+ sql1 = client.execute("select 1")
+ assert sql1.success
+
+ sql2 = client.execute("select 2")
+ assert sql2.success
+
+ sql3 = client.execute("select 3")
+ assert sql3.success
+
+
impalad.service.wait_for_metric_value("impala-server.completed-queries.queued",
3,
+ 60)
+
+ impalad.kill_and_wait_for_exit(SIGRTMIN)
+ self.assert_impalad_log_contains("INFO", r"Workload management shutdown
timed out. "
+ r"Up to '3' queries may have been lost",
+ timeout_s=60)
+
class TestQueryLogTableAll(TestQueryLogTableBase):
"""Tests to assert the query log table is correctly populated when using all
the