This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb42862 Support timed connect for both bthread and pthread (#2524)
bcb42862 is described below

commit bcb42862a2d4a584e83afda1dc055d7d7caa6f7d
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Mon Feb 26 10:46:03 2024 +0800

    Support timed connect for both bthread and pthread (#2524)
---
 src/bthread/fd.cpp             | 125 +++++++++++++++++------------------
 src/bthread/unstable.h         |   4 ++
 src/butil/endpoint.cpp         | 146 +++++++++++++++++++++++++++++++++++++++--
 src/butil/endpoint.h           |   5 ++
 src/butil/fd_utility.cpp       |   5 ++
 src/butil/fd_utility.h         |   3 +
 src/butil/memory/scope_guard.h |  88 +++++++++++++++++++++++++
 test/bthread_fd_unittest.cpp   |  39 +++++++++++
 test/endpoint_unittest.cpp     |  56 ++++++++++++++++
 9 files changed, 402 insertions(+), 69 deletions(-)

diff --git a/src/bthread/fd.cpp b/src/bthread/fd.cpp
index f26cbd07..e97cee2c 100644
--- a/src/bthread/fd.cpp
+++ b/src/bthread/fd.cpp
@@ -31,10 +31,15 @@
 #include "butil/fd_utility.h"                     // make_non_blocking
 #include "butil/logging.h"
 #include "butil/third_party/murmurhash3/murmurhash3.h"   // fmix32
+#include "butil/memory/scope_guard.h"
 #include "bthread/butex.h"                       // butex_*
 #include "bthread/task_group.h"                  // TaskGroup
 #include "bthread/bthread.h"                             // 
bthread_start_urgent
 
+namespace butil {
+extern int pthread_fd_wait(int fd, unsigned events, const timespec* abstime);
+}
+
 // Implement bthread functions on file descriptors
 
 namespace bthread {
@@ -422,69 +427,10 @@ int stop_and_join_epoll_threads() {
     return rc;
 }
 
-#if defined(OS_LINUX)
-short epoll_to_poll_events(uint32_t epoll_events) {
-    // Most POLL* and EPOLL* are same values.
-    short poll_events = (epoll_events &
-                         (EPOLLIN | EPOLLPRI | EPOLLOUT |
-                          EPOLLRDNORM | EPOLLRDBAND |
-                          EPOLLWRNORM | EPOLLWRBAND |
-                          EPOLLMSG | EPOLLERR | EPOLLHUP));
-    CHECK_EQ((uint32_t)poll_events, epoll_events);
-    return poll_events;
-}
-#elif defined(OS_MACOSX)
-static short kqueue_to_poll_events(int kqueue_events) {
-    //TODO: add more values?
-    short poll_events = 0;
-    if (kqueue_events == EVFILT_READ) {
-        poll_events |= POLLIN;
-    }
-    if (kqueue_events == EVFILT_WRITE) {
-        poll_events |= POLLOUT;
-    }
-    return poll_events;
-}
-#endif
-
 // For pthreads.
 int pthread_fd_wait(int fd, unsigned events,
                     const timespec* abstime) {
-    int diff_ms = -1;
-    if (abstime) {
-        timespec now;
-        clock_gettime(CLOCK_REALTIME, &now);
-        int64_t now_us = butil::timespec_to_microseconds(now);
-        int64_t abstime_us = butil::timespec_to_microseconds(*abstime);
-        if (abstime_us <= now_us) {
-            errno = ETIMEDOUT;
-            return -1;
-        }
-        diff_ms = (abstime_us - now_us + 999L) / 1000L;
-    }
-#if defined(OS_LINUX)
-    const short poll_events = bthread::epoll_to_poll_events(events);
-#elif defined(OS_MACOSX)
-    const short poll_events = bthread::kqueue_to_poll_events(events);
-#endif
-    if (poll_events == 0) {
-        errno = EINVAL;
-        return -1;
-    }
-    pollfd ufds = { fd, poll_events, 0 };
-    const int rc = poll(&ufds, 1, diff_ms);
-    if (rc < 0) {
-        return -1;
-    }
-    if (rc == 0) {
-        errno = ETIMEDOUT;
-        return -1;
-    }
-    if (ufds.revents & POLLNVAL) {
-        errno = EBADF;
-        return -1;
-    }
-    return 0;
+    return butil::pthread_fd_wait(fd, events, abstime);
 }
 
 }  // namespace bthread
@@ -527,9 +473,19 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr,
     if (NULL == g || g->is_current_pthread_task()) {
         return ::connect(sockfd, serv_addr, addrlen);
     }
-    // FIXME: Scoped non-blocking?
-    butil::make_non_blocking(sockfd);
-    const int rc = connect(sockfd, serv_addr, addrlen);
+
+    bool is_blocking = butil::is_blocking(sockfd);
+    if (is_blocking) {
+        butil::make_non_blocking(sockfd);
+    }
+    // Scoped non-blocking.
+    auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
+        if (is_blocking) {
+            butil::make_blocking(sockfd);
+        }
+    });
+
+    const int rc = ::connect(sockfd, serv_addr, addrlen);
     if (rc == 0 || errno != EINPROGRESS) {
         return rc;
     }
@@ -554,6 +510,49 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr,
     return 0;
 }
 
+int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
+                          socklen_t addrlen, const timespec* abstime) {
+    if (!abstime) {
+        return bthread_connect(sockfd, serv_addr, addrlen);
+    }
+
+    bool is_blocking = butil::is_blocking(sockfd);
+    if (is_blocking) {
+        butil::make_non_blocking(sockfd);
+    }
+    // Scoped non-blocking.
+    auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
+        if (is_blocking) {
+            butil::make_blocking(sockfd);
+        }
+    });
+
+    const int rc = ::connect(sockfd, serv_addr, addrlen);
+    if (rc == 0 || errno != EINPROGRESS) {
+        return rc;
+    }
+#if defined(OS_LINUX)
+    if (bthread_fd_timedwait(sockfd, EPOLLOUT, abstime) < 0) {
+#elif defined(OS_MACOSX)
+    if (bthread_fd_timedwait(sockfd, EVFILT_WRITE, abstime) < 0) {
+#endif
+        return -1;
+    }
+
+    int err;
+    socklen_t errlen = sizeof(err);
+    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
+        PLOG(FATAL) << "Fail to getsockopt";
+        return -1;
+    }
+    if (err != 0) {
+        CHECK(err != EINPROGRESS);
+        errno = err;
+        return -1;
+    }
+    return 0;
+}
+
 // This does not wake pthreads calling bthread_fd_*wait.
 int bthread_close(int fd) {
     return bthread::get_epoll_thread(fd).fd_close(fd);
diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h
index 5836f60d..5922cc2f 100644
--- a/src/bthread/unstable.h
+++ b/src/bthread/unstable.h
@@ -78,6 +78,10 @@ extern int bthread_close(int fd);
 // Replacement of connect(2) in bthreads.
 extern int bthread_connect(int sockfd, const struct sockaddr* serv_addr,
                            socklen_t addrlen);
+// Suspend caller thread until connect(2) on `sockfd' succeeds
+// or CLOCK_REALTIME reached `abstime' if `abstime' is not NULL.
+extern int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
+                                 socklen_t addrlen, const timespec* abstime);
 
 // Add a startup function that each pthread worker will run at the beginning
 // To run code at the end, use butil::thread_atexit()
diff --git a/src/butil/endpoint.cpp b/src/butil/endpoint.cpp
index a696aa42..ac1f958c 100644
--- a/src/butil/endpoint.cpp
+++ b/src/butil/endpoint.cpp
@@ -17,6 +17,7 @@
 
 // Date: Mon. Nov 7 14:47:36 CST 2011
 
+#include "butil/compat.h"
 #include <arpa/inet.h>                         // inet_pton, inet_ntop
 #include <netdb.h>                             // gethostbyname_r
 #include <unistd.h>                            // gethostname
@@ -28,12 +29,18 @@
 #include <sys/socket.h>                        // SO_REUSEADDR SO_REUSEPORT
 #include <memory>
 #include <gflags/gflags.h>
+#include <sys/poll.h>
+#if defined(OS_MACOSX)
+#include <sys/event.h>                           // kevent(), kqueue()
+#endif
 #include "butil/build_config.h"                // OS_MACOSX
 #include "butil/fd_guard.h"                    // fd_guard
 #include "butil/endpoint.h"                    // ip_t
 #include "butil/logging.h"
 #include "butil/memory/singleton_on_pthread_once.h"
 #include "butil/strings/string_piece.h"
+#include "butil/fd_utility.h"
+#include "butil/memory/scope_guard.h"
 
 //supported since Linux 3.9.
 DEFINE_bool(reuse_port, false, "Enable SO_REUSEPORT for all listened sockets");
@@ -47,6 +54,11 @@ int BAIDU_WEAK bthread_connect(
     int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen) {
     return connect(sockfd, serv_addr, addrlen);
 }
+
+int BAIDU_WEAK bthread_timed_connect(
+    int sockfd, const struct sockaddr* serv_addr,
+    socklen_t addrlen, const timespec* abstime);
+
 __END_DECLS
 
 #include "details/extended_endpoint.hpp"
@@ -380,10 +392,121 @@ int endpoint2hostname(const EndPoint& point, 
std::string* host) {
     return -1;
 }
 
-int tcp_connect(EndPoint point, int* self_port) {
-    struct sockaddr_storage serv_addr;
+#if defined(OS_LINUX)
+static short epoll_to_poll_events(uint32_t epoll_events) {
+    // Most POLL* and EPOLL* are same values.
+    short poll_events = (epoll_events &
+                         (EPOLLIN | EPOLLPRI | EPOLLOUT |
+                          EPOLLRDNORM | EPOLLRDBAND |
+                          EPOLLWRNORM | EPOLLWRBAND |
+                          EPOLLMSG | EPOLLERR | EPOLLHUP));
+    CHECK_EQ((uint32_t)poll_events, epoll_events);
+    return poll_events;
+}
+#elif defined(OS_MACOSX)
+short kqueue_to_poll_events(int kqueue_events) {
+    //TODO: add more values?
+    short poll_events = 0;
+    if (kqueue_events == EVFILT_READ) {
+        poll_events |= POLLIN;
+    }
+    if (kqueue_events == EVFILT_WRITE) {
+        poll_events |= POLLOUT;
+    }
+    return poll_events;
+}
+#endif
+
+int pthread_fd_wait(int fd, unsigned events,
+                    const timespec* abstime) {
+    int diff_ms = -1;
+    if (abstime) {
+        timespec now;
+        clock_gettime(CLOCK_REALTIME, &now);
+        int64_t now_us = butil::timespec_to_microseconds(now);
+        int64_t abstime_us = butil::timespec_to_microseconds(*abstime);
+        if (abstime_us <= now_us) {
+            errno = ETIMEDOUT;
+            return -1;
+        }
+        diff_ms = (abstime_us - now_us + 999L) / 1000L;
+    }
+#if defined(OS_LINUX)
+    const short poll_events = epoll_to_poll_events(events);
+#elif defined(OS_MACOSX)
+    const short poll_events = kqueue_to_poll_events(events);
+#endif
+    if (poll_events == 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    pollfd ufds = { fd, poll_events, 0 };
+    const int rc = poll(&ufds, 1, diff_ms);
+    if (rc < 0) {
+        return -1;
+    }
+    if (rc == 0) {
+        errno = ETIMEDOUT;
+        return -1;
+    }
+    if (ufds.revents & POLLNVAL) {
+        errno = EBADF;
+        return -1;
+    }
+    return 0;
+}
+
+int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
+                          socklen_t addrlen, const timespec* abstime) {
+    if (abstime == NULL) {
+        return ::connect(sockfd, serv_addr, addrlen);
+    }
+
+    bool is_blocking = butil::is_blocking(sockfd);
+    if (is_blocking) {
+        butil::make_non_blocking(sockfd);
+    }
+    // Scoped non-blocking.
+    auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
+        if (is_blocking) {
+            butil::make_blocking(sockfd);
+        }
+    });
+
+    const int rc = ::connect(sockfd, serv_addr, addrlen);
+    if (rc == 0 || errno != EINPROGRESS) {
+        return rc;
+    }
+#if defined(OS_LINUX)
+    if (pthread_fd_wait(sockfd, EPOLLOUT, abstime) < 0) {
+#elif defined(OS_MACOSX)
+    if (pthread_fd_wait(sockfd, EVFILT_WRITE, abstime) < 0) {
+#endif
+        return -1;
+    }
+
+    int err;
+    socklen_t errlen = sizeof(err);
+    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
+        PLOG(FATAL) << "Fail to getsockopt";
+        return -1;
+    }
+    if (err != 0) {
+        CHECK(err != EINPROGRESS);
+        errno = err;
+        return -1;
+    }
+    return 0;
+}
+
+int tcp_connect(EndPoint server, int* self_port) {
+    return tcp_connect(server, self_port, -1);
+}
+
+int tcp_connect(const EndPoint& server, int* self_port, int 
connect_timeout_ms) {
+    struct sockaddr_storage serv_addr{};
     socklen_t serv_addr_size = 0;
-    if (endpoint2sockaddr(point, &serv_addr, &serv_addr_size) != 0) {
+    if (endpoint2sockaddr(server, &serv_addr, &serv_addr_size) != 0) {
         return -1;
     }
     fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
@@ -391,10 +514,21 @@ int tcp_connect(EndPoint point, int* self_port) {
         return -1;
     }
     int rc = 0;
-    if (bthread_connect != NULL) {
-        rc = bthread_connect(sockfd, (struct sockaddr*) &serv_addr, 
serv_addr_size);
+    if (connect_timeout_ms <= 0) {
+        if (bthread_connect != NULL) {
+            rc = bthread_connect(sockfd, (struct sockaddr*)&serv_addr, 
serv_addr_size);
+        } else {
+            rc = ::connect(sockfd, (struct sockaddr*) &serv_addr, 
serv_addr_size);
+        }
     } else {
-        rc = ::connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size);
+        timespec abstime = butil::milliseconds_from_now(connect_timeout_ms);
+        if (bthread_timed_connect != NULL) {
+            rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
+                                       serv_addr_size, &abstime);
+        } else {
+            rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
+                                       serv_addr_size, &abstime);
+        }
     }
     if (rc < 0) {
         return -1;
diff --git a/src/butil/endpoint.h b/src/butil/endpoint.h
index 3b9cdf44..c6d00bbb 100644
--- a/src/butil/endpoint.h
+++ b/src/butil/endpoint.h
@@ -130,6 +130,11 @@ int endpoint2hostname(const EndPoint& point, std::string* 
host);
 // into `self_port' if it's not NULL.
 // Returns the socket descriptor, -1 otherwise and errno is set.
 int tcp_connect(EndPoint server, int* self_port);
+// Suspend caller thread until connect(2) on `sockfd' succeeds
+// or CLOCK_REALTIME reached `abstime' if `abstime' is not NULL.
+// Write port of this side into `self_port' if it's not NULL.
+// Returns the socket descriptor, -1 otherwise and errno is set.
+int tcp_connect(const EndPoint& server, int* self_port, int 
connect_timeout_ms);
 
 // Create and listen to a TCP socket bound with `ip_and_port'.
 // To enable SO_REUSEADDR for the whole program, enable gflag -reuse_addr
diff --git a/src/butil/fd_utility.cpp b/src/butil/fd_utility.cpp
index e1ca8cc7..45577769 100644
--- a/src/butil/fd_utility.cpp
+++ b/src/butil/fd_utility.cpp
@@ -25,6 +25,11 @@
 
 namespace butil {
 
+bool is_blocking(int fd) {
+    const int flags = fcntl(fd, F_GETFL, 0);
+    return flags >= 0 && !(flags & O_NONBLOCK);
+}
+
 int make_non_blocking(int fd) {
     const int flags = fcntl(fd, F_GETFL, 0);
     if (flags < 0) {
diff --git a/src/butil/fd_utility.h b/src/butil/fd_utility.h
index 8b4f789e..8d93363d 100644
--- a/src/butil/fd_utility.h
+++ b/src/butil/fd_utility.h
@@ -24,6 +24,9 @@
 
 namespace butil {
 
+// Returns true when fd is blocking, false otherwise.
+bool is_blocking(int fd);
+
 // Make file descriptor |fd| non-blocking
 // Returns 0 on success, -1 otherwise and errno is set (by fcntl)
 int make_non_blocking(int fd);
diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h
new file mode 100644
index 00000000..1771683f
--- /dev/null
+++ b/src/butil/memory/scope_guard.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_SCOPED_GUARD_H
+#define BRPC_SCOPED_GUARD_H
+
+#include <type_traits>
+
+namespace butil {
+
+// Whether a no-argument callable returns void.
+template<typename T>
+struct returns_void_t
+    : public std::is_same<void, decltype(std::declval<T&&>()())>
+{};
+
+template<typename Callback,
+    typename = typename std::enable_if<
+        returns_void_t<Callback>::value>::type>
+class ScopeGuard;
+
+template<typename Callback>
+ScopeGuard<Callback> MakeScopeGuard(Callback&& callback) noexcept;
+
+// ScopeGuard is a simple implementation to guarantee that
+// a function is executed upon leaving the current scope.
+template<typename Callback>
+class ScopeGuard<Callback> {
+public:
+    ScopeGuard(ScopeGuard&& other) noexcept
+        :_callback(std::move(other._callback))
+        , _dismiss(other._dismiss) {
+        other.dismiss();
+    }
+
+    ~ScopeGuard() noexcept {
+        if(!_dismiss) {
+            _callback();
+        }
+    }
+
+    void dismiss() noexcept {
+        _dismiss = true;
+    }
+
+    ScopeGuard() = delete;
+    ScopeGuard(const ScopeGuard&) = delete;
+    ScopeGuard& operator=(const ScopeGuard&) = delete;
+    ScopeGuard& operator=(ScopeGuard&&) = delete;
+
+private:
+// Only MakeScopeGuard and move constructor can create ScopeGuard.
+friend ScopeGuard<Callback> MakeScopeGuard<Callback>(Callback&& callback) 
noexcept;
+
+    explicit ScopeGuard(Callback&& callback) noexcept
+        :_callback(std::forward<Callback>(callback))
+        , _dismiss(false) {}
+
+private:
+    Callback _callback;
+    bool _dismiss;
+};
+
+// The MakeScopeGuard() function is used to create a new ScopeGuard object.
+// It can be instantiated with a lambda function, a std::function<void()>,
+// a functor, or a void(*)() function pointer.
+template<typename Callback>
+ScopeGuard<Callback> MakeScopeGuard(Callback&& callback) noexcept {
+    return ScopeGuard<Callback>{ std::forward<Callback>(callback)};
+}
+
+}
+
+#endif // BRPC_SCOPED_GUARD_H
diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp
index 5e3acb61..ec94f79f 100644
--- a/test/bthread_fd_unittest.cpp
+++ b/test/bthread_fd_unittest.cpp
@@ -26,6 +26,8 @@
 #include "butil/time.h"
 #include "butil/macros.h"
 #include "butil/fd_utility.h"
+#include <butil/endpoint.h>
+#include <butil/fd_guard.h>
 #include "butil/logging.h"
 #include "bthread/task_control.h"
 #include "bthread/task_group.h"
@@ -555,4 +557,41 @@ TEST(FDTest, double_close) {
     ASSERT_EQ(-1, bthread_close(fds[1]));
     ASSERT_EQ(ec, errno);
 }
+
+const char* g_hostname = "baidu.com";
+TEST(FDTest, bthread_connect) {
+    butil::EndPoint ep;
+    ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep));
+
+    {
+        struct sockaddr_storage serv_addr{};
+        socklen_t serv_addr_size = 0;
+        ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+        butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+        ASSERT_LE(0, sockfd);
+        bool is_blocking = butil::is_blocking(sockfd);
+        ASSERT_LE(0, sockfd);
+        ASSERT_EQ(0, bthread_connect(sockfd, (struct sockaddr*) &serv_addr, 
serv_addr_size));
+        ASSERT_EQ(is_blocking, butil::is_blocking(sockfd));
+
+    }
+
+    {
+        struct sockaddr_storage serv_addr{};
+        socklen_t serv_addr_size = 0;
+        ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+        butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+        ASSERT_LE(0, sockfd);
+        bool is_blocking = butil::is_blocking(sockfd);
+        // In most cases, 1 millisecond will result in a connection timeout.
+        timespec abstime = butil::milliseconds_from_now(1);
+        const int rc = bthread_timed_connect(
+            sockfd, (struct sockaddr*) &serv_addr,
+            serv_addr_size, &abstime);
+        ASSERT_EQ(-1, rc);
+        ASSERT_EQ(ETIMEDOUT, errno);
+        ASSERT_EQ(is_blocking, butil::is_blocking(sockfd));
+    }
+}
+
 } // namespace
diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp
index 4cb7f757..fcb23a7b 100644
--- a/test/endpoint_unittest.cpp
+++ b/test/endpoint_unittest.cpp
@@ -16,12 +16,19 @@
 // under the License.
 
 #include <gtest/gtest.h>
+#include <butil/fd_guard.h>
+#include <butil/fd_utility.h>
 #include "butil/errno.h"
 #include "butil/endpoint.h"
 #include "butil/logging.h"
 #include "butil/containers/flat_map.h"
 #include "butil/details/extended_endpoint.hpp"
 
+namespace butil {
+int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
+                          socklen_t addrlen, const timespec* abstime);
+}
+
 namespace {
 
 using butil::details::ExtendedEndPoint;
@@ -472,4 +479,53 @@ TEST(EndPointTest, endpoint_concurrency) {
     }
 }
 
+const char* g_hostname = "baidu.com";
+
+TEST(EndPointTest, tcp_connect) {
+    butil::EndPoint ep;
+    ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep));
+    {
+        butil::fd_guard sockfd(butil::tcp_connect(ep, NULL));
+        ASSERT_LE(0, sockfd);
+    }
+    {
+        butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1000));
+        ASSERT_LE(0, sockfd);
+    }
+    {
+        butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1));
+        ASSERT_EQ(-1, sockfd);
+        ASSERT_EQ(ETIMEDOUT, errno);
+    }
+
+    {
+        struct sockaddr_storage serv_addr{};
+        socklen_t serv_addr_size = 0;
+        ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+        butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+        ASSERT_LE(0, sockfd);
+        bool is_blocking = butil::is_blocking(sockfd);
+        ASSERT_EQ(0, butil::pthread_timed_connect(
+            sockfd, (struct sockaddr*) &serv_addr, serv_addr_size, NULL));
+        ASSERT_EQ(is_blocking, butil::is_blocking(sockfd));
+    }
+
+    {
+        struct sockaddr_storage serv_addr{};
+        socklen_t serv_addr_size = 0;
+        ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+        butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+        ASSERT_LE(0, sockfd);
+        bool is_blocking = butil::is_blocking(sockfd);
+        // In most cases, 1 millisecond will result in a connection timeout.
+        timespec abstime = butil::milliseconds_from_now(1);
+        const int rc = butil::pthread_timed_connect(
+            sockfd, (struct sockaddr*) &serv_addr,
+            serv_addr_size, &abstime);
+        ASSERT_EQ(-1, rc);
+        ASSERT_EQ(ETIMEDOUT, errno);
+        ASSERT_EQ(is_blocking, butil::is_blocking(sockfd));
+    }
+}
+
 } // end of namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to