This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 09a6f0e6c IMPALA-14278: Fix MemTracker data race between ExecEnv and
Webserver
09a6f0e6c is described below
commit 09a6f0e6cd912f573f0d8950abf40f498385c628
Author: Surya Hebbar <[email protected]>
AuthorDate: Tue Aug 5 17:35:38 2025 +0530
IMPALA-14278: Fix MemTracker data race between ExecEnv and Webserver
In the Webserver, while assigning or closing the compressed buffer's
memory tracker, no lock was being held across threads, causing
TSAN build failures.
The critical section for this memory tracker is only necessary during
the beginning of the Webserver and is rarely used. So, only a general mutex
has been used instead of a shared mutex with concurrent reads.
Change-Id: Ife9198e911e526a9a0e88bdb175b4502a5bc2662
Reviewed-on: http://gerrit.cloudera.org:8080/23250
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/util/webserver.cc | 67 +++++++++++++++++++++++++++---------------------
be/src/util/webserver.h | 4 +++
2 files changed, 42 insertions(+), 29 deletions(-)
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index d023e48a6..4cd21f2dd 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -27,6 +27,7 @@
#include <boost/mem_fn.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <gssapi/gssapi_krb5.h>
+#include <shared_mutex>
#include <rapidjson/document.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>
@@ -365,10 +366,11 @@ Webserver::Webserver(const string& interface, const int
port, MetricGroup* metri
}
Webserver::~Webserver() {
+ Stop();
+ std::shared_lock<std::shared_mutex> l(compressed_buffer_mem_tracker_lock_);
if (compressed_buffer_mem_tracker_.get() != nullptr) {
compressed_buffer_mem_tracker_->Close();
}
- Stop();
}
void Webserver::ErrorHandler(const WebRequest& req, Document* document) {
@@ -566,20 +568,23 @@ Status Webserver::Start() {
// TODO: IMPALA-14179
// Track memory usage of other template generated webpages in the future
-
- // Initialize compressed_buffer_mem_tracker_, if not initialized
- if (compressed_buffer_mem_tracker_.get() == nullptr) {
- MemTracker* process_mem_tracker = nullptr;
- // Except for impala daemon, ExecEnv does not exist for statestore or
catalog daemons
- if (ExecEnv::GetInstance() != nullptr) {
- // When ExecEnv is present, link to the current process_mem_tracker
- DCHECK(ExecEnv::GetInstance()->process_mem_tracker() != nullptr);
- process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+ {
+ std::unique_lock<std::shared_mutex> l(compressed_buffer_mem_tracker_lock_);
+ // Initialize compressed_buffer_mem_tracker_, if not initialized
+ if (compressed_buffer_mem_tracker_.get() == nullptr) {
+ MemTracker* process_mem_tracker = nullptr;
+ // Currently, ExecEnv only exists for impalad not for statestore /
catalog daemons
+ if (ExecEnv::GetInstance() != nullptr) {
+ // When ExecEnv is present, link to the current process_mem_tracker
+ DCHECK(ExecEnv::GetInstance()->process_mem_tracker() != nullptr);
+ process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+ }
+ compressed_buffer_mem_tracker_ = std::make_shared<MemTracker>(-1,
+ "WebserverCompressedBuffer", process_mem_tracker);
}
- compressed_buffer_mem_tracker_ = std::make_shared<MemTracker>(-1,
- "WebserverCompressedBuffer", process_mem_tracker);
}
+
LOG(INFO) << "Webserver started";
return Status::OK();
}
@@ -628,23 +633,27 @@ void Webserver::SendResponse(struct sq_connection*
connection,
oss << "Content-Type: " << content_type << CRLF;
const char * accepted_encodings = sq_get_header(connection,
"Accept-Encoding");
- // Include vector's new memory allocations into
'compressed_buffer_mem_tracker_'
- MemTrackerAllocator<uint8_t> vector_mem_tracker_allocator(
- compressed_buffer_mem_tracker_);
- vector<uint8_t> output(0, vector_mem_tracker_allocator);
-
- // If accepted, support responses with gzip compression
- if (accepted_encodings != NULL &&
std::string_view(accepted_encodings).find("gzip")
- != string::npos && CompressStringToBuffer(content, output).ok()) {
- oss << "Content-Encoding: gzip" << CRLF;
- oss << "Content-Length: " << output.size() << CRLF;
- oss << CRLF;
- // Interpret the data in the necessary form, do not reallocate
- oss.write(reinterpret_cast<char*>(output.data()), output.size());
- } else {
- oss << "Content-Length: " << content.size() << CRLF;
- oss << CRLF;
- oss.write(content.data(), content.size());
+ {
+ std::shared_lock<std::shared_mutex> l(compressed_buffer_mem_tracker_lock_);
+
+ // Include vector's new memory allocations into
'compressed_buffer_mem_tracker_'
+ MemTrackerAllocator<uint8_t> vector_mem_tracker_allocator(
+ compressed_buffer_mem_tracker_);
+ vector<uint8_t> output(0, vector_mem_tracker_allocator);
+
+ // If accepted, support responses with gzip compression
+ if (accepted_encodings != NULL &&
std::string_view(accepted_encodings).find("gzip")
+ != string::npos && CompressStringToBuffer(content, output).ok()) {
+ oss << "Content-Encoding: gzip" << CRLF;
+ oss << "Content-Length: " << output.size() << CRLF;
+ oss << CRLF;
+ // Interpret the data in the necessary form, do not reallocate
+ oss.write(reinterpret_cast<char*>(output.data()), output.size());
+ } else {
+ oss << "Content-Length: " << content.size() << CRLF;
+ oss << CRLF;
+ oss.write(content.data(), content.size());
+ }
}
string output_str = oss.str();
diff --git a/be/src/util/webserver.h b/be/src/util/webserver.h
index 68fd0bbe9..df92d6132 100644
--- a/be/src/util/webserver.h
+++ b/be/src/util/webserver.h
@@ -21,6 +21,7 @@
#include <string>
#include <boost/function.hpp>
#include <boost/thread/pthread/shared_mutex.hpp>
+#include <shared_mutex>
#include <rapidjson/fwd.h>
#include "common/status.h"
@@ -306,6 +307,9 @@ class Webserver {
/// An incoming connection will be accepted if the OAuth token could be
verified.
bool use_oauth_ = false;
+ // Lock guarding the compressed_buffer_mem_tracker
+ std::shared_mutex compressed_buffer_mem_tracker_lock_;
+
// Track memory for the comppressed string buffer
std::shared_ptr<MemTracker> compressed_buffer_mem_tracker_;