This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4c21084e20b0a10ba5c552f22a1a34c74e20ede2
Author: wzhou-code <[email protected]>
AuthorDate: Fri Oct 4 15:30:57 2024 -0700

    IMPALA-13427: Make connect timeout of statestore HA RPC tunable
    
    In some deployment environments, unsuccessful socket connection attempts
    take a long time to return an error if the connect timeout of the socket
    is set to 0. When statestore HA is enabled, this causes statestored to be
    tied up in unsuccessful connection attempts during initialization if its
    peer is not ready.
    
    Currently the socket connect timeout of Thrift RPCs is always set to 0.
    This patch makes the socket connect timeout of Thrift RPCs tunable, and
    adds a flag, statestore_ha_client_rpc_conn_timeout_ms, for the
    underlying socket connect timeout of Statestore HA RPCs. Its default
    value is 0, which matches the default value of TSocket.connTimeout_.
    
    Testing:
     - Added new test cases for statestore HA with a non-zero value for
       statestore_ha_client_rpc_conn_timeout_ms.
     - Passed core tests.
    
    Change-Id: Ie5840a76b4a34b4714c47b86f6366328b5ceed3a
    Reviewed-on: http://gerrit.cloudera.org:8080/21900
    Reviewed-by: Norbert Luksa <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/rpc/thrift-client.h                  |  3 ++
 be/src/runtime/client-cache.cc              |  3 +-
 be/src/runtime/client-cache.h               | 17 +++++---
 be/src/statestore/statestore.cc             |  4 +-
 tests/custom_cluster/test_statestored_ha.py | 61 +++++++++++++++++++++++------
 5 files changed, 68 insertions(+), 20 deletions(-)

diff --git a/be/src/rpc/thrift-client.h b/be/src/rpc/thrift-client.h
index 9e3c8eac3..a88fe300a 100644
--- a/be/src/rpc/thrift-client.h
+++ b/be/src/rpc/thrift-client.h
@@ -62,6 +62,9 @@ class ThriftClientImpl {
   /// Set send timeout on the underlying TSocket.
   void setSendTimeout(int32_t ms) { socket_->setSendTimeout(ms); }
 
+  /// Set connect timeout on the underlying TSocket.
+  void setConnTimeout(int32_t ms) { socket_->setConnTimeout(ms); }
+
   Status init_status() { return init_status_; }
 
  protected:
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index c1936a27d..af902da56 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -119,9 +119,10 @@ Status ClientCacheHelper::CreateClient(const 
TNetworkAddress& address,
     return client_impl->init_status();
   }
 
-  // Set the TSocket's send and receive timeouts.
+  // Set the TSocket's send, receive and connect timeouts.
   client_impl->setRecvTimeout(recv_timeout_ms_);
   client_impl->setSendTimeout(send_timeout_ms_);
+  client_impl->setConnTimeout(conn_timeout_ms_);
 
   Status status = client_impl->OpenWithRetry(num_tries_, wait_ms_);
   if (!status.ok()) {
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index cb80e0b2a..78ada4cfa 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -130,11 +130,12 @@ class ClientCacheHelper {
   template <class T> friend class ClientCache;
   /// Private constructor so that only ClientCache can instantiate this class.
   ClientCacheHelper(uint32_t num_tries, uint64_t wait_ms, int32_t 
send_timeout_ms,
-      int32_t recv_timeout_ms)
+      int32_t recv_timeout_ms, int32_t conn_timeout_ms)
       : num_tries_(num_tries),
         wait_ms_(wait_ms),
         send_timeout_ms_(send_timeout_ms),
         recv_timeout_ms_(recv_timeout_ms),
+        conn_timeout_ms_(conn_timeout_ms),
         metrics_enabled_(false) { }
 
   /// There are three lock categories - the cache-wide lock (cache_lock_), the 
locks for a
@@ -193,6 +194,10 @@ class ClientCacheHelper {
   /// Time to wait for the underlying socket to receive data, e.g., for an RPC 
response.
   const int32_t recv_timeout_ms_;
 
+  /// Time to wait for setting up underlying TSocket connection. The default 
value
+  /// equals 0, which is same as the default value of TSocket.connTimeout_.
+  const int32_t conn_timeout_ms_;
+
   /// True if metrics have been registered (i.e. InitMetrics() was called)), 
and *_metric_
   /// are valid pointers.
   bool metrics_enabled_;
@@ -410,7 +415,7 @@ class ClientCache {
   typedef ThriftClient<T> Client;
 
   ClientCache(const std::string& service_name = "", bool enable_ssl = false)
-      : client_cache_helper_(1, 0, 0, 0) {
+      : client_cache_helper_(1, 0, 0, 0, 0) {
     client_factory_ = boost::bind<ThriftClientImpl*>(
         boost::mem_fn(&ClientCache::MakeClient), this, _1, _2, service_name, 
enable_ssl);
   }
@@ -418,11 +423,13 @@ class ClientCache {
   /// Create a ClientCache where connections are tried num_tries times, with a 
pause of
   /// wait_ms between attempts. The underlying TSocket's send and receive 
timeouts of
   /// each connection can also be set. If num_tries == 0, retry connections 
indefinitely.
-  /// A send/receive timeout of 0 means there is no timeout.
+  /// A send/receive/connect timeout of 0 means there is no timeout.
   ClientCache(uint32_t num_tries, uint64_t wait_ms, int32_t send_timeout_ms = 
0,
       int32_t recv_timeout_ms = 0, const std::string& service_name = "",
-      bool enable_ssl = false)
-      : client_cache_helper_(num_tries, wait_ms, send_timeout_ms, 
recv_timeout_ms) {
+      bool enable_ssl = false, int32_t conn_timeout_ms = 0)
+      : client_cache_helper_(
+          num_tries, wait_ms, send_timeout_ms, recv_timeout_ms, 
conn_timeout_ms) {
+    DCHECK_GE(conn_timeout_ms, 0);
     client_factory_ = boost::bind<ThriftClientImpl*>(
         boost::mem_fn(&ClientCache::MakeClient), this, _1, _2, service_name, 
enable_ssl);
   }
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 5b5ec184f..98ab753cc 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -155,6 +155,8 @@ DEFINE_int32(statestore_peer_cnxn_retry_interval_ms, 1000, 
"The interval, in ms,
 DEFINE_int32(statestore_ha_client_rpc_timeout_ms, 300000, "(Advanced) The 
underlying "
     "TSocket send/recv timeout in milliseconds for a client RPC of Statestore 
HA "
     "service.");
+DEFINE_int32(statestore_ha_client_rpc_conn_timeout_ms, 0, "(Advanced) The 
underlying "
+    "TSocket conn timeout in milliseconds for a client RPC of Statestore HA 
service.");
 DEFINE_int64(update_statestore_rpc_resend_interval_ms, 100, "(Advanced) 
Interval "
     "(in ms) with which the statestore resends the RPCs of updating 
statestored's role "
     "to subscribers if the statestore has failed to send the RPCs to the 
subscribers.");
@@ -753,7 +755,7 @@ Statestore::Statestore(MetricGroup* metrics)
     ha_client_cache_.reset(new StatestoreHaClientCache(1, 0,
         FLAGS_statestore_ha_client_rpc_timeout_ms,
         FLAGS_statestore_ha_client_rpc_timeout_ms, "",
-        IsInternalTlsConfigured()));
+        IsInternalTlsConfigured(), 
FLAGS_statestore_ha_client_rpc_conn_timeout_ms));
     ha_client_cache_->InitMetrics(metrics, "statestored-ha");
     ha_standby_ss_failure_detector_.reset(new MissedHeartbeatFailureDetector(
         FLAGS_statestore_max_missed_heartbeats,
diff --git a/tests/custom_cluster/test_statestored_ha.py 
b/tests/custom_cluster/test_statestored_ha.py
index 35dcca7f7..6cea78644 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -171,12 +171,9 @@ class TestStatestoredHA(CustomClusterTestSuite):
           pytest.skip("Skip this tests for UBSAN builds since " + 
assert_string)
         assert False, assert_string
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    statestored_args="--use_network_address_as_statestore_priority=true",
-    start_args="--enable_statestored_ha")
-  def test_statestored_ha_with_two_statestored(self):
-    """The test case for cluster started with statestored HA enabled."""
+  def __test_statestored_ha_with_two_statestored(self):
+    """Basic test case for cluster started with two statestored when 
statestored HA is
+    enabled."""
     # Verify two statestored instances are created with one as active.
     statestoreds = self.cluster.statestoreds()
     assert(len(statestoreds) == 2)
@@ -210,13 +207,26 @@ class TestStatestoredHA(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    statestored_args="--enable_statestored_ha=true "
-                     "--use_network_address_as_statestore_priority=true "
-                     "--statestore_ha_preemption_wait_period_ms=200",
-    catalogd_args="--enable_statestored_ha=true",
-    impalad_args="--enable_statestored_ha=true")
-  def test_statestored_ha_with_one_statestored(self):
-    """The test case for cluster with only one statestored when statestored HA 
is
+    statestored_args="--use_network_address_as_statestore_priority=true",
+    start_args="--enable_statestored_ha")
+  def test_statestored_ha_with_two_statestored(self):
+    """The test case for cluster with two statestored when statestored HA is 
enabled.
+    """
+    self.__test_statestored_ha_with_two_statestored()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_network_address_as_statestore_priority=true "
+                     "--statestore_ha_preemption_wait_period_ms=200 "
+                     "--statestore_ha_client_rpc_conn_timeout_ms=100",
+    start_args="--enable_statestored_ha")
+  def test_statestored_ha_with_two_statestored_and_conn_timeout(self):
+    """The test case for cluster with two statestored when statestored HA is 
enabled.
+    statestore_ha_client_rpc_conn_timeout_ms is set as 100 ms."""
+    self.__test_statestored_ha_with_two_statestored()
+
+  def __test_statestored_ha_with_one_statestored(self):
+    """Basic test case for cluster with only one statestored when statestored 
HA is
     enabled."""
     # Verify the statestored instances is created as active.
     statestoreds = self.cluster.statestoreds()
@@ -233,6 +243,31 @@ class TestStatestoredHA(CustomClusterTestSuite):
     # Verify simple queries are ran successfully.
     self.__run_simple_queries()
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--enable_statestored_ha=true "
+                     "--use_network_address_as_statestore_priority=true "
+                     "--statestore_ha_preemption_wait_period_ms=200",
+    catalogd_args="--enable_statestored_ha=true",
+    impalad_args="--enable_statestored_ha=true")
+  def test_statestored_ha_with_one_statestored(self):
+    """The test case for cluster with only one statestored when statestored HA 
is
+    enabled."""
+    self.__test_statestored_ha_with_one_statestored()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--enable_statestored_ha=true "
+                     "--use_network_address_as_statestore_priority=true "
+                     "--statestore_ha_preemption_wait_period_ms=200 "
+                     "--statestore_ha_client_rpc_conn_timeout_ms=100",
+    catalogd_args="--enable_statestored_ha=true",
+    impalad_args="--enable_statestored_ha=true")
+  def test_statestored_ha_with_one_statestored_and_conn_timeout(self):
+    """The test case for cluster with only one statestored when statestored HA 
is
+    enabled. statestore_ha_client_rpc_conn_timeout_ms is set as 100 ms."""
+    self.__test_statestored_ha_with_one_statestored()
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true",

Reply via email to