This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new a9786d341 IMPALA-11669: (addendum) Set TConfiguration in TMemoryBuffer
a9786d341 is described below
commit a9786d34192552f2d725f2a107ccc19db3fb3e80
Author: Riza Suminto <[email protected]>
AuthorDate: Sat Oct 29 22:50:23 2022 -0700
IMPALA-11669: (addendum) Set TConfiguration in TMemoryBuffer
This patch adds DefaultTConfiguration into TMemoryBuffer used within
DeserializeThriftMsg, ThriftSerializer, and TSaslTransport. This patch
also makes some adjustments, including:
- Refactor AssignDefaultTConfiguration to SetMaxMessageSize.
- Supply DefaultTConfiguration into the constructor of THttpTransport
and TSaslTransport.
- Supply DefaultTConfiguration through the constructor of
TBufferedTransport.
Testing:
- Add DCHECK_EQ in places where the transport is expected to already carry
the max message size from DefaultTConfiguration.
- Add SerDeBuffer100MB test.
- Lower thrift_rpc_max_message_size to 128KB for all tests in
thrift-server-test to avoid a race condition.
- Pass core tests.
- Manually run and pass test scenario described in
testdata/scale_test_metadata/ both in SSL and no SSL setup.
Change-Id: I37a8e71c64a09ec8aeccb96c6ee59ca82c0b37cb
Reviewed-on: http://gerrit.cloudera.org:8080/19179
Reviewed-by: Wenzhe Zhou <[email protected]>
Reviewed-by: Joe McDonnell <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/rpc/TAcceptQueueServer.cpp | 3 +-
be/src/rpc/authentication.cc | 2 +-
be/src/rpc/thrift-client.cc | 2 +-
be/src/rpc/thrift-server-test.cc | 2 +-
be/src/rpc/thrift-server.h | 8 +++--
be/src/rpc/thrift-util-test.cc | 53 +++++++++++++++++++++++++++++++
be/src/rpc/thrift-util.cc | 21 ++++++------
be/src/rpc/thrift-util.h | 30 ++++++++++-------
be/src/transport/THttpTransport.cpp | 4 ++-
be/src/transport/TSaslServerTransport.cpp | 7 +++-
be/src/transport/TSaslTransport.cpp | 11 ++++---
be/src/util/parquet-reader.cc | 5 ++-
testdata/scale_test_metadata/README.md | 18 ++++++++---
13 files changed, 126 insertions(+), 40 deletions(-)
diff --git a/be/src/rpc/TAcceptQueueServer.cpp
b/be/src/rpc/TAcceptQueueServer.cpp
index 42fef92da..516811b22 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -223,6 +223,7 @@ void
TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
if (metrics_enabled_) queue_size_metric_->Increment(-1);
shared_ptr<TTransport> io_transport;
shared_ptr<TTransport> client = entry->client_;
+ SetMaxMessageSize(client.get());
const string& socket_info =
reinterpret_cast<TSocket*>(client.get())->getSocketInfo();
VLOG(2) << Substitute("TAcceptQueueServer: $0 started connection setup for
client $1",
name_, socket_info);
@@ -244,7 +245,7 @@ void
TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
// TSaslServerTransport::Factory is not required anymore.
DCHECK(inputTransportFactory_ == outputTransportFactory_);
io_transport = inputTransportFactory_->getTransport(client);
- AssignDefaultTConfiguration(io_transport.get());
+ SetMaxMessageSize(io_transport.get());
shared_ptr<TProtocol> inputProtocol =
inputProtocolFactory_->getProtocol(io_transport);
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 014f3c5b7..5001f7184 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -1228,7 +1228,7 @@ Status SecureAuthProvider::WrapClientTransport(const
string& hostname,
return Status(e.what());
}
wrapped_transport->reset(new TSaslClientTransport(sasl_client,
raw_transport));
- AssignDefaultTConfiguration(wrapped_transport->get());
+ SetMaxMessageSize(wrapped_transport->get());
// This function is called immediately prior to sasl_client_start(), and so
// can be used to log an "I'm beginning authentication for this principal"
diff --git a/be/src/rpc/thrift-client.cc b/be/src/rpc/thrift-client.cc
index 89bc3f460..bba4356c7 100644
--- a/be/src/rpc/thrift-client.cc
+++ b/be/src/rpc/thrift-client.cc
@@ -139,7 +139,7 @@ Status ThriftClientImpl::CreateSocket() {
return Status(TErrorCode::SSL_SOCKET_CREATION_FAILED, e.what());
}
}
- if (socket_ != nullptr) AssignDefaultTConfiguration(socket_.get());
+ if (socket_ != nullptr) SetMaxMessageSize(socket_.get());
return Status::OK();
}
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index 8b95d4841..50edea47e 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -149,7 +149,6 @@ TEST(ThriftTestBase, Connectivity) {
}
void TestMaxMessageSize(std::string subscriber_id, bool expect_throw) {
- auto s = ScopedFlagSetter<int>::Make(&FLAGS_thrift_rpc_max_message_size, 128
* 1024);
int port = GetServerPort();
ThriftServer* server;
EXPECT_OK(ThriftServerBuilder("DummyStatestore", MakeProcessor(),
port).Build(&server));
@@ -749,6 +748,7 @@ TEST(NoPasswordPemFile, BadServerCertificate) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
+ FLAGS_thrift_rpc_max_message_size = 128 * 1024;
int port = impala::FindUnusedEphemeralPort();
std::unique_ptr<impala::MiniKdcWrapper> kdc;
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index b6696b55b..f7ed7c10a 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -75,13 +75,15 @@ class ThriftServer {
virtual std::shared_ptr<apache::thrift::transport::TTransport>
getTransport(
std::shared_ptr<apache::thrift::transport::TTransport> trans) {
+ DCHECK_EQ(ThriftRpcMaxMessageSize(),
+ trans->getConfiguration()->getMaxMessageSize());
std::shared_ptr<apache::thrift::transport::TTransport> wrapped =
wrapped_factory_->getTransport(trans);
- AssignDefaultTConfiguration(wrapped.get());
+ SetMaxMessageSize(wrapped.get());
std::shared_ptr<apache::thrift::transport::TTransport> buffered_wrapped =
std::shared_ptr<apache::thrift::transport::TTransport>(
- new apache::thrift::transport::TBufferedTransport(wrapped,
buffer_size_));
- AssignDefaultTConfiguration(buffered_wrapped.get());
+ new apache::thrift::transport::TBufferedTransport(
+ wrapped, buffer_size_, DefaultTConfiguration()));
return buffered_wrapped;
}
private:
diff --git a/be/src/rpc/thrift-util-test.cc b/be/src/rpc/thrift-util-test.cc
index a4dd78b5c..4d1a580b6 100644
--- a/be/src/rpc/thrift-util-test.cc
+++ b/be/src/rpc/thrift-util-test.cc
@@ -16,6 +16,7 @@
// under the License.
#include <stdio.h>
+#include <sstream>
#include "rpc/thrift-util.h"
#include "testutil/gtest-util.h"
@@ -23,6 +24,8 @@
#include "util/network-util.h"
#include "gen-cpp/RuntimeProfile_types.h"
+#include "gen-cpp/CatalogObjects_types.h"
+#include "gen-cpp/CatalogService_types.h"
#include "common/names.h"
@@ -76,6 +79,56 @@ TEST(ThriftUtil, SimpleSerializeDeserialize) {
}
}
+TEST(ThriftUtil, SerDeBuffer100MB) {
+ // Test ThriftSerializer and DeserializeThriftMsg to handle a little over
100MB
+ // buffer serialization and deserialization, mimicking content of
+ // TGetPartialCatalogObjectResponse.table_info.
+ std::ostringstream ss;
+ ss << "/p2=";
+ ss << std::setw(250) << std::setfill('0') << 1;
+ ss << "/p3=";
+ ss << std::setw(250) << std::setfill('0') << 1;
+ ss << "/";
+ std::string suffix = ss.str();
+
+ // Loop over compact and binary protocols.
+ for (int i = 0; i < 2; ++i) {
+ bool compact = (i == 0);
+ ThriftSerializer serializer(compact);
+
+ TPartialTableInfo table_info;
+ vector<TPartialPartitionInfo> partitions;
+ for (int j = 1; j < 150000; ++j) {
+ std::ostringstream p1;
+ p1 << "/test-warehouse/1k_col_tbl/p1=";
+ p1 << std::setw(250) << std::setfill('0') << j;
+ p1 << suffix;
+
+ THdfsPartitionLocation part_location;
+ part_location.__set_suffix(p1.str());
+
+ TPartialPartitionInfo part_info;
+ part_info.__set_id(j);
+ part_info.__set_location(part_location);
+ partitions.push_back(part_info);
+ }
+ table_info.__set_partitions(partitions);
+
+ uint8_t* buffer;
+ uint32_t len;
+ EXPECT_OK(serializer.SerializeToBuffer(&table_info, &len, &buffer));
+ DCHECK_GT(len, 100 * 1024 * 1024);
+
+ TPartialTableInfo deserialized_table_info;
+ EXPECT_OK(DeserializeThriftMsg(buffer, &len, compact,
&deserialized_table_info));
+ EXPECT_EQ(table_info.partitions.size(),
deserialized_table_info.partitions.size());
+ for (int j = 0; j < table_info.partitions.size(); ++j) {
+ EXPECT_EQ(table_info.partitions[j].location.suffix,
+ deserialized_table_info.partitions[j].location.suffix);
+ }
+ }
+}
+
TEST(ThriftUtil, TNetworkAddressComparator) {
EXPECT_TRUE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
MakeNetworkAddress("zzzz", 500)));
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index 271f3e84a..502b7c22b 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -83,8 +83,8 @@ static_assert(PACKAGE_VERSION[6] == '\0',
NEW_THRIFT_VERSION_MSG);
namespace impala {
-ThriftSerializer::ThriftSerializer(bool compact, int initial_buffer_size) :
- mem_buffer_(new TMemoryBuffer(initial_buffer_size)) {
+ThriftSerializer::ThriftSerializer(bool compact, int initial_buffer_size)
+ : mem_buffer_(new TMemoryBuffer(initial_buffer_size,
DefaultTConfiguration())) {
if (compact) {
TCompactProtocolFactoryT<TMemoryBuffer> factory;
protocol_ = factory.getProtocol(mem_buffer_);
@@ -285,18 +285,19 @@ bool IsConnResetTException(const TTransportException& e) {
strstr(e.what(), "SSL_read: Connection reset by peer") !=
nullptr);
}
+int ThriftRpcMaxMessageSize() {
+ return FLAGS_thrift_rpc_max_message_size <= 0 ?
ThriftDefaultMaxMessageSize() :
+
FLAGS_thrift_rpc_max_message_size;
+}
+
shared_ptr<TConfiguration> DefaultTConfiguration() {
- return make_shared<TConfiguration>(FLAGS_thrift_rpc_max_message_size <= 0 ?
- ThriftDefaultMaxMessageSize() :
- FLAGS_thrift_rpc_max_message_size);
+ return make_shared<TConfiguration>(ThriftRpcMaxMessageSize());
}
-void AssignDefaultTConfiguration(TTransport* transport) {
+void SetMaxMessageSize(TTransport* transport) {
// TODO: Find way to assign TConfiguration through TTransportFactory instead.
- transport->setConfiguration(DefaultTConfiguration());
+ transport->getConfiguration()->setMaxMessageSize(ThriftRpcMaxMessageSize());
transport->updateKnownMessageSize(-1);
- EXPECT_NO_THROW(transport->checkReadBytesAvailable(
- FLAGS_thrift_rpc_max_message_size <= 0 ? ThriftDefaultMaxMessageSize() :
-
FLAGS_thrift_rpc_max_message_size));
+
EXPECT_NO_THROW(transport->checkReadBytesAvailable(ThriftRpcMaxMessageSize()));
}
}
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index 9cce6eb94..1c9706ed2 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -38,6 +38,21 @@ class TColumnValue;
class TNetworkAddress;
class ThriftServer;
+/// Default max message size from Thrift library.
+inline int ThriftDefaultMaxMessageSize() {
+ return apache::thrift::TConfiguration::DEFAULT_MAX_MESSAGE_SIZE;
+}
+
+/// Return the effective max message size based on
'thrift_rpc_max_message_size' flag.
+int ThriftRpcMaxMessageSize();
+
+/// Return the default Thrift's TConfiguration based on given backend config
flags.
+std::shared_ptr<apache::thrift::TConfiguration> DefaultTConfiguration();
+
+/// Set the max message size of a given TTransport with the effective value
based on
+/// 'thrift_rpc_max_message_size' flag.
+void SetMaxMessageSize(apache::thrift::transport::TTransport* transport);
+
/// Utility class to serialize thrift objects to a binary format. This object
/// should be reused if possible to reuse the underlying memory.
/// Note: thrift will encode NULLs into the serialized buffer so it is not
valid
@@ -110,7 +125,9 @@ Status DeserializeThriftMsg(const uint8_t* buf, uint32_t*
len, bool compact,
/// transport. TMemoryBuffer is not const-safe, although we use it in
/// a const-safe way, so we have to explicitly cast away the const.
std::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
- new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf),
*len));
+ new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf),
*len,
+ apache::thrift::transport::TMemoryBuffer::MemoryPolicy::OBSERVE,
+ DefaultTConfiguration()));
std::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
CreateDeserializeProtocol(tmem_transport, compact);
try {
@@ -168,17 +185,6 @@ bool IsPeekTimeoutTException(const
apache::thrift::transport::TTransportExceptio
/// Returns true if the exception indicates the other end of the TCP socket
was closed.
bool IsConnResetTException(const
apache::thrift::transport::TTransportException& e);
-
-inline int ThriftDefaultMaxMessageSize() {
- return apache::thrift::TConfiguration::DEFAULT_MAX_MESSAGE_SIZE;
-}
-
-/// Return the default Thrift's TConfiguration based on given backend config
flags.
-std::shared_ptr<apache::thrift::TConfiguration> DefaultTConfiguration();
-
-/// Assign given TTransport with the default TConfiguration and update the
known message
-/// size accordingly.
-void AssignDefaultTConfiguration(apache::thrift::transport::TTransport*
transport);
}
#endif
diff --git a/be/src/transport/THttpTransport.cpp
b/be/src/transport/THttpTransport.cpp
index d4f5bbf0d..6009c6dc7 100644
--- a/be/src/transport/THttpTransport.cpp
+++ b/be/src/transport/THttpTransport.cpp
@@ -19,6 +19,7 @@
#include <sstream>
+#include "rpc/thrift-util.h"
#include "transport/THttpTransport.h"
#include "common/logging.h"
@@ -34,7 +35,8 @@ const char* THttpTransport::CRLF = "\r\n";
const int THttpTransport::CRLF_LEN = 2;
THttpTransport::THttpTransport(std::shared_ptr<TTransport> transport)
- : transport_(transport),
+ : TVirtualTransport(impala::DefaultTConfiguration()),
+ transport_(transport),
origin_(""),
readHeaders_(true),
readWholeBodyForAuth_(false),
diff --git a/be/src/transport/TSaslServerTransport.cpp
b/be/src/transport/TSaslServerTransport.cpp
index 570198fe3..d99bc160e 100644
--- a/be/src/transport/TSaslServerTransport.cpp
+++ b/be/src/transport/TSaslServerTransport.cpp
@@ -134,16 +134,21 @@ std::shared_ptr<TTransport>
TSaslServerTransport::Factory::getTransport(
// to be the same so that the authentication state is identical for
communication in
// both directions. In order to do this, we share the same TTransport object
for both
// input and output set in TAcceptQueueServer::SetupConnection.
+ DCHECK_EQ(
+ impala::ThriftRpcMaxMessageSize(),
trans->getConfiguration()->getMaxMessageSize());
std::shared_ptr<TBufferedTransport> ret_transport;
std::shared_ptr<TTransport> wrapped(
new TSaslServerTransport(serverDefinitionMap_, trans));
+ DCHECK_EQ(impala::ThriftRpcMaxMessageSize(),
+ wrapped->getConfiguration()->getMaxMessageSize());
// Set socket timeouts to prevent TSaslServerTransport->open from blocking
the server
// from accepting new connections if a read/write blocks during the handshake
TSocket* socket = static_cast<TSocket*>(trans.get());
socket->setRecvTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
socket->setSendTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
ret_transport.reset(new TBufferedTransport(wrapped,
-
impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES));
+
impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
+ impala::DefaultTConfiguration()));
ret_transport.get()->open();
// Reset socket timeout back to zero, so idle clients do not timeout
socket->setRecvTimeout(0);
diff --git a/be/src/transport/TSaslTransport.cpp
b/be/src/transport/TSaslTransport.cpp
index 30c5dad0f..7778a502b 100644
--- a/be/src/transport/TSaslTransport.cpp
+++ b/be/src/transport/TSaslTransport.cpp
@@ -27,6 +27,7 @@
#include <boost/scoped_ptr.hpp>
#include <thrift/transport/TBufferTransports.h>
+#include "rpc/thrift-util.h"
#include "transport/TSaslTransport.h"
#include "common/names.h"
@@ -37,8 +38,9 @@ const int32_t DEFAULT_MEM_BUF_SIZE = 32 * 1024;
namespace apache { namespace thrift { namespace transport {
TSaslTransport::TSaslTransport(std::shared_ptr<TTransport> transport)
- : transport_(transport),
- memBuf_(new TMemoryBuffer(DEFAULT_MEM_BUF_SIZE)),
+ : TVirtualTransport(impala::DefaultTConfiguration()),
+ transport_(transport),
+ memBuf_(new TMemoryBuffer(DEFAULT_MEM_BUF_SIZE,
impala::DefaultTConfiguration())),
sasl_(NULL),
shouldWrap_(false),
isClient_(false) {
@@ -46,8 +48,9 @@ namespace apache { namespace thrift { namespace transport {
TSaslTransport::TSaslTransport(std::shared_ptr<sasl::TSasl> saslClient,
std::shared_ptr<TTransport> transport)
- : transport_(transport),
- memBuf_(new TMemoryBuffer()),
+ : TVirtualTransport(impala::DefaultTConfiguration()),
+ transport_(transport),
+ memBuf_(new TMemoryBuffer(impala::DefaultTConfiguration())),
sasl_(saslClient),
shouldWrap_(false),
isClient_(true) {
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index 3bdd488e2..9fe978308 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -36,6 +36,7 @@
#pragma clang diagnostic pop
#include "exec/parquet/parquet-common.h"
+#include "rpc/thrift-util.h"
#include "runtime/mem-pool.h"
#include "util/codec.h"
#include "util/rle-encoding.h"
@@ -72,7 +73,9 @@ template <class T>
bool DeserializeThriftMsg(
uint8_t* buf, uint32_t* len, bool compact, T* deserialized_msg) {
// Deserialize msg bytes into c++ thrift msg using memory transport.
- std::shared_ptr<TMemoryBuffer> tmem_transport(new TMemoryBuffer(buf, *len));
+ std::shared_ptr<TMemoryBuffer> tmem_transport(new TMemoryBuffer(buf, *len,
+ apache::thrift::transport::TMemoryBuffer::MemoryPolicy::OBSERVE,
+ impala::DefaultTConfiguration()));
std::shared_ptr<TProtocol> tproto =
CreateDeserializeProtocol(tmem_transport, compact);
try {
diff --git a/testdata/scale_test_metadata/README.md
b/testdata/scale_test_metadata/README.md
index 70eea8c3a..8e3829e77 100644
--- a/testdata/scale_test_metadata/README.md
+++ b/testdata/scale_test_metadata/README.md
@@ -48,8 +48,8 @@ test that coordinator-to-catalogd RPC works well. The steps
are follow:
--catalogd_args="--catalog_topic_mode=minimal"
# Restart catalogd with additional args and jvm args
- bin/start-impala-cluster.py -s 1 --restart_catalogd_only --jvm_args=-Xmx12g
\
- --catalogd_args=" --catalog_topic_mode=minimal
--thrift_rpc_max_message_size=0 --warn_catalog_response_size_mb=1"
+ ./bin/start-impala-cluster.py -s 1 --restart_catalogd_only
--jvm_args=-Xmx12g \
+ --catalogd_args=" --catalog_topic_mode=minimal
--thrift_rpc_max_message_size=0 --warn_catalog_response_size_mb=1"
```
5. Run the following EXPLAIN query with impala-shell.
@@ -81,8 +81,8 @@ test that coordinator-to-catalogd RPC works well. The steps
are follow:
--catalogd_args="--catalog_topic_mode=minimal"
# Restart catalogd with additional args and jvm args
- bin/start-impala-cluster.py -s 1 --restart_catalogd_only --jvm_args=-Xmx12g
\
- --catalogd_args="--catalog_topic_mode=minimal
--warn_catalog_response_size_mb=1"
+ ./bin/start-impala-cluster.py -s 1 --restart_catalogd_only
--jvm_args=-Xmx12g \
+ --catalogd_args="--catalog_topic_mode=minimal
--warn_catalog_response_size_mb=1"
```
7. Run the same EXPLAIN query again. This should run successfully, because the
default
@@ -90,3 +90,13 @@ test that coordinator-to-catalogd RPC works well. The steps
are follow:
```
impala-shell.sh -q 'EXPLAIN SELECT id FROM 1k_col_tbl'
```
+
+To exercise Impala with SSL, add the following args in each daemon start up
args.
+```
+--ssl_client_ca_certificate=$IMPALA_HOME/be/src/testutil/server-cert.pem
--ssl_server_certificate=$IMPALA_HOME/be/src/testutil/server-cert.pem
--ssl_private_key=$IMPALA_HOME/be/src/testutil/server-key.pem
--hostname=localhost
+```
+
+And use the following impala-shell command.
+```
+impala-shell.sh --ssl --ca_cert=$IMPALA_HOME/be/src/testutil/server-cert.pem
-q 'EXPLAIN SELECT id FROM 1k_col_tbl'
+```