This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb0fca7 Fix tcp connect interrupt (#2664)
1bb0fca7 is described below
commit 1bb0fca7527ef580b34aebc72eb43b93d354badd
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jul 29 10:32:26 2024 +0800
Fix tcp connect interrupt (#2664)
* Fix tcp connect interrupt
* Recalculate timeout
---
src/bthread/fd.cpp | 31 +++++-----------
src/butil/endpoint.cpp | 88 ++++++++++++++++++++------------------------
src/butil/fd_utility.cpp | 51 ++++++++++++++++++++++++-
src/butil/fd_utility.h | 5 ++-
test/bthread_fd_unittest.cpp | 50 +++++++++++++++++++++++++
test/endpoint_unittest.cpp | 70 +++++++++++++++++++++++++++++++++++
test/run_tests.sh | 2 +-
7 files changed, 224 insertions(+), 73 deletions(-)
diff --git a/src/bthread/fd.cpp b/src/bthread/fd.cpp
index e97cee2c..b65dca48 100644
--- a/src/bthread/fd.cpp
+++ b/src/bthread/fd.cpp
@@ -264,9 +264,11 @@ public:
return -1;
}
#endif
- if (butex_wait(butex, expected_val, abstime) < 0 &&
- errno != EWOULDBLOCK && errno != EINTR) {
- return -1;
+ while (butex->load(butil::memory_order_relaxed) == expected_val) {
+ if (butex_wait(butex, expected_val, abstime) < 0 &&
+ errno != EWOULDBLOCK && errno != EINTR) {
+ return -1;
+ }
}
return 0;
}
@@ -496,17 +498,11 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr,
#endif
return -1;
}
- int err;
- socklen_t errlen = sizeof(err);
- if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
- PLOG(FATAL) << "Fail to getsockopt";
- return -1;
- }
- if (err != 0) {
- CHECK(err != EINPROGRESS);
- errno = err;
+
+ if (butil::is_connected(sockfd) != 0) {
return -1;
}
+
return 0;
}
@@ -539,17 +535,10 @@ int bthread_timed_connect(int sockfd, const struct
sockaddr* serv_addr,
return -1;
}
- int err;
- socklen_t errlen = sizeof(err);
- if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
- PLOG(FATAL) << "Fail to getsockopt";
- return -1;
- }
- if (err != 0) {
- CHECK(err != EINPROGRESS);
- errno = err;
+ if (butil::is_connected(sockfd) != 0) {
return -1;
}
+
return 0;
}
diff --git a/src/butil/endpoint.cpp b/src/butil/endpoint.cpp
index cfd536a0..2a7d9e3c 100644
--- a/src/butil/endpoint.cpp
+++ b/src/butil/endpoint.cpp
@@ -61,7 +61,7 @@ int BAIDU_WEAK bthread_timed_connect(
__END_DECLS
-#include "details/extended_endpoint.hpp"
+#include "butil/details/extended_endpoint.hpp"
namespace butil {
@@ -419,18 +419,6 @@ short kqueue_to_poll_events(int kqueue_events) {
int pthread_fd_wait(int fd, unsigned events,
const timespec* abstime) {
- int diff_ms = -1;
- if (abstime) {
- timespec now;
- clock_gettime(CLOCK_REALTIME, &now);
- int64_t now_us = butil::timespec_to_microseconds(now);
- int64_t abstime_us = butil::timespec_to_microseconds(*abstime);
- if (abstime_us <= now_us) {
- errno = ETIMEDOUT;
- return -1;
- }
- diff_ms = (abstime_us - now_us + 999L) / 1000L;
- }
#if defined(OS_LINUX)
const short poll_events = epoll_to_poll_events(events);
#elif defined(OS_MACOSX)
@@ -441,13 +429,32 @@ int pthread_fd_wait(int fd, unsigned events,
return -1;
}
pollfd ufds = { fd, poll_events, 0 };
- const int rc = poll(&ufds, 1, diff_ms);
- if (rc < 0) {
- return -1;
- }
- if (rc == 0) {
- errno = ETIMEDOUT;
- return -1;
+ int64_t abstime_us = -1;
+ if (NULL != abstime) {
+ abstime_us = butil::timespec_to_microseconds(*abstime);
+ }
+ while (true) {
+ int diff_ms = -1;
+ if (NULL != abstime) {
+ int64_t now_us = butil::gettimeofday_us();
+ if (abstime_us <= now_us) {
+ errno = ETIMEDOUT;
+ return -1;
+ }
+ diff_ms = (abstime_us - now_us + 999L) / 1000L;
+ }
+ int rc = poll(&ufds, 1, diff_ms);
+ if (rc > 0) {
+ break;
+ } else if (rc == 0) {
+ errno = ETIMEDOUT;
+ return -1;
+ } else {
+ if (errno == EINTR) {
+ continue;
+ }
+ return -1;
+ }
}
if (ufds.revents & POLLNVAL) {
errno = EBADF;
@@ -458,10 +465,6 @@ int pthread_fd_wait(int fd, unsigned events,
int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
socklen_t addrlen, const timespec* abstime) {
- if (abstime == NULL) {
- return ::connect(sockfd, serv_addr, addrlen);
- }
-
bool is_blocking = butil::is_blocking(sockfd);
if (is_blocking) {
butil::make_non_blocking(sockfd);
@@ -485,15 +488,7 @@ int pthread_timed_connect(int sockfd, const struct
sockaddr* serv_addr,
return -1;
}
- int err;
- socklen_t errlen = sizeof(err);
- if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
- PLOG(FATAL) << "Fail to getsockopt";
- return -1;
- }
- if (err != 0) {
- CHECK(err != EINPROGRESS);
- errno = err;
+ if (is_connected(sockfd) != 0) {
return -1;
}
return 0;
@@ -513,22 +508,19 @@ int tcp_connect(const EndPoint& server, int* self_port,
int connect_timeout_ms)
if (sockfd < 0) {
return -1;
}
- int rc = 0;
- if (connect_timeout_ms <= 0) {
- if (bthread_connect != NULL) {
- rc = bthread_connect(sockfd, (struct sockaddr*)&serv_addr,
serv_addr_size);
- } else {
- rc = ::connect(sockfd, (struct sockaddr*) &serv_addr,
serv_addr_size);
- }
+ timespec abstime{};
+ timespec* abstime_ptr = NULL;
+ if (connect_timeout_ms > 0) {
+ abstime = butil::milliseconds_from_now(connect_timeout_ms);
+ abstime_ptr = &abstime;
+ }
+ int rc;
+ if (bthread_timed_connect != NULL) {
+ rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
+ serv_addr_size, abstime_ptr);
} else {
- timespec abstime = butil::milliseconds_from_now(connect_timeout_ms);
- if (bthread_timed_connect != NULL) {
- rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
- serv_addr_size, &abstime);
- } else {
- rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
- serv_addr_size, &abstime);
- }
+ rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
+ serv_addr_size, abstime_ptr);
}
if (rc < 0) {
return -1;
diff --git a/src/butil/fd_utility.cpp b/src/butil/fd_utility.cpp
index 45577769..52410f8c 100644
--- a/src/butil/fd_utility.cpp
+++ b/src/butil/fd_utility.cpp
@@ -17,11 +17,17 @@
// Date: Mon. Nov 7 14:47:36 CST 2011
+#include "butil/build_config.h"
#include <fcntl.h> // fcntl()
#include <netinet/in.h> // IPPROTO_TCP
#include <sys/types.h>
#include <sys/socket.h> // setsockopt
#include <netinet/tcp.h> // TCP_NODELAY
+#include <netinet/tcp.h>
+#if defined(OS_MACOSX)
+#include <netinet/tcp_fsm.h> // TCPS_ESTABLISHED, TCP6S_ESTABLISHED
+#endif
+#include "butil/logging.h"
namespace butil {
@@ -56,9 +62,50 @@ int make_close_on_exec(int fd) {
return fcntl(fd, F_SETFD, FD_CLOEXEC);
}
-int make_no_delay(int socket) {
+int make_no_delay(int sockfd) {
int flag = 1;
- return setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (char*)&flag,
sizeof(flag));
+ return setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag,
sizeof(flag));
+}
+
+int is_connected(int sockfd) {
+ errno = 0;
+ int err;
+ socklen_t errlen = sizeof(err);
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
+ PLOG(FATAL) << "Fail to getsockopt";
+ return -1;
+ }
+ if (err != 0) {
+ errno = err;
+ return -1;
+ }
+
+#if defined(OS_LINUX)
+ struct tcp_info ti{};
+ socklen_t len = sizeof(ti);
+ if(getsockopt(sockfd, SOL_TCP, TCP_INFO, &ti, &len) < 0) {
+ PLOG(FATAL) << "Fail to getsockopt";
+ return -1;
+ }
+ if (ti.tcpi_state != TCP_ESTABLISHED) {
+ errno = ENOTCONN;
+ return -1;
+ }
+#elif defined(OS_MACOSX)
+ struct tcp_connection_info ti{};
+ socklen_t len = sizeof(ti);
+ if (getsockopt(sockfd, IPPROTO_TCP, TCP_CONNECTION_INFO, &ti, &len) < 0) {
+ PLOG(FATAL) << "Fail to getsockopt";
+ return -1;
+ }
+ if (ti.tcpi_state != TCPS_ESTABLISHED &&
+ ti.tcpi_state != TCP6S_ESTABLISHED) {
+ errno = ENOTCONN;
+ return -1;
+ }
+#endif
+
+ return 0;
}
} // namespace butil
diff --git a/src/butil/fd_utility.h b/src/butil/fd_utility.h
index 8d93363d..5c920eb3 100644
--- a/src/butil/fd_utility.h
+++ b/src/butil/fd_utility.h
@@ -41,7 +41,10 @@ int make_close_on_exec(int fd);
// Disable nagling on file descriptor |socket|.
// Returns 0 on success, -1 when error and errno is set (by setsockopt)
-int make_no_delay(int socket);
+int make_no_delay(int sockfd);
+
+// Return true if the socket is connected.
+int is_connected(int sockfd);
} // namespace butil
diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp
index ec94f79f..ec5df536 100644
--- a/test/bthread_fd_unittest.cpp
+++ b/test/bthread_fd_unittest.cpp
@@ -34,9 +34,11 @@
#include "bthread/interrupt_pthread.h"
#include "bthread/bthread.h"
#include "bthread/unstable.h"
+#include <netinet/tcp.h>
#if defined(OS_MACOSX)
#include <sys/types.h> // struct kevent
#include <sys/event.h> // kevent(), kqueue()
+#include <netinet/tcp_fsm.h>
#endif
#ifndef NDEBUG
@@ -594,4 +596,52 @@ TEST(FDTest, bthread_connect) {
}
}
+void TestConnectInterruptImpl(bool timed) {
+ butil::EndPoint ep;
+ ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep));
+ struct sockaddr_storage serv_addr{};
+ socklen_t serv_addr_size = 0;
+ ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+ butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+ ASSERT_GE(sockfd, 0);
+
+ int rc;
+ if (timed) {
+ int64_t start_ms = butil::cpuwide_time_ms();
+ butil::tcp_connect(ep, NULL);
+ int64_t connect_ms = butil::cpuwide_time_ms() - start_ms;
+ LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms";
+
+ timespec abstime = butil::milliseconds_from_now(connect_ms + 100);
+ rc = bthread_timed_connect(
+ sockfd, (struct sockaddr*) &serv_addr,
+ serv_addr_size, &abstime);
+ } else {
+ rc = bthread_timed_connect(
+ sockfd, (struct sockaddr*) &serv_addr,
+ serv_addr_size, NULL);
+ }
+ ASSERT_EQ(0, rc) << "errno=" << errno;
+ ASSERT_EQ(0, butil::is_connected(sockfd));
+
+}
+
+void* ConnectThread(void* arg) {
+ bool timed = *(bool*)arg;
+ TestConnectInterruptImpl(timed);
+ return NULL;
+}
+
+void TestConnectInterrupt(bool timed) {
+ bthread_t tid;
+ ASSERT_EQ(0, bthread_start_background(&tid, NULL, ConnectThread, &timed));
+ ASSERT_EQ(0, bthread_stop(tid));
+ ASSERT_EQ(0, bthread_join(tid, NULL));
+}
+
+TEST(FDTest, interrupt) {
+ TestConnectInterrupt(false);
+ TestConnectInterrupt(true);
+}
+
} // namespace
diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp
index 14d150a7..afece46e 100644
--- a/test/endpoint_unittest.cpp
+++ b/test/endpoint_unittest.cpp
@@ -23,6 +23,10 @@
#include "butil/logging.h"
#include "butil/containers/flat_map.h"
#include "butil/details/extended_endpoint.hpp"
+#include <netinet/tcp.h>
+#if defined(OS_MACOSX)
+#include <netinet/tcp_fsm.h>
+#endif
namespace butil {
int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
@@ -528,4 +532,70 @@ TEST(EndPointTest, tcp_connect) {
}
}
+bool g_connect_startd = false;
+
+void TestConnectInterruptImpl(bool timed) {
+ butil::EndPoint ep;
+ ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep));
+
+ struct sockaddr_storage serv_addr{};
+ socklen_t serv_addr_size = 0;
+ ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+ butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+ ASSERT_LE(0, sockfd);
+
+ int rc;
+ if (timed) {
+ int64_t start_ms = butil::cpuwide_time_ms();
+ butil::tcp_connect(ep, NULL);
+ int64_t connect_ms = butil::cpuwide_time_ms() - start_ms;
+ LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms";
+
+ timespec abstime = butil::milliseconds_from_now(connect_ms + 1);
+ rc = butil::pthread_timed_connect(
+ sockfd, (struct sockaddr*) &serv_addr,
+ serv_addr_size, &abstime);
+ } else {
+ rc = butil::pthread_timed_connect(
+ sockfd, (struct sockaddr*) &serv_addr,
+ serv_addr_size, NULL);
+ }
+ ASSERT_EQ(0, rc) << "errno=" << errno;
+ ASSERT_EQ(0, butil::is_connected(sockfd));
+}
+
+void* ConnectThread(void* arg) {
+ bool timed = *(bool*)arg;
+ TestConnectInterruptImpl(timed);
+ return NULL;
+}
+
+void sig_handler(int sig) {
+ LOG(INFO) << "sig=" << sig;
+}
+
+void register_sigurg() {
+ signal(SIGURG, sig_handler);
+}
+
+void TestConnectInterrupt(bool timed) {
+ g_connect_startd = false;
+ pthread_t tid;
+ ASSERT_EQ(0, pthread_create(&tid, NULL, ConnectThread, &timed));
+
+ while (g_connect_startd) {
+ usleep(1000);
+ }
+
+ ASSERT_EQ(0, pthread_kill(tid, SIGURG));
+
+ pthread_join(tid, NULL);
+}
+
+TEST(EndPointTest, interrupt) {
+ register_sigurg();
+ TestConnectInterrupt(false);
+ TestConnectInterrupt(true);
+}
+
} // end of namespace
diff --git a/test/run_tests.sh b/test/run_tests.sh
index ebd64840..14297797 100755
--- a/test/run_tests.sh
+++ b/test/run_tests.sh
@@ -43,7 +43,7 @@ print_bt () {
COREFILE=$(find . -name "core*" -type f -printf "%T@ %p\n" | sort -k 1 -n
| cut -d' ' -f 2- | tail -n 1)
if [ ! -z "$COREFILE" ]; then
>&2 echo "corefile=$COREFILE prog=$1"
- gdb -c "$COREFILE" $1 -ex "thread apply all bt" -ex "set pagination 0"
-batch;
+ gdb -c "$COREFILE" $1 -ex "bt" -ex "thread apply all bt" -ex "set
pagination 0" -batch;
fi
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]