This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new bcb42862 Support timed connect for both bthread and pthread (#2524) bcb42862 is described below commit bcb42862a2d4a584e83afda1dc055d7d7caa6f7d Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Mon Feb 26 10:46:03 2024 +0800 Support timed connect for both bthread and pthread (#2524) --- src/bthread/fd.cpp | 125 +++++++++++++++++------------------ src/bthread/unstable.h | 4 ++ src/butil/endpoint.cpp | 146 +++++++++++++++++++++++++++++++++++++++-- src/butil/endpoint.h | 5 ++ src/butil/fd_utility.cpp | 5 ++ src/butil/fd_utility.h | 3 + src/butil/memory/scope_guard.h | 88 +++++++++++++++++++++++++ test/bthread_fd_unittest.cpp | 39 +++++++++++ test/endpoint_unittest.cpp | 56 ++++++++++++++++ 9 files changed, 402 insertions(+), 69 deletions(-) diff --git a/src/bthread/fd.cpp b/src/bthread/fd.cpp index f26cbd07..e97cee2c 100644 --- a/src/bthread/fd.cpp +++ b/src/bthread/fd.cpp @@ -31,10 +31,15 @@ #include "butil/fd_utility.h" // make_non_blocking #include "butil/logging.h" #include "butil/third_party/murmurhash3/murmurhash3.h" // fmix32 +#include "butil/memory/scope_guard.h" #include "bthread/butex.h" // butex_* #include "bthread/task_group.h" // TaskGroup #include "bthread/bthread.h" // bthread_start_urgent +namespace butil { +extern int pthread_fd_wait(int fd, unsigned events, const timespec* abstime); +} + // Implement bthread functions on file descriptors namespace bthread { @@ -422,69 +427,10 @@ int stop_and_join_epoll_threads() { return rc; } -#if defined(OS_LINUX) -short epoll_to_poll_events(uint32_t epoll_events) { - // Most POLL* and EPOLL* are same values. - short poll_events = (epoll_events & - (EPOLLIN | EPOLLPRI | EPOLLOUT | - EPOLLRDNORM | EPOLLRDBAND | - EPOLLWRNORM | EPOLLWRBAND | - EPOLLMSG | EPOLLERR | EPOLLHUP)); - CHECK_EQ((uint32_t)poll_events, epoll_events); - return poll_events; -} -#elif defined(OS_MACOSX) -static short kqueue_to_poll_events(int kqueue_events) { - //TODO: add more values? - short poll_events = 0; - if (kqueue_events == EVFILT_READ) { - poll_events |= POLLIN; - } - if (kqueue_events == EVFILT_WRITE) { - poll_events |= POLLOUT; - } - return poll_events; -} -#endif - // For pthreads. int pthread_fd_wait(int fd, unsigned events, const timespec* abstime) { - int diff_ms = -1; - if (abstime) { - timespec now; - clock_gettime(CLOCK_REALTIME, &now); - int64_t now_us = butil::timespec_to_microseconds(now); - int64_t abstime_us = butil::timespec_to_microseconds(*abstime); - if (abstime_us <= now_us) { - errno = ETIMEDOUT; - return -1; - } - diff_ms = (abstime_us - now_us + 999L) / 1000L; - } -#if defined(OS_LINUX) - const short poll_events = bthread::epoll_to_poll_events(events); -#elif defined(OS_MACOSX) - const short poll_events = bthread::kqueue_to_poll_events(events); -#endif - if (poll_events == 0) { - errno = EINVAL; - return -1; - } - pollfd ufds = { fd, poll_events, 0 }; - const int rc = poll(&ufds, 1, diff_ms); - if (rc < 0) { - return -1; - } - if (rc == 0) { - errno = ETIMEDOUT; - return -1; - } - if (ufds.revents & POLLNVAL) { - errno = EBADF; - return -1; - } - return 0; + return butil::pthread_fd_wait(fd, events, abstime); } } // namespace bthread @@ -527,9 +473,19 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr, if (NULL == g || g->is_current_pthread_task()) { return ::connect(sockfd, serv_addr, addrlen); } - // FIXME: Scoped non-blocking? - butil::make_non_blocking(sockfd); - const int rc = connect(sockfd, serv_addr, addrlen); + + bool is_blocking = butil::is_blocking(sockfd); + if (is_blocking) { + butil::make_non_blocking(sockfd); + } + // Scoped non-blocking. + auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() { + if (is_blocking) { + butil::make_blocking(sockfd); + } + }); + + const int rc = ::connect(sockfd, serv_addr, addrlen); if (rc == 0 || errno != EINPROGRESS) { return rc; } @@ -554,6 +510,49 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr, return 0; } +int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr, + socklen_t addrlen, const timespec* abstime) { + if (!abstime) { + return bthread_connect(sockfd, serv_addr, addrlen); + } + + bool is_blocking = butil::is_blocking(sockfd); + if (is_blocking) { + butil::make_non_blocking(sockfd); + } + // Scoped non-blocking. + auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() { + if (is_blocking) { + butil::make_blocking(sockfd); + } + }); + + const int rc = ::connect(sockfd, serv_addr, addrlen); + if (rc == 0 || errno != EINPROGRESS) { + return rc; + } +#if defined(OS_LINUX) + if (bthread_fd_timedwait(sockfd, EPOLLOUT, abstime) < 0) { +#elif defined(OS_MACOSX) + if (bthread_fd_timedwait(sockfd, EVFILT_WRITE, abstime) < 0) { +#endif + return -1; + } + + int err; + socklen_t errlen = sizeof(err); + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) { + PLOG(FATAL) << "Fail to getsockopt"; + return -1; + } + if (err != 0) { + CHECK(err != EINPROGRESS); + errno = err; + return -1; + } + return 0; +} + // This does not wake pthreads calling bthread_fd_*wait. int bthread_close(int fd) { return bthread::get_epoll_thread(fd).fd_close(fd); diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h index 5836f60d..5922cc2f 100644 --- a/src/bthread/unstable.h +++ b/src/bthread/unstable.h @@ -78,6 +78,10 @@ extern int bthread_close(int fd); // Replacement of connect(2) in bthreads. extern int bthread_connect(int sockfd, const struct sockaddr* serv_addr, socklen_t addrlen); +// Suspend caller thread until connect(2) on `sockfd' succeeds +// or CLOCK_REALTIME reached `abstime' if `abstime' is not NULL. +extern int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr, + socklen_t addrlen, const timespec* abstime); // Add a startup function that each pthread worker will run at the beginning // To run code at the end, use butil::thread_atexit() diff --git a/src/butil/endpoint.cpp b/src/butil/endpoint.cpp index a696aa42..ac1f958c 100644 --- a/src/butil/endpoint.cpp +++ b/src/butil/endpoint.cpp @@ -17,6 +17,7 @@ // Date: Mon. Nov 7 14:47:36 CST 2011 +#include "butil/compat.h" #include <arpa/inet.h> // inet_pton, inet_ntop #include <netdb.h> // gethostbyname_r #include <unistd.h> // gethostname @@ -28,12 +29,18 @@ #include <sys/socket.h> // SO_REUSEADDR SO_REUSEPORT #include <memory> #include <gflags/gflags.h> +#include <sys/poll.h> +#if defined(OS_MACOSX) +#include <sys/event.h> // kevent(), kqueue() +#endif #include "butil/build_config.h" // OS_MACOSX #include "butil/fd_guard.h" // fd_guard #include "butil/endpoint.h" // ip_t #include "butil/logging.h" #include "butil/memory/singleton_on_pthread_once.h" #include "butil/strings/string_piece.h" +#include "butil/fd_utility.h" +#include "butil/memory/scope_guard.h" //supported since Linux 3.9. DEFINE_bool(reuse_port, false, "Enable SO_REUSEPORT for all listened sockets"); @@ -47,6 +54,11 @@ int BAIDU_WEAK bthread_connect( int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen) { return connect(sockfd, serv_addr, addrlen); } + +int BAIDU_WEAK bthread_timed_connect( + int sockfd, const struct sockaddr* serv_addr, + socklen_t addrlen, const timespec* abstime); + __END_DECLS #include "details/extended_endpoint.hpp" @@ -380,10 +392,121 @@ int endpoint2hostname(const EndPoint& point, std::string* host) { return -1; } -int tcp_connect(EndPoint point, int* self_port) { - struct sockaddr_storage serv_addr; +#if defined(OS_LINUX) +static short epoll_to_poll_events(uint32_t epoll_events) { + // Most POLL* and EPOLL* are same values. + short poll_events = (epoll_events & + (EPOLLIN | EPOLLPRI | EPOLLOUT | + EPOLLRDNORM | EPOLLRDBAND | + EPOLLWRNORM | EPOLLWRBAND | + EPOLLMSG | EPOLLERR | EPOLLHUP)); + CHECK_EQ((uint32_t)poll_events, epoll_events); + return poll_events; +} +#elif defined(OS_MACOSX) +short kqueue_to_poll_events(int kqueue_events) { + //TODO: add more values? + short poll_events = 0; + if (kqueue_events == EVFILT_READ) { + poll_events |= POLLIN; + } + if (kqueue_events == EVFILT_WRITE) { + poll_events |= POLLOUT; + } + return poll_events; +} +#endif + +int pthread_fd_wait(int fd, unsigned events, + const timespec* abstime) { + int diff_ms = -1; + if (abstime) { + timespec now; + clock_gettime(CLOCK_REALTIME, &now); + int64_t now_us = butil::timespec_to_microseconds(now); + int64_t abstime_us = butil::timespec_to_microseconds(*abstime); + if (abstime_us <= now_us) { + errno = ETIMEDOUT; + return -1; + } + diff_ms = (abstime_us - now_us + 999L) / 1000L; + } +#if defined(OS_LINUX) + const short poll_events = epoll_to_poll_events(events); +#elif defined(OS_MACOSX) + const short poll_events = kqueue_to_poll_events(events); +#endif + if (poll_events == 0) { + errno = EINVAL; + return -1; + } + pollfd ufds = { fd, poll_events, 0 }; + const int rc = poll(&ufds, 1, diff_ms); + if (rc < 0) { + return -1; + } + if (rc == 0) { + errno = ETIMEDOUT; + return -1; + } + if (ufds.revents & POLLNVAL) { + errno = EBADF; + return -1; + } + return 0; +} + +int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr, + socklen_t addrlen, const timespec* abstime) { + if (abstime == NULL) { + return ::connect(sockfd, serv_addr, addrlen); + } + + bool is_blocking = butil::is_blocking(sockfd); + if (is_blocking) { + butil::make_non_blocking(sockfd); + } + // Scoped non-blocking. + auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() { + if (is_blocking) { + butil::make_blocking(sockfd); + } + }); + + const int rc = ::connect(sockfd, serv_addr, addrlen); + if (rc == 0 || errno != EINPROGRESS) { + return rc; + } +#if defined(OS_LINUX) + if (pthread_fd_wait(sockfd, EPOLLOUT, abstime) < 0) { +#elif defined(OS_MACOSX) + if (pthread_fd_wait(sockfd, EVFILT_WRITE, abstime) < 0) { +#endif + return -1; + } + + int err; + socklen_t errlen = sizeof(err); + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) { + PLOG(FATAL) << "Fail to getsockopt"; + return -1; + } + if (err != 0) { + CHECK(err != EINPROGRESS); + errno = err; + return -1; + } + return 0; +} + +int tcp_connect(EndPoint server, int* self_port) { + return tcp_connect(server, self_port, -1); +} + +int tcp_connect(const EndPoint& server, int* self_port, int connect_timeout_ms) { + struct sockaddr_storage serv_addr{}; socklen_t serv_addr_size = 0; - if (endpoint2sockaddr(point, &serv_addr, &serv_addr_size) != 0) { + if (endpoint2sockaddr(server, &serv_addr, &serv_addr_size) != 0) { return -1; } fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); @@ -391,10 +514,21 @@ int tcp_connect(EndPoint point, int* self_port) { return -1; } int rc = 0; - if (bthread_connect != NULL) { - rc = bthread_connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size); + if (connect_timeout_ms <= 0) { + if (bthread_connect != NULL) { + rc = bthread_connect(sockfd, (struct sockaddr*)&serv_addr, serv_addr_size); + } else { + rc = ::connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size); + } } else { - rc = ::connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size); + timespec abstime = butil::milliseconds_from_now(connect_timeout_ms); + if (bthread_timed_connect != NULL) { + rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr, + serv_addr_size, &abstime); + } else { + rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr, + serv_addr_size, &abstime); + } } if (rc < 0) { return -1; diff --git a/src/butil/endpoint.h b/src/butil/endpoint.h index 3b9cdf44..c6d00bbb 100644 --- a/src/butil/endpoint.h +++ b/src/butil/endpoint.h @@ -130,6 +130,11 @@ int endpoint2hostname(const EndPoint& point, std::string* host); // into `self_port' if it's not NULL. // Returns the socket descriptor, -1 otherwise and errno is set. int tcp_connect(EndPoint server, int* self_port); +// Suspend caller thread until connect(2) on `sockfd' succeeds +// or CLOCK_REALTIME reached `abstime' if `abstime' is not NULL. +// Write port of this side into `self_port' if it's not NULL. +// Returns the socket descriptor, -1 otherwise and errno is set. +int tcp_connect(const EndPoint& server, int* self_port, int connect_timeout_ms); // Create and listen to a TCP socket bound with `ip_and_port'. // To enable SO_REUSEADDR for the whole program, enable gflag -reuse_addr diff --git a/src/butil/fd_utility.cpp b/src/butil/fd_utility.cpp index e1ca8cc7..45577769 100644 --- a/src/butil/fd_utility.cpp +++ b/src/butil/fd_utility.cpp @@ -25,6 +25,11 @@ namespace butil { +bool is_blocking(int fd) { + const int flags = fcntl(fd, F_GETFL, 0); + return flags >= 0 && !(flags & O_NONBLOCK); +} + int make_non_blocking(int fd) { const int flags = fcntl(fd, F_GETFL, 0); if (flags < 0) { diff --git a/src/butil/fd_utility.h b/src/butil/fd_utility.h index 8b4f789e..8d93363d 100644 --- a/src/butil/fd_utility.h +++ b/src/butil/fd_utility.h @@ -24,6 +24,9 @@ namespace butil { +// Returns true when fd is blocking, false otherwise. +bool is_blocking(int fd); + // Make file descriptor |fd| non-blocking // Returns 0 on success, -1 otherwise and errno is set (by fcntl) int make_non_blocking(int fd); diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h new file mode 100644 index 00000000..1771683f --- /dev/null +++ b/src/butil/memory/scope_guard.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_SCOPED_GUARD_H +#define BRPC_SCOPED_GUARD_H + +#include <type_traits> + +namespace butil { + +// Whether a no-argument callable returns void. +template<typename T> +struct returns_void_t + : public std::is_same<void, decltype(std::declval<T&&>()())> +{}; + +template<typename Callback, + typename = typename std::enable_if< + returns_void_t<Callback>::value>::type> +class ScopeGuard; + +template<typename Callback> +ScopeGuard<Callback> MakeScopeGuard(Callback&& callback) noexcept; + +// ScopeGuard is a simple implementation to guarantee that +// a function is executed upon leaving the current scope. +template<typename Callback> +class ScopeGuard<Callback> { +public: + ScopeGuard(ScopeGuard&& other) noexcept + :_callback(std::move(other._callback)) + , _dismiss(other._dismiss) { + other.dismiss(); + } + + ~ScopeGuard() noexcept { + if(!_dismiss) { + _callback(); + } + } + + void dismiss() noexcept { + _dismiss = true; + } + + ScopeGuard() = delete; + ScopeGuard(const ScopeGuard&) = delete; + ScopeGuard& operator=(const ScopeGuard&) = delete; + ScopeGuard& operator=(ScopeGuard&&) = delete; + +private: +// Only MakeScopeGuard and move constructor can create ScopeGuard. +friend ScopeGuard<Callback> MakeScopeGuard<Callback>(Callback&& callback) noexcept; + + explicit ScopeGuard(Callback&& callback) noexcept + :_callback(std::forward<Callback>(callback)) + , _dismiss(false) {} + +private: + Callback _callback; + bool _dismiss; +}; + +// The MakeScopeGuard() function is used to create a new ScopeGuard object. +// It can be instantiated with a lambda function, a std::function<void()>, +// a functor, or a void(*)() function pointer. +template<typename Callback> +ScopeGuard<Callback> MakeScopeGuard(Callback&& callback) noexcept { + return ScopeGuard<Callback>{ std::forward<Callback>(callback)}; +} + +} + +#endif // BRPC_SCOPED_GUARD_H diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp index 5e3acb61..ec94f79f 100644 --- a/test/bthread_fd_unittest.cpp +++ b/test/bthread_fd_unittest.cpp @@ -26,6 +26,8 @@ #include "butil/time.h" #include "butil/macros.h" #include "butil/fd_utility.h" +#include <butil/endpoint.h> +#include <butil/fd_guard.h> #include "butil/logging.h" #include "bthread/task_control.h" #include "bthread/task_group.h" @@ -555,4 +557,41 @@ TEST(FDTest, double_close) { ASSERT_EQ(-1, bthread_close(fds[1])); ASSERT_EQ(ec, errno); } + +const char* g_hostname = "baidu.com"; +TEST(FDTest, bthread_connect) { + butil::EndPoint ep; + ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep)); + + { + struct sockaddr_storage serv_addr{}; + socklen_t serv_addr_size = 0; + ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size)); + butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); + ASSERT_LE(0, sockfd); + bool is_blocking = butil::is_blocking(sockfd); + ASSERT_LE(0, sockfd); + ASSERT_EQ(0, bthread_connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size)); + ASSERT_EQ(is_blocking, butil::is_blocking(sockfd)); + + } + + { + struct sockaddr_storage serv_addr{}; + socklen_t serv_addr_size = 0; + ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size)); + butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); + ASSERT_LE(0, sockfd); + bool is_blocking = butil::is_blocking(sockfd); + // In most cases, 1 millisecond will result in a connection timeout. + timespec abstime = butil::milliseconds_from_now(1); + const int rc = bthread_timed_connect( + sockfd, (struct sockaddr*) &serv_addr, + serv_addr_size, &abstime); + ASSERT_EQ(-1, rc); + ASSERT_EQ(ETIMEDOUT, errno); + ASSERT_EQ(is_blocking, butil::is_blocking(sockfd)); + } +} + } // namespace diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp index 4cb7f757..fcb23a7b 100644 --- a/test/endpoint_unittest.cpp +++ b/test/endpoint_unittest.cpp @@ -16,12 +16,19 @@ // under the License. #include <gtest/gtest.h> +#include <butil/fd_guard.h> +#include <butil/fd_utility.h> #include "butil/errno.h" #include "butil/endpoint.h" #include "butil/logging.h" #include "butil/containers/flat_map.h" #include "butil/details/extended_endpoint.hpp" +namespace butil { +int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr, + socklen_t addrlen, const timespec* abstime); +} + namespace { using butil::details::ExtendedEndPoint; @@ -472,4 +479,53 @@ TEST(EndPointTest, endpoint_concurrency) { } } +const char* g_hostname = "baidu.com"; + +TEST(EndPointTest, tcp_connect) { + butil::EndPoint ep; + ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep)); + { + butil::fd_guard sockfd(butil::tcp_connect(ep, NULL)); + ASSERT_LE(0, sockfd); + } + { + butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1000)); + ASSERT_LE(0, sockfd); + } + { + butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1)); + ASSERT_EQ(-1, sockfd); + ASSERT_EQ(ETIMEDOUT, errno); + } + + { + struct sockaddr_storage serv_addr{}; + socklen_t serv_addr_size = 0; + ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size)); + butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); + ASSERT_LE(0, sockfd); + bool is_blocking = butil::is_blocking(sockfd); + ASSERT_EQ(0, butil::pthread_timed_connect( + sockfd, (struct sockaddr*) &serv_addr, serv_addr_size, NULL)); + ASSERT_EQ(is_blocking, butil::is_blocking(sockfd)); + } + + { + struct sockaddr_storage serv_addr{}; + socklen_t serv_addr_size = 0; + ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size)); + butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); + ASSERT_LE(0, sockfd); + bool is_blocking = butil::is_blocking(sockfd); + // In most cases, 1 millisecond will result in a connection timeout. + timespec abstime = butil::milliseconds_from_now(1); + const int rc = butil::pthread_timed_connect( + sockfd, (struct sockaddr*) &serv_addr, + serv_addr_size, &abstime); + ASSERT_EQ(-1, rc); + ASSERT_EQ(ETIMEDOUT, errno); + ASSERT_EQ(is_blocking, butil::is_blocking(sockfd)); + } +} + } // end of namespace --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org