This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 3e6326538cbd060ab4f2f4a7df9c9d2ac2cc6463 Author: Surya Hebbar <[email protected]> AuthorDate: Sat Mar 8 02:03:57 2025 +0530 IMPALA-13795: Support serving webUI content with gzip compression This patch adds support for serving all the webUI content with gzip content encoding. For large JSONs and text profiles, Impala's webUI renderings may be hindered by the user's network bandwidth. As the browser's native gzip decompression is very fast (e.g. 300-400MB/s), combining it with a faster compression level (i.e. gzip Z_BEST_SPEED) in the backend results in a significant increase in speed, i.e. faster load times. During compression, instead of multiple reallocations, existing string data is reinterpreted to reduce memory usage. In case of failure during compression, the content is served in plain format as before. As none of the memory allocations are currently being tracked for rapidjson's generated documents (or any string served by a daemon's webserver), it would be helpful to display the peak memory usage of a single buffer used to serve all webUI content. In the future, it is recommended to implement and use custom allocators for all large served strings and rapidjson generated documents. (See IMPALA-14178, IMPALA-14179) Memory trackers within ExecEnv are now initialized before enabling the webserver, allowing their use as parent memory trackers. For now, the memory used by the compressed buffer for each compressed response is being tracked (i.e. through the "WebserverCompressedBuffer" MemTracker). Example: For the Impala daemon, it is included in the execution environment's process memory tracker and displayed on the /memz page as follows. 
# After serving a general webpage like /memz WebserverCompressedBuffer: Total=0 Peak=227.56 KB # After serving a query profile text / JSON WebserverCompressedBuffer: Total=0 Peak=4.09 MB Tests: * Added new tests to validate plain and gzipped content encoding headers in test_web_pages.py - TestWebPage:test_content_encoding in util/webserver-test.cc - Webserver::ContentEncodingHeadersTest * The pre-existing tests validate the content in test_web_pages.py, all tests request and validate gzipped content in util/webserver-test.cc, all tests request and validate plain text * Performance: Approximate improvements for a TPC-DS 14 query run locally with 3 nodes with defaults -> JSON profile : 4.53MB to 428.94KB Without throttling / Raw local: 421ms to 421ms Based on Firefox's throttling (8 Mbps): 8s to 2s -> Text profile : 1.24MB to 219KB Without throttling / Raw local: 281ms to 281ms Based on Firefox's throttling (8 Mbps): 1.3s to 281ms Change-Id: I431088a30337bbef2c8d6e16dd15fb6572db0f15 Reviewed-on: http://gerrit.cloudera.org:8080/22599 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Riza Suminto <[email protected]> --- be/src/runtime/exec-env.cc | 15 ++--- be/src/util/codec.cc | 1 - be/src/util/webserver-test.cc | 41 +++++++++++++ be/src/util/webserver.cc | 124 ++++++++++++++++++++++++++++---------- be/src/util/webserver.h | 9 +++ tests/webserver/test_web_pages.py | 10 +++ 6 files changed, 160 insertions(+), 40 deletions(-) diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 065f2b014..fd7658527 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -446,13 +446,6 @@ Status ExecEnv::Init() { InitSystemStateInfo(); - if (enable_webserver_) { - RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(webserver_.get())); - } - if (FLAGS_metrics_webserver_port > 0) { - RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(metrics_webserver_.get())); - RETURN_IF_ERROR(metrics_webserver_->Start()); - } 
catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server"); catalogd_lightweight_req_client_cache_->InitMetrics( metrics_.get(), "catalog.server", "for-lightweight-rpc"); @@ -464,6 +457,14 @@ Status ExecEnv::Init() { InitMemTracker(bytes_limit); + if (enable_webserver_) { + RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(webserver_.get())); + } + if (FLAGS_metrics_webserver_port > 0) { + RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(metrics_webserver_.get())); + RETURN_IF_ERROR(metrics_webserver_->Start()); + } + // Initializes the RPCMgr, ControlServices and DataStreamServices. // Initialization needs to happen in the following order due to dependencies: // - RPC manager, DataStreamService and DataStreamManager. diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc index 37533c064..57307ffb0 100644 --- a/be/src/util/codec.cc +++ b/be/src/util/codec.cc @@ -21,7 +21,6 @@ #include <utility> #include <boost/algorithm/string.hpp> -#include <zstd.h> #include "common/compiler-util.h" #include "common/logging.h" diff --git a/be/src/util/webserver-test.cc b/be/src/util/webserver-test.cc index ad8d44c8e..a3e83c2ee 100644 --- a/be/src/util/webserver-test.cc +++ b/be/src/util/webserver-test.cc @@ -116,6 +116,47 @@ TEST(Webserver, PostTest) { ASSERT_TRUE(success) << "POST unexpectedly failed"; } +TEST(Webserver, ContentEncodingHeadersTest) { + // This test validates the content-encoding headers for compression + MetricGroup metrics("webserver-test"); + Webserver webserver("", FLAGS_webserver_port, &metrics); + ASSERT_OK(webserver.Start()); + AddDefaultUrlCallbacks(&webserver); + + // The actual compressed content is parsed and validated through + // the tests present in tests/webserver/test_web_pages.py::TestWebPage, + // where 'requests' API has "Accept-Encoding: gzip" by default for all the tests + + // Raw text without content-encoding is still supported, and is validated from + // the pre-existing tests here. + + + // 1. 
Validate headers for raw response + // Add request headers + HttpRequest req_raw; + + stringstream contents; + ASSERT_OK(req_raw.Get(&contents)); + + // Validate headers + string contents_string = contents.str(); + EXPECT_TRUE(contents_string.find("Content-Encoding") == string::npos); + EXPECT_TRUE(contents_string.find("gzip") == string::npos); + contents.clear(); + + // 2. Validate headers for gzip encoded response + // Add request headers + HttpRequest req_gzip; + req_gzip.headers.emplace("Accept-Encoding", "gzip"); + + ASSERT_OK(req_gzip.Get(&contents)); + + // Validate headers + contents_string = contents.str(); + EXPECT_TRUE(contents_string.find("Content-Encoding") != string::npos); + EXPECT_TRUE(contents_string.find("gzip") != string::npos); +} + void AssertArgsCallback(bool* success, const Webserver::WebRequest& req, Document* document) { const auto& args = req.parsed_args; diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc index be7e3300d..d023e48a6 100644 --- a/be/src/util/webserver.cc +++ b/be/src/util/webserver.cc @@ -18,9 +18,7 @@ #include "util/webserver.h" #include <signal.h> -#include <stdio.h> #include <fstream> -#include <map> #include <string> #include <boost/algorithm/string.hpp> #include <boost/bind.hpp> @@ -32,6 +30,7 @@ #include <rapidjson/document.h> #include <rapidjson/prettywriter.h> #include <rapidjson/stringbuffer.h> +#include <zlib.h> #include "common/logging.h" #include "common/global-flags.h" @@ -47,21 +46,23 @@ #include "rpc/authentication.h" #include "rpc/thrift-util.h" #include "runtime/exec-env.h" +#include "runtime/mem-tracker.h" #include "service/impala-server.h" #include "thirdparty/mustache/mustache.h" #include "util/asan.h" #include "util/coding-util.h" +#include "util/codec.h" #include "util/cpu-info.h" #include "util/debug-util.h" #include "util/disk-info.h" #include "util/jwt-util.h" #include "util/mem-info.h" #include "util/metrics.h" -#include "util/openssl-util.h" #include "util/os-info.h" #include 
"util/os-util.h" #include "util/pretty-printer.h" #include "util/process-state-info.h" +#include "util/scope-exit-trigger.h" #include "util/stopwatch.h" #include "common/names.h" @@ -239,35 +240,27 @@ string HttpStatusCodeToString(HttpStatusCode code) { return ""; } -void SendResponse(struct sq_connection* connection, const string& response_code_line, - const string& content_type, const string& content, - const vector<string>& header_lines) { - // Buffer the output and send it in a single call to sq_write in order to avoid - // triggering an interaction between Nagle's algorithm and TCP delayed acks. - std::ostringstream oss; - oss << "HTTP/1.1 " << response_code_line << CRLF; - for (const auto& h : header_lines) { - oss << h << CRLF; - } - oss << "X-Frame-Options: " << FLAGS_webserver_x_frame_options << CRLF; - oss << "X-Content-Type-Options: nosniff" << CRLF; - oss << "Cache-Control: no-store" << CRLF; - if (!FLAGS_disable_content_security_policy_header) { - oss << "Content-Security-Policy: " << CSP_HEADER << CRLF; - } - - struct sq_request_info* request_info = sq_get_request_info(connection); - if (request_info->is_ssl) { - oss << "Strict-Transport-Security: max-age=31536000; includeSubDomains" << CRLF; - } - oss << "Content-Type: " << content_type << CRLF; - oss << "Content-Length: " << content.size() << CRLF; - oss << CRLF; - oss << content; - - // Make sure to use sq_write for printing the body; sq_printf truncates at 8kb - string output = oss.str(); - sq_write(connection, output.c_str(), output.length()); +Status CompressStringToBuffer(const string& content, + vector<uint8_t>& output) { + // Declare the compressor + scoped_ptr<Codec> compressor; + Codec::CodecInfo codec_info(THdfsCompression::GZIP, Z_BEST_SPEED); + LOG_AND_RETURN_IF_ERROR(Codec::CreateCompressor(NULL, false, codec_info, &compressor)); + auto compressor_cleanup = MakeScopeExitTrigger([&compressor]() { + compressor->Close(); + }); + // Interpret the data in the required form, do not reallocate 
+ uint8_t* content_buffer = reinterpret_cast<uint8_t*>(const_cast<char*>( + content.data())); + // Allocate the necessary space + int64_t compressed_length = compressor->MaxOutputLen(content.size(), content_buffer); + output.resize(compressed_length); + // Transform the space into necessary form + uint8_t* compressed_buffer = output.data(); + LOG_AND_RETURN_IF_ERROR(compressor->ProcessBlock(true, content.size(), content_buffer, + &compressed_length, &compressed_buffer)); + output.resize(compressed_length); + return Status::OK(); } // Return the address of the remote user from the squeasel request info. @@ -372,6 +365,9 @@ Webserver::Webserver(const string& interface, const int port, MetricGroup* metri } Webserver::~Webserver() { + if (compressed_buffer_mem_tracker_.get() != nullptr) { + compressed_buffer_mem_tracker_->Close(); + } Stop(); } @@ -568,6 +564,22 @@ Status Webserver::Start() { return Status(error_msg.str()); } + // TODO: IMPALA-14179 + // Track memory usage of other template generated webpages in the future + + // Initialize compressed_buffer_mem_tracker_, if not initialized + if (compressed_buffer_mem_tracker_.get() == nullptr) { + MemTracker* process_mem_tracker = nullptr; + // Except for impala daemon, ExecEnv does not exist for statestore or catalog daemons + if (ExecEnv::GetInstance() != nullptr) { + // When ExecEnv is present, link to the current process_mem_tracker + DCHECK(ExecEnv::GetInstance()->process_mem_tracker() != nullptr); + process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker(); + } + compressed_buffer_mem_tracker_ = std::make_shared<MemTracker>(-1, + "WebserverCompressedBuffer", process_mem_tracker); + } + LOG(INFO) << "Webserver started"; return Status::OK(); } @@ -592,6 +604,54 @@ void Webserver::Init() { "$0://$1:$2", IsSecure() ? 
"https" : "http", hostname_, http_address_.port); } +void Webserver::SendResponse(struct sq_connection* connection, + const string& response_code_line, const string& content_type, const string& content, + const vector<string>& header_lines) { + // Buffer the output and send it in a single call to sq_write in order to avoid + // triggering an interaction between Nagle's algorithm and TCP delayed acks. + std::ostringstream oss; + oss << "HTTP/1.1 " << response_code_line << CRLF; + for (const auto& h : header_lines) { + oss << h << CRLF; + } + oss << "X-Frame-Options: " << FLAGS_webserver_x_frame_options << CRLF; + oss << "X-Content-Type-Options: nosniff" << CRLF; + oss << "Cache-Control: no-store" << CRLF; + if (!FLAGS_disable_content_security_policy_header) { + oss << "Content-Security-Policy: " << CSP_HEADER << CRLF; + } + + struct sq_request_info* request_info = sq_get_request_info(connection); + if (request_info->is_ssl) { + oss << "Strict-Transport-Security: max-age=31536000; includeSubDomains" << CRLF; + } + oss << "Content-Type: " << content_type << CRLF; + const char * accepted_encodings = sq_get_header(connection, "Accept-Encoding"); + + // Include vector's new memory allocations into 'compressed_buffer_mem_tracker_' + MemTrackerAllocator<uint8_t> vector_mem_tracker_allocator( + compressed_buffer_mem_tracker_); + vector<uint8_t> output(0, vector_mem_tracker_allocator); + + // If accepted, support responses with gzip compression + if (accepted_encodings != NULL && std::string_view(accepted_encodings).find("gzip") + != string::npos && CompressStringToBuffer(content, output).ok()) { + oss << "Content-Encoding: gzip" << CRLF; + oss << "Content-Length: " << output.size() << CRLF; + oss << CRLF; + // Interpret the data in the necessary form, do not reallocate + oss.write(reinterpret_cast<char*>(output.data()), output.size()); + } else { + oss << "Content-Length: " << content.size() << CRLF; + oss << CRLF; + oss.write(content.data(), content.size()); + } + string 
output_str = oss.str(); + + // Make sure to use sq_write for writing the output, as sq_printf truncates at 8kb + sq_write(connection, output_str.data(), output_str.size()); +} + void Webserver::GetCommonJson(Document* document, const struct sq_connection* connection, const WebRequest& req, const std::string& csrf_token) { DCHECK(document != nullptr); diff --git a/be/src/util/webserver.h b/be/src/util/webserver.h index eb34c317a..68fd0bbe9 100644 --- a/be/src/util/webserver.h +++ b/be/src/util/webserver.h @@ -25,6 +25,7 @@ #include "common/status.h" #include "kudu/util/web_callback_registry.h" +#include "runtime/mem-tracker.h" #include "thirdparty/squeasel/squeasel.h" #include "util/ldap-util.h" #include "util/metrics-fwd.h" @@ -184,6 +185,11 @@ class Webserver { static int LogMessageCallbackStatic(const struct sq_connection* connection, const char* message); + /// Send the provided response through the squeasel connection + void SendResponse(struct sq_connection* connection, + const std::string& response_code_line, const std::string& content_type, + const std::string& content, const std::vector<std::string>& header_lines); + /// Squeasel callback for HTTP request events. Static so that it can act as a function /// pointer, and then call the next method. Returns squeasel success code. static sq_callback_result_t BeginRequestCallbackStatic( @@ -300,6 +306,9 @@ class Webserver { /// An incoming connection will be accepted if the OAuth token could be verified. bool use_oauth_ = false; + // Track memory for the comppressed string buffer + std::shared_ptr<MemTracker> compressed_buffer_mem_tracker_; + /// Used to validate usernames/passwords If LDAP authentication is in use. 
std::unique_ptr<ImpalaLdap> ldap_; diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py index bcb88d2f4..c80a77f20 100644 --- a/tests/webserver/test_web_pages.py +++ b/tests/webserver/test_web_pages.py @@ -80,6 +80,16 @@ class TestWebPage(ImpalaTestSuite): """Tests that the root URL is accessible and loads properly""" self.get_and_check_status(self.ROOT_URL) + def test_content_encoding(self): + responses = self.get_and_check_status(self.ROOT_URL, headers={"Accept-Encoding": ""}) + for response in responses: + assert "Content-Encoding" not in response.headers + responses = self.get_and_check_status(self.ROOT_URL, + headers={"Accept-Encoding": "gzip"}) + for response in responses: + assert "Content-Encoding" in response.headers + assert response.headers["Content-Encoding"] == "gzip" + def test_get_build_flags(self): """Tests that the build flags on the root page contain valid values""" for port in self.TEST_PORTS_WITH_SS:
