This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new b1a985be5 IMPALA-13680: Avoid flush() when closing TSSLSocket
b1a985be5 is described below
commit b1a985be5eb49db6f23912a1439eeb59d74a278e
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Tue Jan 21 10:36:12 2025 +0100
IMPALA-13680: Avoid flush() when closing TSSLSocket
Closing the transports could hang in TAcceptQueueServer if there was
an error during SSL handshake. As the TSSLSocket is wrapped in
TBufferedTransport and TBufferedTransport::close() calls flush(),
TSSLSocket::flush() was also called, which retried the
handshake in an unclean state. This led to hanging indefinitely with
OpenSSL 3.2. Another potential error is that if flush() throws an
exception then the underlying TTransport's close() won't be called.
Ideally this would be solved in Thrift (THRIFT-5846). As a quick
fix, this change adds a subclass of TBufferedTransport whose close()
doesn't call flush(). This is safe to do as generated TProcessor
subclasses call flush() every time the client/server sends
a message.
Testing:
- the issue was caught by thrift-server-test/KerberosOnAndOff
and TestClientSsl::test_ssl hanging till killed
Change-Id: I4879a1567f7691711d73287269bf87f2946e75d2
Reviewed-on: http://gerrit.cloudera.org:8080/22368
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Zoltan Borok-Nagy <[email protected]>
---
be/src/rpc/auth-provider.h | 3 ++-
be/src/rpc/authentication.cc | 8 ++++----
be/src/rpc/thrift-client.h | 2 +-
be/src/rpc/thrift-server.h | 20 +++++++++++++++++++-
be/src/transport/TSaslServerTransport.cpp | 4 ++--
5 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 1f2ab776a..9f1a1420a 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -42,7 +42,8 @@ class AuthProvider {
/// Creates a new Thrift transport factory in the out parameter that performs
/// authorisation per this provider's protocol. The type of the transport
returned is
/// determined by 'underlying_transport_type' and there may be multiple
levels of
- /// wrapped transports, eg. a TBufferedTransport around a
TSaslServerTransport.
+ /// wrapped transports, eg. a ThriftServer::BufferedTransport around a
+ /// TSaslServerTransport.
virtual Status GetServerTransportFactory(
ThriftServer::TransportType underlying_transport_type,
const std::string& server_name, MetricGroup* metrics,
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 020a41356..8b022b7fd 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -1493,8 +1493,8 @@ void SecureAuthProvider::SetupConnectionContext(
TSocket* socket = nullptr;
switch (underlying_transport_type) {
case ThriftServer::BINARY: {
- TBufferedTransport* buffered_transport =
- down_cast<TBufferedTransport*>(input_transport);
+ ThriftServer::BufferedTransport* buffered_transport =
+ down_cast<ThriftServer::BufferedTransport*>(input_transport);
TSaslServerTransport* sasl_transport = down_cast<TSaslServerTransport*>(
buffered_transport->getUnderlyingTransport().get());
socket =
down_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
@@ -1605,8 +1605,8 @@ void NoAuthProvider::SetupConnectionContext(
TSocket* socket = nullptr;
switch (underlying_transport_type) {
case ThriftServer::BINARY: {
- TBufferedTransport* buffered_transport =
- down_cast<TBufferedTransport*>(input_transport);
+ ThriftServer::BufferedTransport* buffered_transport =
+ down_cast<ThriftServer::BufferedTransport*>(input_transport);
socket =
down_cast<TSocket*>(buffered_transport->getUnderlyingTransport().get());
break;
}
diff --git a/be/src/rpc/thrift-client.h b/be/src/rpc/thrift-client.h
index a88fe300a..7cb0dbe14 100644
--- a/be/src/rpc/thrift-client.h
+++ b/be/src/rpc/thrift-client.h
@@ -154,7 +154,7 @@ ThriftClient<InterfaceType>::ThriftClient(const
std::string& ipaddress, int port
}
// transport_ is created by wrapping the socket_ in the TTransport provided
by the
- // auth_provider_ and then a TBufferedTransport (IMPALA-1928).
+ // auth_provider_ and then a ThriftServer::BufferedTransport (IMPALA-1928).
transport_ = socket_;
init_status_ = auth_provider_->WrapClientTransport(address_.hostname,
transport_,
service_name, &transport_);
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 75488f78b..820526ca1 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -59,6 +59,24 @@ class AuthProvider;
/// TODO: shutdown is buggy (which only harms tests)
class ThriftServer {
public:
+
+ /// Override TBufferedTransport::close() to avoid calling flush() which is
not safe
+ /// in TSSLSocket if it is in error state. See IMPALA-13680 / THRIFT-5846
for details.
+ class BufferedTransport :
+ public apache::thrift::transport::TBufferedTransport {
+
+ public:
+
BufferedTransport(std::shared_ptr<apache::thrift::transport::TTransport>&
transport,
+ uint32_t sz, std::shared_ptr<apache::thrift::TConfiguration>&&
config)
+ : apache::thrift::transport::TBufferedTransport(transport, sz, config) {}
+
+ // base implementation:
+ //
https://github.com/apache/thrift/blob/d078721e44fea7713832ae5d0f5d9ca67317f19e/lib/cpp/src/thrift/transport/TBufferTransports.h#L367
+ virtual void close() override {
+ transport_->close();
+ }
+ };
+
/// Transport factory that wraps transports in a buffered transport with a
customisable
/// buffer-size and optionally in another transport from a provided factory.
A larger
/// buffer is usually more efficient, as it allows the underlying transports
to perform
@@ -81,7 +99,7 @@ class ThriftServer {
VerifyMaxMessageSizeInheritance(trans.get(), wrapped.get());
std::shared_ptr<apache::thrift::transport::TTransport> buffered_wrapped =
std::shared_ptr<apache::thrift::transport::TTransport>(
- new apache::thrift::transport::TBufferedTransport(
+ new BufferedTransport(
wrapped, buffer_size_, wrapped->getConfiguration()));
VerifyMaxMessageSizeInheritance(wrapped.get(), buffered_wrapped.get());
return buffered_wrapped;
diff --git a/be/src/transport/TSaslServerTransport.cpp
b/be/src/transport/TSaslServerTransport.cpp
index 3018ab85b..082929440 100644
--- a/be/src/transport/TSaslServerTransport.cpp
+++ b/be/src/transport/TSaslServerTransport.cpp
@@ -116,7 +116,7 @@ std::shared_ptr<TTransport>
TSaslServerTransport::Factory::getTransport(
// to be the same so that the authentication state is identical for
communication in
// both directions. In order to do this, we share the same TTransport object
for both
// input and output set in TAcceptQueueServer::SetupConnection.
- std::shared_ptr<TBufferedTransport> ret_transport;
+ std::shared_ptr<impala::ThriftServer::BufferedTransport> ret_transport;
std::shared_ptr<TTransport> wrapped(
new TSaslServerTransport(serverDefinitionMap_, trans));
// Verify the max message size is inherited properly
@@ -126,7 +126,7 @@ std::shared_ptr<TTransport>
TSaslServerTransport::Factory::getTransport(
TSocket* socket = static_cast<TSocket*>(trans.get());
socket->setRecvTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
socket->setSendTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
- ret_transport.reset(new TBufferedTransport(wrapped,
+ ret_transport.reset(new impala::ThriftServer::BufferedTransport(wrapped,
impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
wrapped->getConfiguration()));
impala::VerifyMaxMessageSizeInheritance(wrapped.get(), ret_transport.get());