This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 13df8239d82a61afc3196295a7878ca2ffe91873
Author: Joe McDonnell <[email protected]>
AuthorDate: Sun Apr 28 12:44:50 2024 -0700

    IMPALA-13020 (part 1): Change thrift_rpc_max_message_size to int64_t
    
    Thrift 0.16.0 introduced a max message size to protect
    receivers against a malicious message allocating large
    amounts of memory. That limit is a 32-bit signed integer,
    so the max value is 2GB. Impala introduced the
    thrift_rpc_max_message_size startup option to set that
    for Impala's thrift servers.
    
    There are times when Impala wants to send a message that
    is larger than 2GB. In particular, the catalog-update
    topic for the statestore can exceed 2GB when there is
    a lot of metadata loaded using the old v1 catalog. When
    there is a 2GB max message size, the statestore can create
    and send a >2GB message, but the impalads will reject
    it. This can lead to impalads having stale metadata.
    
    This switches to a patched Thrift that uses an int64_t
    for the max message size for C++ code. It does not modify
    the limit.
    
    The MaxMessageSize error was being swallowed in TAcceptQueueServer.cpp,
    so this fixes that location to always print MaxMessageSize
    exceptions.
    
    This is only patching the Thrift C++ library. It does not
    patch the Thrift Java library. There are a few reasons for
    that:
     - This specific issue involves C++ to C++ communication and
       will be solved by patching the C++ library.
     - C++ is easy to patch as it is built via the native-toolchain.
       There is no corresponding mechanism for patching our Java
       dependencies (though one could be developed).
     - Java modifications have implications for other dependencies
       like Hive which use Thrift to communicate with HMS.
    For the Java code that uses max message size, this converts
    the 64-bit value to a 32-bit value by capping it at
    Integer.MAX_VALUE.
    
    Testing:
     - Added enough tables to produce a >2GB catalog-topic and
       restarted an impalad with a higher limit specified. Without
       the patch, the catalog-topic update would be rejected by the
       impalad. With the patch, it succeeds.
    
    Change-Id: I681b1849cc565dcb25de8c070c18776ce69cbb87
    Reviewed-on: http://gerrit.cloudera.org:8080/21367
    Reviewed-by: Michael Smith <[email protected]>
    Reviewed-by: Joe McDonnell <[email protected]>
    Tested-by: Joe McDonnell <[email protected]>
---
 be/src/common/init.cc                                       |  2 +-
 be/src/rpc/TAcceptQueueServer.cpp                           | 13 ++++++++++---
 be/src/rpc/thrift-server-test.cc                            |  2 +-
 be/src/rpc/thrift-util.cc                                   |  4 ++--
 be/src/rpc/thrift-util.h                                    |  4 ++--
 be/src/util/backend-gflag-util.cc                           |  2 +-
 bin/impala-config.sh                                        | 12 ++++++------
 common/thrift/BackendGflags.thrift                          |  2 +-
 .../main/java/org/apache/impala/service/BackendConfig.java  |  5 ++++-
 9 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 9ac9d9784..dca56aa2c 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -83,7 +83,7 @@ DECLARE_string(re2_mem_limit);
 DECLARE_string(reserved_words_version);
 DECLARE_bool(symbolize_stacktrace);
 DECLARE_string(debug_actions);
-DECLARE_int32(thrift_rpc_max_message_size);
+DECLARE_int64(thrift_rpc_max_message_size);
 
 DEFINE_int32(memory_maintenance_sleep_time_ms, 10000, "Sleep time in 
milliseconds "
     "between memory maintenance iterations");
diff --git a/be/src/rpc/TAcceptQueueServer.cpp 
b/be/src/rpc/TAcceptQueueServer.cpp
index 516811b22..ada6fd7ff 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -43,8 +43,6 @@ DEFINE_int32(accepted_cnxn_setup_thread_pool_size, 2,
     "post-accept, pre-setup connection queue in each thrift server set up to 
service "
     "Impala internal and external connections.");
 
-DECLARE_int32(thrift_rpc_max_message_size);
-
 namespace apache {
 namespace thrift {
 namespace server {
@@ -89,9 +87,18 @@ class TAcceptQueueServer::Task : public Runnable {
         }
       }
     } catch (const TTransportException& ttx) {
-      if (ttx.getType() != TTransportException::END_OF_FILE) {
+      // IMPALA-13020: Thrift throws an END_OF_FILE exception when it hits the
+      // max message size. That is always interesting to us, so we specifically
+      // detect "MaxMessageSize" and print it along with advice on how to 
address it.
+      bool hit_max_message_size =
+          std::string(ttx.what()).find("MaxMessageSize") != std::string::npos;
+      if (ttx.getType() != TTransportException::END_OF_FILE || 
hit_max_message_size) {
         string errStr = string("TAcceptQueueServer client died: ") + 
ttx.what();
         GlobalOutput(errStr.c_str());
+        if (hit_max_message_size) {
+          GlobalOutput("MaxMessageSize errors can be addressed by increasing "
+              "thrift_rpc_max_message_size on the receiving nodes.");
+        }
       }
     } catch (const std::exception& x) {
       GlobalOutput.printf(
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index 425155c94..ab579b094 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -47,7 +47,7 @@ DECLARE_int32(state_store_port);
 
 DECLARE_int32(beeswax_port);
 
-DECLARE_int32(thrift_rpc_max_message_size);
+DECLARE_int64(thrift_rpc_max_message_size);
 
 static string IMPALA_HOME(getenv("IMPALA_HOME"));
 static const string& SERVER_CERT =
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index 973544cca..5b3ae17cd 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -58,7 +58,7 @@
 
 #include "common/names.h"
 
-DEFINE_int32(thrift_rpc_max_message_size, std::numeric_limits<int32_t>::max(),
+DEFINE_int64(thrift_rpc_max_message_size, std::numeric_limits<int32_t>::max(),
     "The maximum size of a message for any RPC that the server will accept. "
     "Default to the upper limit of 2147483647 bytes (~2GB). "
     "Setting 0 or negative value will use the default defined in Thrift.");
@@ -280,7 +280,7 @@ bool IsConnResetTException(const TTransportException& e) {
              strstr(e.what(), "SSL_read: Connection reset by peer") != 
nullptr);
 }
 
-int ThriftRpcMaxMessageSize() {
+int64_t ThriftRpcMaxMessageSize() {
   return FLAGS_thrift_rpc_max_message_size <= 0 ? 
ThriftDefaultMaxMessageSize() :
                                                   
FLAGS_thrift_rpc_max_message_size;
 }
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index 0945c1278..e3a7a6948 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -39,12 +39,12 @@ class TNetworkAddress;
 class ThriftServer;
 
 /// Default max message size from Thrift library.
-inline int ThriftDefaultMaxMessageSize() {
+inline int64_t ThriftDefaultMaxMessageSize() {
   return apache::thrift::TConfiguration::DEFAULT_MAX_MESSAGE_SIZE;
 }
 
 /// Return the effective max message size based on 
'thrift_rpc_max_message_size' flag.
-int ThriftRpcMaxMessageSize();
+int64_t ThriftRpcMaxMessageSize();
 
 /// Return the default Thrift's TConfiguration based on given backend config 
flags.
 std::shared_ptr<apache::thrift::TConfiguration> DefaultTConfiguration();
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index f1b94562b..55b8b04f4 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -105,7 +105,7 @@ DECLARE_bool(enable_sync_to_latest_event_on_ddls);
 DECLARE_bool(pull_table_types_and_comments);
 DECLARE_bool(enable_reload_events);
 DECLARE_string(geospatial_library);
-DECLARE_int32(thrift_rpc_max_message_size);
+DECLARE_int64(thrift_rpc_max_message_size);
 DECLARE_string(file_metadata_reload_properties);
 DECLARE_string(java_weigher);
 DECLARE_int32(iceberg_reload_new_files_threshold);
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 4bcb364d3..cdd1d84a2 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -85,13 +85,13 @@ export USE_AVRO_CPP=${USE_AVRO_CPP:=false}
 # moving to a different build of the toolchain, e.g. when a version is bumped 
or a
 # compile option is changed. The build id can be found in the output of the 
toolchain
 # build jobs, it is constructed from the build number and toolchain git hash 
prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID_AARCH64=25-051b912729
-export IMPALA_TOOLCHAIN_BUILD_ID_X86_64=413-051b912729
+export IMPALA_TOOLCHAIN_BUILD_ID_AARCH64=34-f93e2c9a86
+export IMPALA_TOOLCHAIN_BUILD_ID_X86_64=422-f93e2c9a86
 export IMPALA_TOOLCHAIN_REPO=\
 ${IMPALA_TOOLCHAIN_REPO:-https://github.com/cloudera/native-toolchain.git}
 export IMPALA_TOOLCHAIN_BRANCH=${IMPALA_TOOLCHAIN_BRANCH:-master}
 export IMPALA_TOOLCHAIN_COMMIT_HASH=\
-${IMPALA_TOOLCHAIN_COMMIT_HASH-051b912729cdf327572fec5006c2df054b857f50}
+${IMPALA_TOOLCHAIN_COMMIT_HASH-f93e2c9a865c80cafd76b872ad04400877766a2f}
 # Compare the build ref in build IDs by removing everything 
up-to-and-including the
 # first hyphen.
 if [ "${IMPALA_TOOLCHAIN_BUILD_ID_AARCH64#*-}" \
@@ -222,7 +222,7 @@ export IMPALA_SIMBA_JDBC_DRIVER_VERSION=42-2.6.32.1041
 # respect this version). If upgrading IMPALA_THRIFT_PY_VERSION, also upgrade 
the
 # thrift version in shell/ext-py, shell/packaging/requirements.txt, and
 # infra/python/deps/requirements.txt.
-export IMPALA_THRIFT_CPP_VERSION=0.16.0-p6
+export IMPALA_THRIFT_CPP_VERSION=0.16.0-p7
 unset IMPALA_THRIFT_CPP_URL
 if $USE_APACHE_HIVE; then
   # Apache Hive 3 clients can't run on thrift versions >= 0.14 (IMPALA-11801)
@@ -230,10 +230,10 @@ if $USE_APACHE_HIVE; then
   export IMPALA_THRIFT_JAVA_VERSION=${IMPALA_THRIFT_POM_VERSION}-p5
 else
   export IMPALA_THRIFT_POM_VERSION=0.16.0
-  export IMPALA_THRIFT_JAVA_VERSION=${IMPALA_THRIFT_POM_VERSION}-p6
+  export IMPALA_THRIFT_JAVA_VERSION=${IMPALA_THRIFT_POM_VERSION}-p7
 fi
 unset IMPALA_THRIFT_JAVA_URL
-export IMPALA_THRIFT_PY_VERSION=0.16.0-p6
+export IMPALA_THRIFT_PY_VERSION=0.16.0-p7
 unset IMPALA_THRIFT_PY_URL
 
 # Find system python versions for testing
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index e51b6bad4..73ff305db 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -251,7 +251,7 @@ struct TBackendGflags {
 
   110: required bool skip_resource_checking_on_last_executor_group_set
 
-  111: required i32 thrift_rpc_max_message_size
+  111: required i64 thrift_rpc_max_message_size
 
   112: required string file_metadata_reload_properties
 
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index a050fe016..d2a1d7504 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -405,7 +405,10 @@ public class BackendConfig {
   }
 
   public int getThriftRpcMaxMessageSize() {
-    return backendCfg_.thrift_rpc_max_message_size;
+    // With IMPALA-13020, the C++ max message size is a 64-bit integer,
+    // but the Java max message size is still 32-bit. Cap the Java value
+    // at Integer.MAX_VALUE;
+    return (int) Math.min(backendCfg_.thrift_rpc_max_message_size, 
Integer.MAX_VALUE);
   }
 
   public String getFileMetadataReloadProperties() {

Reply via email to