This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch branch-4.4.0 in repository https://gitbox.apache.org/repos/asf/impala.git
commit c9745fd5b941f52b3cd3496c425722fcbbffe894 Author: Joe McDonnell <[email protected]> AuthorDate: Fri May 10 14:15:38 2024 -0700 IMPALA-13020 (part 2): Split out external vs internal Thrift max message size The Thrift max message size is designed to protect against malicious messages that consume a lot of memory on the receiver. This is an important security measure for externally facing services, but it can interfere with internal communication within the cluster. Currently, the max message size is controlled by a single startup flag for both. This creates tension between keeping the value low to protect against malicious messages and keeping it high to avoid issues with internal communication (e.g. large statestore updates). This introduces a new flag thrift_external_rpc_max_message_size to specify the limit for externally facing services. The current thrift_rpc_max_message_size now applies only to internal services. Splitting them apart allows setting a much higher value for internal services (64GB) while leaving the externally facing services using the current 2GB limit. This modifies various code locations that wrap a Thrift transport to pass in the original transport's TConfiguration. This also adds DCHECKs to make sure that the new transport inherits the max message size. This limits the locations where we actually need to set max message size. ThriftServer/ThriftServerBuilder have a setting "is_external_facing" which can be specified on each ThriftServer. This modifies statestore and catalog to set is_external_facing to false. All other servers stay with the default of true. Testing: - This adds a test case to verify that a server with is_external_facing set to false uses the higher internal limit. - Ran through the steps in testdata/scale_test_metadata/README.md and updated the value in that doc. - Created many tables to push the catalog-update topic to be >2GB and verified that statestore successfully sends it when an impalad restarts. 
Change-Id: Ib9a649ef49a8a99c7bd9a1b73c37c4c621661311 Reviewed-on: http://gerrit.cloudera.org:8080/21420 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Riza Suminto <[email protected]> Reviewed-by: Michael Smith <[email protected]> (cherry picked from commit bcff4df6194b2f192d937bb9c031721feccb69df) --- be/src/catalog/catalogd-main.cc | 2 + be/src/common/init.cc | 19 +++++++- be/src/rpc/TAcceptQueueServer.cpp | 14 ++++-- be/src/rpc/TAcceptQueueServer.h | 8 +++- be/src/rpc/authentication.cc | 3 +- be/src/rpc/thrift-client.cc | 6 ++- be/src/rpc/thrift-server-test.cc | 16 +++++-- be/src/rpc/thrift-server.cc | 8 ++-- be/src/rpc/thrift-server.h | 30 ++++++++++--- be/src/rpc/thrift-util.cc | 50 +++++++++++++++++----- be/src/rpc/thrift-util.h | 44 +++++++++++++++---- be/src/statestore/statestore-subscriber.cc | 2 + be/src/statestore/statestore.cc | 4 ++ be/src/transport/THttpTransport.cpp | 2 +- be/src/transport/TSaslServerTransport.cpp | 9 ++-- be/src/transport/TSaslTransport.cpp | 8 ++-- be/src/util/backend-gflag-util.cc | 4 +- be/src/util/parquet-reader.cc | 2 +- testdata/scale_test_metadata/README.md | 2 +- testdata/scale_test_metadata/create-wide-table.sql | 4 +- 20 files changed, 179 insertions(+), 58 deletions(-) diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc index a668480ee..da20216a9 100644 --- a/be/src/catalog/catalogd-main.cc +++ b/be/src/catalog/catalogd-main.cc @@ -84,6 +84,8 @@ int CatalogdMain(int argc, char** argv) { ThriftServer* server; ThriftServerBuilder builder("CatalogService", processor, FLAGS_catalog_service_port); + // Mark this as an internal service to use a more permissive Thrift max message size + builder.is_external_facing(false); if (IsInternalTlsConfigured()) { SSLProtocol ssl_version; diff --git a/be/src/common/init.cc b/be/src/common/init.cc index dca56aa2c..0eb085e39 100644 --- a/be/src/common/init.cc +++ b/be/src/common/init.cc @@ -84,6 +84,7 @@ DECLARE_string(reserved_words_version); 
DECLARE_bool(symbolize_stacktrace); DECLARE_string(debug_actions); DECLARE_int64(thrift_rpc_max_message_size); +DECLARE_int64(thrift_external_rpc_max_message_size); DEFINE_int32(memory_maintenance_sleep_time_ms, 10000, "Sleep time in milliseconds " "between memory maintenance iterations"); @@ -537,10 +538,24 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm, FLAGS_reserved_words_version)); } + // Enforce a minimum value for thrift_max_message_size, as configuring the limit to + // a small value is very unlikely to work. if (!impala::TestInfo::is_test() && FLAGS_thrift_rpc_max_message_size > 0 && FLAGS_thrift_rpc_max_message_size < ThriftDefaultMaxMessageSize()) { - CLEAN_EXIT_WITH_ERROR(Substitute("thrift_rpc_max_message_size must be >= $0 or <= 0.", - ThriftDefaultMaxMessageSize())); + CLEAN_EXIT_WITH_ERROR( + Substitute("Invalid $0: $1 is less than the minimum value of $2.", + "thrift_rpc_max_message_size", FLAGS_thrift_rpc_max_message_size, + ThriftDefaultMaxMessageSize())); + } + + // Enforce a minimum value for thrift_external_max_message_size, as configuring the + // limit to a small value is very unlikely to work. 
+ if (!impala::TestInfo::is_test() && FLAGS_thrift_external_rpc_max_message_size > 0 + && FLAGS_thrift_external_rpc_max_message_size < ThriftDefaultMaxMessageSize()) { + CLEAN_EXIT_WITH_ERROR( + Substitute("Invalid $0: $1 is less than the minimum value of $2.", + "thrift_external_rpc_max_message_size", + FLAGS_thrift_external_rpc_max_message_size, ThriftDefaultMaxMessageSize())); } impala::InitGoogleLoggingSafe(argv[0]); diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp index ada6fd7ff..4f405cda8 100644 --- a/be/src/rpc/TAcceptQueueServer.cpp +++ b/be/src/rpc/TAcceptQueueServer.cpp @@ -201,10 +201,12 @@ TAcceptQueueServer::TAcceptQueueServer(const shared_ptr<TProcessor>& processor, const shared_ptr<TTransportFactory>& transportFactory, const shared_ptr<TProtocolFactory>& protocolFactory, const shared_ptr<ThreadFactory>& threadFactory, const string& name, - int32_t maxTasks, int64_t queue_timeout_ms, int64_t idle_poll_period_ms) + int32_t maxTasks, int64_t queue_timeout_ms, int64_t idle_poll_period_ms, + bool is_external_facing) : TServer(processor, serverTransport, transportFactory, protocolFactory), threadFactory_(threadFactory), name_(name), maxTasks_(maxTasks), - queue_timeout_ms_(queue_timeout_ms), idle_poll_period_ms_(idle_poll_period_ms) { + queue_timeout_ms_(queue_timeout_ms), idle_poll_period_ms_(idle_poll_period_ms), + is_external_facing_(is_external_facing) { init(); } @@ -230,7 +232,9 @@ void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) { if (metrics_enabled_) queue_size_metric_->Increment(-1); shared_ptr<TTransport> io_transport; shared_ptr<TTransport> client = entry->client_; - SetMaxMessageSize(client.get()); + int64_t max_message_size = is_external_facing_ ? 
ThriftExternalRpcMaxMessageSize() : + ThriftInternalRpcMaxMessageSize(); + SetMaxMessageSize(client.get(), max_message_size); const string& socket_info = reinterpret_cast<TSocket*>(client.get())->getSocketInfo(); VLOG(2) << Substitute("TAcceptQueueServer: $0 started connection setup for client $1", name_, socket_info); @@ -252,7 +256,9 @@ void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) { // TSaslServerTransport::Factory is not required anymore. DCHECK(inputTransportFactory_ == outputTransportFactory_); io_transport = inputTransportFactory_->getTransport(client); - SetMaxMessageSize(io_transport.get()); + DCHECK_EQ(io_transport->getConfiguration()->getMaxMessageSize(), + client->getConfiguration()->getMaxMessageSize()); + DCHECK_EQ(max_message_size, io_transport->getConfiguration()->getMaxMessageSize()); shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(io_transport); diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h index d0d077ef6..730be4e3e 100644 --- a/be/src/rpc/TAcceptQueueServer.h +++ b/be/src/rpc/TAcceptQueueServer.h @@ -63,7 +63,8 @@ class TAcceptQueueServer : public TServer { const std::shared_ptr<TProtocolFactory>& protocolFactory, const std::shared_ptr<ThreadFactory>& threadFactory, const std::string& name, int32_t maxTasks = 0, - int64_t queue_timeout_ms = 0, int64_t idle_poll_period_ms = 0); + int64_t queue_timeout_ms = 0, int64_t idle_poll_period_ms = 0, + bool is_external_facing = true); ~TAcceptQueueServer() override = default; @@ -127,6 +128,11 @@ class TAcceptQueueServer : public TServer { /// wakes up to check if the connection should be closed due to inactivity. If 0, no /// polling happens. int64_t idle_poll_period_ms_; + + /// Whether this is interacting with external untrusted clients. If true, this + /// uses ThriftExternalRpcMaxMessageSize(). If false, this uses the + /// ThriftInternalRpcMaxMessageSize(). 
+ bool is_external_facing_; }; } // namespace server diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc index 60c662bf7..59dbec614 100644 --- a/be/src/rpc/authentication.cc +++ b/be/src/rpc/authentication.cc @@ -1345,7 +1345,8 @@ Status SecureAuthProvider::WrapClientTransport(const string& hostname, return Status(e.what()); } wrapped_transport->reset(new TSaslClientTransport(sasl_client, raw_transport)); - SetMaxMessageSize(wrapped_transport->get()); + // Verify that the wrapped transport inherits the max message size properly. + VerifyMaxMessageSizeInheritance(raw_transport.get(), wrapped_transport->get()); // This function is called immediately prior to sasl_client_start(), and so // can be used to log an "I'm beginning authentication for this principal" diff --git a/be/src/rpc/thrift-client.cc b/be/src/rpc/thrift-client.cc index bba4356c7..d02280a8c 100644 --- a/be/src/rpc/thrift-client.cc +++ b/be/src/rpc/thrift-client.cc @@ -139,7 +139,11 @@ Status ThriftClientImpl::CreateSocket() { return Status(TErrorCode::SSL_SOCKET_CREATION_FAILED, e.what()); } } - if (socket_ != nullptr) SetMaxMessageSize(socket_.get()); + if (socket_ != nullptr) { + // ThriftClient is used for internal cluster communication, so we use + // ThriftInternalRpcMaxMessageSize(). 
+ SetMaxMessageSize(socket_.get(), ThriftInternalRpcMaxMessageSize()); + } return Status::OK(); } diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc index ab579b094..cbbb6d074 100644 --- a/be/src/rpc/thrift-server-test.cc +++ b/be/src/rpc/thrift-server-test.cc @@ -47,7 +47,7 @@ DECLARE_int32(state_store_port); DECLARE_int32(beeswax_port); -DECLARE_int64(thrift_rpc_max_message_size); +DECLARE_int64(thrift_external_rpc_max_message_size); static string IMPALA_HOME(getenv("IMPALA_HOME")); static const string& SERVER_CERT = @@ -154,10 +154,13 @@ TEST(ThriftTestBase, Connectivity) { ASSERT_OK(wrong_port_client.Open()); } -void TestMaxMessageSize(std::string subscriber_id, bool expect_throw) { +void TestMaxMessageSize(std::string subscriber_id, bool expect_throw, + bool is_external_facing = true) { int port = GetServerPort(); ThriftServer* server; - EXPECT_OK(ThriftServerBuilder("DummyStatestore", MakeProcessor(), port).Build(&server)); + ThriftServerBuilder server_builder("DummyStatestore", MakeProcessor(), port); + server_builder.is_external_facing(is_external_facing); + EXPECT_OK(server_builder.Build(&server)); ASSERT_OK(server->Start()); ThriftClient<StatestoreServiceClientWrapper> client( @@ -185,6 +188,11 @@ TEST(ThriftTestBase, MaxMessageSizeExceeded) { TestMaxMessageSize(long_id, true); } +TEST(ThriftTestBase, InternalMaxMessageSizeFit) { + std::string long_id(256 * 1024, 'a'); + TestMaxMessageSize(long_id, /* expect_throw */ false, /* is_external_facing */ false); +} + TEST_P(ThriftKerberizedParamsTest, SslConnectivity) { int port = GetServerPort(); // Start a server using SSL and confirm that an SSL client can connect, while a non-SSL @@ -754,7 +762,7 @@ TEST(NoPasswordPemFile, BadServerCertificate) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST); - FLAGS_thrift_rpc_max_message_size = 128 * 1024; + 
FLAGS_thrift_external_rpc_max_message_size = 128 * 1024; int port = impala::FindUnusedEphemeralPort(); std::unique_ptr<impala::MiniKdcWrapper> kdc; diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc index 7ee4bd7c5..f6587a816 100644 --- a/be/src/rpc/thrift-server.cc +++ b/be/src/rpc/thrift-server.cc @@ -269,7 +269,8 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(void* context, ThriftServer::ThriftServer(const string& name, const std::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider, MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms, - int64_t idle_poll_period_ms, TransportType transport_type) + int64_t idle_poll_period_ms, TransportType transport_type, + bool is_external_facing) : started_(false), port_(port), ssl_enabled_(false), @@ -283,7 +284,8 @@ ThriftServer::ThriftServer(const string& name, connection_handler_(NULL), metrics_(NULL), auth_provider_(auth_provider), - transport_type_(transport_type) { + transport_type_(transport_type), + is_external_facing_(is_external_facing) { if (auth_provider_ == NULL) { auth_provider_ = AuthManager::GetInstance()->GetInternalAuthProvider(); } @@ -403,7 +405,7 @@ Status ThriftServer::Start() { server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory, protocol_factory, thread_factory, name_, max_concurrent_connections_, - queue_timeout_ms_, idle_poll_period_ms_)); + queue_timeout_ms_, idle_poll_period_ms_, is_external_facing_)); if (metrics_ != NULL) { (static_cast<TAcceptQueueServer*>(server_.get())) ->InitMetrics(metrics_, metrics_name_); diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h index f7e6bf615..da2b21180 100644 --- a/be/src/rpc/thrift-server.h +++ b/be/src/rpc/thrift-server.h @@ -75,15 +75,15 @@ class ThriftServer { virtual std::shared_ptr<apache::thrift::transport::TTransport> getTransport( std::shared_ptr<apache::thrift::transport::TTransport> trans) { - 
DCHECK_EQ(ThriftRpcMaxMessageSize(), - trans->getConfiguration()->getMaxMessageSize()); std::shared_ptr<apache::thrift::transport::TTransport> wrapped = wrapped_factory_->getTransport(trans); - SetMaxMessageSize(wrapped.get()); + // Make sure the max message size got inherited properly. + VerifyMaxMessageSizeInheritance(trans.get(), wrapped.get()); std::shared_ptr<apache::thrift::transport::TTransport> buffered_wrapped = std::shared_ptr<apache::thrift::transport::TTransport>( new apache::thrift::transport::TBufferedTransport( - wrapped, buffer_size_, DefaultTConfiguration())); + wrapped, buffer_size_, wrapped->getConfiguration())); + VerifyMaxMessageSizeInheritance(wrapped.get(), buffered_wrapped.get()); return buffered_wrapped; } private: @@ -273,12 +273,19 @@ class ThriftServer { /// - idle_poll_period_ms: Amount of time, in milliseconds, of client's inactivity /// before the service thread wakes up to check if the connection should be closed /// due to inactivity. If 0, no polling happens. + /// - is_external_facing: Whether this ThriftServer interacts with untrusted/external + /// clients. This impacts the max message size used for Thrift, as untrusted + /// communication uses a more restrictive limit to protect against malicious + /// messages. When set to false, this uses a less restrictive max message size as + /// messages are constructed by other Impala cluster components. This defaults to + /// true to be safe by default. ThriftServer(const std::string& name, const std::shared_ptr<apache::thrift::TProcessor>& processor, int port, AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr, int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0, int64_t idle_poll_period_ms = 0, - TransportType server_transport = TransportType::BINARY); + TransportType server_transport = TransportType::BINARY, + bool is_external_facing = true); /// Enables secure access over SSL. Must be called before Start(). 
The first three /// arguments are the minimum SSL/TLS version, and paths to certificate and private key @@ -391,6 +398,8 @@ class ThriftServer { /// Underlying transport type used by this thrift server. TransportType transport_type_; + + bool is_external_facing_; }; /// Helper class to build new ThriftServer instances. @@ -479,6 +488,14 @@ class ThriftServerBuilder { return *this; } + /// Sets whether the Thrift server will interact with external non-Impala clients. + /// If set to true, this uses ThriftExternalRpcMaxMessageSize(). If set to false, + /// this uses ThriftInternalRpcMaxMessageSize(). + ThriftServerBuilder& is_external_facing(bool is_external_facing) { + is_external_facing_ = is_external_facing; + return *this; + } + /// Constructs a new ThriftServer and puts it in 'server', if construction was /// successful, returns an error otherwise. In the error case, 'server' will not have /// been set and will not need to be freed, otherwise the caller assumes ownership of @@ -487,7 +504,7 @@ class ThriftServerBuilder { std::unique_ptr<ThriftServer> ptr( new ThriftServer(name_, processor_, port_, auth_provider_, metrics_, max_concurrent_connections_, queue_timeout_ms_, idle_poll_period_ms_, - server_transport_type_)); + server_transport_type_, is_external_facing_)); if (enable_ssl_) { RETURN_IF_ERROR(ptr->EnableSsl( version_, certificate_, private_key_, pem_password_cmd_, cipher_list_, @@ -520,6 +537,7 @@ class ThriftServerBuilder { std::string tls_ciphersuites_ = kudu::security::SecurityDefaults::kDefaultTlsCipherSuites; bool disable_tls12_ = false; + bool is_external_facing_ = true; }; /// Contains a map from string for --ssl_minimum_version to Thrift's SSLProtocol. 
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc index 5b3ae17cd..510c33062 100644 --- a/be/src/rpc/thrift-util.cc +++ b/be/src/rpc/thrift-util.cc @@ -58,10 +58,17 @@ #include "common/names.h" -DEFINE_int64(thrift_rpc_max_message_size, std::numeric_limits<int32_t>::max(), - "The maximum size of a message for any RPC that the server will accept. " - "Default to the upper limit of 2147483647 bytes (~2GB). " - "Setting 0 or negative value will use the default defined in Thrift."); +DEFINE_int64(thrift_rpc_max_message_size, 64L * 1024 * 1024 * 1024, + "The maximum size of a message for intra-cluster RPC communication between Impala " + "components. Default to a high limit of 64GB. " + "This must be set to at least the default defined in Thrift (100MB). " + "Setting 0 or a negative value will use the default defined in Thrift."); + +DEFINE_int64(thrift_external_rpc_max_message_size, 2L * 1024 * 1024 * 1024, + "The maximum size of a message for external client RPC communication. " + "This defaults to 2GB to limit the impact of untrusted payloads. " + "This must be set to at least the default defined in Thrift (100MB). " + "Setting 0 or a negative value will use the default defined in Thrift."); using namespace apache::thrift; using namespace apache::thrift::transport; @@ -85,8 +92,10 @@ static_assert(PACKAGE_VERSION[6] == '\0', NEW_THRIFT_VERSION_MSG); namespace impala { +// The ThriftSerializer uses the DefaultInternalTConfiguration() with the higher limit, +// because this is used on our internal Thrift structures. 
ThriftSerializer::ThriftSerializer(bool compact, int initial_buffer_size) - : mem_buffer_(new TMemoryBuffer(initial_buffer_size, DefaultTConfiguration())) { + : mem_buffer_(new TMemoryBuffer(initial_buffer_size, DefaultInternalTConfiguration())) { if (compact) { TCompactProtocolFactoryT<TMemoryBuffer> factory; protocol_ = factory.getProtocol(mem_buffer_); @@ -280,19 +289,38 @@ bool IsConnResetTException(const TTransportException& e) { strstr(e.what(), "SSL_read: Connection reset by peer") != nullptr); } -int64_t ThriftRpcMaxMessageSize() { +int64_t ThriftInternalRpcMaxMessageSize() { return FLAGS_thrift_rpc_max_message_size <= 0 ? ThriftDefaultMaxMessageSize() : FLAGS_thrift_rpc_max_message_size; } -shared_ptr<TConfiguration> DefaultTConfiguration() { - return make_shared<TConfiguration>(ThriftRpcMaxMessageSize()); +int64_t ThriftExternalRpcMaxMessageSize() { + return FLAGS_thrift_external_rpc_max_message_size <= 0 ? + ThriftDefaultMaxMessageSize() : FLAGS_thrift_external_rpc_max_message_size; +} + +shared_ptr<TConfiguration> DefaultInternalTConfiguration() { + return make_shared<TConfiguration>(ThriftInternalRpcMaxMessageSize()); } -void SetMaxMessageSize(TTransport* transport) { +shared_ptr<TConfiguration> DefaultExternalTConfiguration() { + return make_shared<TConfiguration>(ThriftExternalRpcMaxMessageSize()); +} + +void SetMaxMessageSize(TTransport* transport, int64_t max_message_size) { // TODO: Find way to assign TConfiguration through TTransportFactory instead. 
- transport->getConfiguration()->setMaxMessageSize(ThriftRpcMaxMessageSize()); + transport->getConfiguration()->setMaxMessageSize(max_message_size); transport->updateKnownMessageSize(-1); - EXPECT_NO_THROW(transport->checkReadBytesAvailable(ThriftRpcMaxMessageSize())); + EXPECT_NO_THROW(transport->checkReadBytesAvailable(max_message_size)); +} + +void VerifyMaxMessageSizeInheritance(TTransport* source, TTransport* dest) { + // Verify that the source has the max message size set to either the internal + // limit or the external limit + int64_t source_max_message_size = source->getConfiguration()->getMaxMessageSize(); + DCHECK(source_max_message_size == ThriftInternalRpcMaxMessageSize() || + source_max_message_size == ThriftExternalRpcMaxMessageSize()); + // Verify that the destination transport has the same limit as the source + DCHECK_EQ(source_max_message_size, dest->getConfiguration()->getMaxMessageSize()); } } diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h index e3a7a6948..c10039d30 100644 --- a/be/src/rpc/thrift-util.h +++ b/be/src/rpc/thrift-util.h @@ -44,14 +44,36 @@ inline int64_t ThriftDefaultMaxMessageSize() { } /// Return the effective max message size based on 'thrift_rpc_max_message_size' flag. -int64_t ThriftRpcMaxMessageSize(); - -/// Return the default Thrift's TConfiguration based on given backend config flags. -std::shared_ptr<apache::thrift::TConfiguration> DefaultTConfiguration(); - -/// Set the max message size of a given TTransport with the effective value based on -/// 'thrift_rpc_max_message_size' flag. -void SetMaxMessageSize(apache::thrift::transport::TTransport* transport); +/// This is for trusted internal communication between cluster components such as +/// as statestore/catalog/HMS. Because messages are coming from trusted sources, a +/// higher limit does not introduce security issues. 
+int64_t ThriftInternalRpcMaxMessageSize(); + +/// Return the effective max message size based on 'thrift_external_rpc_max_message_size' +/// flag. This is for untrusted communication with clients. Using a lower limit protects +/// against malicious messages. +int64_t ThriftExternalRpcMaxMessageSize(); + +/// Return the default Thrift TConfiguration using the limit for trusted internal +/// communication. +std::shared_ptr<apache::thrift::TConfiguration> DefaultInternalTConfiguration(); + +/// Return the default Thrift TConfiguration using the limit for untrusted external +/// communication. +std::shared_ptr<apache::thrift::TConfiguration> DefaultExternalTConfiguration(); + +/// Set the max message size of a given TTransport to the specified value. +/// The value should be either ThriftInternalRpcMaxMessageSize() or +/// ThriftExternalRpcMaxMessageSize(). +void SetMaxMessageSize(apache::thrift::transport::TTransport* transport, + int64_t max_message_size); + +/// Verify that the max message size has been inherited properly. The source transport +/// (i.e. the one being wrapped) must already have the max message size set to either the +/// internal limit or the external limit. The destination transport (i.e. the wrapping +/// transport) must have that same limit. This DCHECKs if these conditions do not hold. +void VerifyMaxMessageSizeInheritance(apache::thrift::transport::TTransport* source, + apache::thrift::transport::TTransport* dest); /// Utility class to serialize thrift objects to a binary format. This object /// should be reused if possible to reuse the underlying memory. @@ -126,10 +148,14 @@ Status DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, bool compact, /// Deserialize msg bytes into c++ thrift msg using memory /// transport. TMemoryBuffer is not const-safe, although we use it in /// a const-safe way, so we have to explicitly cast away the const. 
+ /// + /// This uses the external max message size limit, because this is often used + /// for small data structures that don't need a higher limit. It is used for a + /// few untrusted data structures like Parquet headers. std::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport( new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len, apache::thrift::transport::TMemoryBuffer::MemoryPolicy::OBSERVE, - DefaultTConfiguration())); + DefaultExternalTConfiguration())); std::shared_ptr<apache::thrift::protocol::TProtocol> tproto = CreateDeserializeProtocol(tmem_transport, compact); try { diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc index ed48f86e4..98cb21cb2 100644 --- a/be/src/statestore/statestore-subscriber.cc +++ b/be/src/statestore/statestore-subscriber.cc @@ -303,6 +303,8 @@ Status StatestoreSubscriber::Start() { ThriftServerBuilder builder( "StatestoreSubscriber", processor, heartbeat_address_.port); + // Mark this as an internal service to use a more permissive Thrift max message size + builder.is_external_facing(false); if (IsInternalTlsConfigured()) { SSLProtocol ssl_version; RETURN_IF_ERROR( diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index c171f6ca8..3114cc9f7 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -768,6 +768,8 @@ Status Statestore::Init(int32_t state_store_port) { new RpcEventHandler("statestore", metrics_)); processor->setEventHandler(event_handler); ThriftServerBuilder builder("StatestoreService", processor, state_store_port); + // Mark this as an internal service to use a more permissive Thrift max message size + builder.is_external_facing(false); if (IsInternalTlsConfigured()) { SSLProtocol ssl_version; RETURN_IF_ERROR( @@ -1801,6 +1803,8 @@ Status Statestore::InitStatestoreHa( new RpcEventHandler("StatestoreHa", metrics_)); processor->setEventHandler(event_handler); 
ThriftServerBuilder builder("StatestoreHaService", processor, statestore_ha_port); + // Mark this as an internal service to use a more permissive Thrift max message size + builder.is_external_facing(false); if (IsInternalTlsConfigured()) { SSLProtocol ssl_version; RETURN_IF_ERROR( diff --git a/be/src/transport/THttpTransport.cpp b/be/src/transport/THttpTransport.cpp index 6009c6dc7..70c0bf764 100644 --- a/be/src/transport/THttpTransport.cpp +++ b/be/src/transport/THttpTransport.cpp @@ -35,7 +35,7 @@ const char* THttpTransport::CRLF = "\r\n"; const int THttpTransport::CRLF_LEN = 2; THttpTransport::THttpTransport(std::shared_ptr<TTransport> transport) - : TVirtualTransport(impala::DefaultTConfiguration()), + : TVirtualTransport(transport->getConfiguration()), transport_(transport), origin_(""), readHeaders_(true), diff --git a/be/src/transport/TSaslServerTransport.cpp b/be/src/transport/TSaslServerTransport.cpp index d99bc160e..0529d622b 100644 --- a/be/src/transport/TSaslServerTransport.cpp +++ b/be/src/transport/TSaslServerTransport.cpp @@ -134,13 +134,11 @@ std::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport( // to be the same so that the authentication state is identical for communication in // both directions. In order to do this, we share the same TTransport object for both // input and output set in TAcceptQueueServer::SetupConnection. 
- DCHECK_EQ( - impala::ThriftRpcMaxMessageSize(), trans->getConfiguration()->getMaxMessageSize()); std::shared_ptr<TBufferedTransport> ret_transport; std::shared_ptr<TTransport> wrapped( new TSaslServerTransport(serverDefinitionMap_, trans)); - DCHECK_EQ(impala::ThriftRpcMaxMessageSize(), - wrapped->getConfiguration()->getMaxMessageSize()); + // Verify the max message size is inherited properly + impala::VerifyMaxMessageSizeInheritance(trans.get(), wrapped.get()); // Set socket timeouts to prevent TSaslServerTransport->open from blocking the server // from accepting new connections if a read/write blocks during the handshake TSocket* socket = static_cast<TSocket*>(trans.get()); @@ -148,7 +146,8 @@ std::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport( socket->setSendTimeout(FLAGS_sasl_connect_tcp_timeout_ms); ret_transport.reset(new TBufferedTransport(wrapped, impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES, - impala::DefaultTConfiguration())); + wrapped->getConfiguration())); + impala::VerifyMaxMessageSizeInheritance(wrapped.get(), ret_transport.get()); ret_transport.get()->open(); // Reset socket timeout back to zero, so idle clients do not timeout socket->setRecvTimeout(0); diff --git a/be/src/transport/TSaslTransport.cpp b/be/src/transport/TSaslTransport.cpp index 6d79e16a4..11f0bc7a4 100644 --- a/be/src/transport/TSaslTransport.cpp +++ b/be/src/transport/TSaslTransport.cpp @@ -38,9 +38,9 @@ const int32_t DEFAULT_MEM_BUF_SIZE = 32 * 1024; namespace apache { namespace thrift { namespace transport { TSaslTransport::TSaslTransport(std::shared_ptr<TTransport> transport) - : TVirtualTransport(impala::DefaultTConfiguration()), + : TVirtualTransport(transport->getConfiguration()), transport_(transport), - memBuf_(new TMemoryBuffer(DEFAULT_MEM_BUF_SIZE, impala::DefaultTConfiguration())), + memBuf_(new TMemoryBuffer(DEFAULT_MEM_BUF_SIZE, transport->getConfiguration())), sasl_(NULL), shouldWrap_(false), isClient_(false) { @@ 
-48,9 +48,9 @@ namespace apache { namespace thrift { namespace transport { TSaslTransport::TSaslTransport(std::shared_ptr<sasl::TSasl> saslClient, std::shared_ptr<TTransport> transport) - : TVirtualTransport(impala::DefaultTConfiguration()), + : TVirtualTransport(transport->getConfiguration()), transport_(transport), - memBuf_(new TMemoryBuffer(impala::DefaultTConfiguration())), + memBuf_(new TMemoryBuffer(transport->getConfiguration())), sasl_(saslClient), shouldWrap_(false), isClient_(true) { diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 55b8b04f4..7e6127a90 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -22,6 +22,7 @@ #include "gutil/strings/substitute.h" #include "kudu/util/flag_tags.h" #include "rpc/jni-thrift-util.h" +#include "rpc/thrift-util.h" #include "util/backend-gflag-util.h" #include "util/logging-support.h" #include "util/os-util.h" @@ -105,7 +106,6 @@ DECLARE_bool(enable_sync_to_latest_event_on_ddls); DECLARE_bool(pull_table_types_and_comments); DECLARE_bool(enable_reload_events); DECLARE_string(geospatial_library); -DECLARE_int64(thrift_rpc_max_message_size); DECLARE_string(file_metadata_reload_properties); DECLARE_string(java_weigher); DECLARE_int32(iceberg_reload_new_files_threshold); @@ -455,7 +455,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { cfg.__set_skip_resource_checking_on_last_executor_group_set( FLAGS_skip_resource_checking_on_last_executor_group_set); cfg.__set_file_metadata_reload_properties(FLAGS_file_metadata_reload_properties); - cfg.__set_thrift_rpc_max_message_size(FLAGS_thrift_rpc_max_message_size); + cfg.__set_thrift_rpc_max_message_size(ThriftInternalRpcMaxMessageSize()); cfg.__set_scan_range_cost_factor(FLAGS_scan_range_cost_factor); cfg.__set_use_jamm_weigher(FLAGS_java_weigher == "jamm"); cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold); diff --git a/be/src/util/parquet-reader.cc 
b/be/src/util/parquet-reader.cc index 9fe978308..ce8f4a619 100644 --- a/be/src/util/parquet-reader.cc +++ b/be/src/util/parquet-reader.cc @@ -75,7 +75,7 @@ bool DeserializeThriftMsg( // Deserialize msg bytes into c++ thrift msg using memory transport. std::shared_ptr<TMemoryBuffer> tmem_transport(new TMemoryBuffer(buf, *len, apache::thrift::transport::TMemoryBuffer::MemoryPolicy::OBSERVE, - impala::DefaultTConfiguration())); + impala::DefaultExternalTConfiguration())); std::shared_ptr<TProtocol> tproto = CreateDeserializeProtocol(tmem_transport, compact); try { diff --git a/testdata/scale_test_metadata/README.md b/testdata/scale_test_metadata/README.md index 8e3829e77..4989d59f6 100644 --- a/testdata/scale_test_metadata/README.md +++ b/testdata/scale_test_metadata/README.md @@ -86,7 +86,7 @@ test that coordinator-to-catalogd RPC works well. The steps are follow: ``` 7. Run the same EXPLAIN query again. This should run successfully, because the default - `thrift_rpc_max_message_size` is 1GB. + `thrift_rpc_max_message_size` is 64GB (see IMPALA-13020). ``` impala-shell.sh -q 'EXPLAIN SELECT id FROM 1k_col_tbl' ``` diff --git a/testdata/scale_test_metadata/create-wide-table.sql b/testdata/scale_test_metadata/create-wide-table.sql index 87a1e673d..5edbce0f5 100644 --- a/testdata/scale_test_metadata/create-wide-table.sql +++ b/testdata/scale_test_metadata/create-wide-table.sql @@ -1002,9 +1002,9 @@ very_very_very_very_very_long_string_column_name998 string, very_very_very_very_very_long_string_column_name999 string, very_very_very_very_very_long_string_column_name1000 string ) partitioned by ( -p0 string, p1 string, -p2 string +p2 string, +p3 string ) row format delimited fields terminated by '|' stored as textfile
