This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb0fca7 Fix tcp connect interrupt (#2664)
1bb0fca7 is described below

commit 1bb0fca7527ef580b34aebc72eb43b93d354badd
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jul 29 10:32:26 2024 +0800

    Fix tcp connect interrupt (#2664)
    
    * Fix tcp connect interrupt
    
    * Recalculate timeout
---
 src/bthread/fd.cpp           | 31 +++++-----------
 src/butil/endpoint.cpp       | 88 ++++++++++++++++++++------------------------
 src/butil/fd_utility.cpp     | 51 ++++++++++++++++++++++++-
 src/butil/fd_utility.h       |  5 ++-
 test/bthread_fd_unittest.cpp | 50 +++++++++++++++++++++++++
 test/endpoint_unittest.cpp   | 70 +++++++++++++++++++++++++++++++++++
 test/run_tests.sh            |  2 +-
 7 files changed, 224 insertions(+), 73 deletions(-)

diff --git a/src/bthread/fd.cpp b/src/bthread/fd.cpp
index e97cee2c..b65dca48 100644
--- a/src/bthread/fd.cpp
+++ b/src/bthread/fd.cpp
@@ -264,9 +264,11 @@ public:
             return -1;
         }
 #endif
-        if (butex_wait(butex, expected_val, abstime) < 0 &&
-            errno != EWOULDBLOCK && errno != EINTR) {
-            return -1;
+        while (butex->load(butil::memory_order_relaxed) == expected_val) {
+            if (butex_wait(butex, expected_val, abstime) < 0 &&
+                errno != EWOULDBLOCK && errno != EINTR) {
+                return -1;
+            }
         }
         return 0;
     }
@@ -496,17 +498,11 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr,
 #endif
         return -1;
     }
-    int err;
-    socklen_t errlen = sizeof(err);
-    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
-        PLOG(FATAL) << "Fail to getsockopt";
-        return -1;
-    }
-    if (err != 0) {
-        CHECK(err != EINPROGRESS);
-        errno = err;
+
+    if (butil::is_connected(sockfd) != 0) {
         return -1;
     }
+
     return 0;
 }
 
@@ -539,17 +535,10 @@ int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
         return -1;
     }
 
-    int err;
-    socklen_t errlen = sizeof(err);
-    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
-        PLOG(FATAL) << "Fail to getsockopt";
-        return -1;
-    }
-    if (err != 0) {
-        CHECK(err != EINPROGRESS);
-        errno = err;
+    if (butil::is_connected(sockfd) != 0) {
         return -1;
     }
+
     return 0;
 }
 
diff --git a/src/butil/endpoint.cpp b/src/butil/endpoint.cpp
index cfd536a0..2a7d9e3c 100644
--- a/src/butil/endpoint.cpp
+++ b/src/butil/endpoint.cpp
@@ -61,7 +61,7 @@ int BAIDU_WEAK bthread_timed_connect(
 
 __END_DECLS
 
-#include "details/extended_endpoint.hpp"
+#include "butil/details/extended_endpoint.hpp"
 
 namespace butil {
 
@@ -419,18 +419,6 @@ short kqueue_to_poll_events(int kqueue_events) {
 
 int pthread_fd_wait(int fd, unsigned events,
                     const timespec* abstime) {
-    int diff_ms = -1;
-    if (abstime) {
-        timespec now;
-        clock_gettime(CLOCK_REALTIME, &now);
-        int64_t now_us = butil::timespec_to_microseconds(now);
-        int64_t abstime_us = butil::timespec_to_microseconds(*abstime);
-        if (abstime_us <= now_us) {
-            errno = ETIMEDOUT;
-            return -1;
-        }
-        diff_ms = (abstime_us - now_us + 999L) / 1000L;
-    }
 #if defined(OS_LINUX)
     const short poll_events = epoll_to_poll_events(events);
 #elif defined(OS_MACOSX)
@@ -441,13 +429,32 @@ int pthread_fd_wait(int fd, unsigned events,
         return -1;
     }
     pollfd ufds = { fd, poll_events, 0 };
-    const int rc = poll(&ufds, 1, diff_ms);
-    if (rc < 0) {
-        return -1;
-    }
-    if (rc == 0) {
-        errno = ETIMEDOUT;
-        return -1;
+    int64_t abstime_us = -1;
+    if (NULL != abstime) {
+        abstime_us = butil::timespec_to_microseconds(*abstime);
+    }
+    while (true) {
+        int diff_ms = -1;
+        if (NULL != abstime) {
+            int64_t now_us = butil::gettimeofday_us();
+            if (abstime_us <= now_us) {
+                errno = ETIMEDOUT;
+                return -1;
+            }
+            diff_ms = (abstime_us - now_us + 999L) / 1000L;
+        }
+        int rc = poll(&ufds, 1, diff_ms);
+        if (rc > 0) {
+            break;
+        } else if (rc == 0) {
+            errno = ETIMEDOUT;
+            return -1;
+        } else {
+            if (errno == EINTR) {
+                continue;
+            }
+            return -1;
+        }
     }
     if (ufds.revents & POLLNVAL) {
         errno = EBADF;
@@ -458,10 +465,6 @@ int pthread_fd_wait(int fd, unsigned events,
 
 int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
                           socklen_t addrlen, const timespec* abstime) {
-    if (abstime == NULL) {
-        return ::connect(sockfd, serv_addr, addrlen);
-    }
-
     bool is_blocking = butil::is_blocking(sockfd);
     if (is_blocking) {
         butil::make_non_blocking(sockfd);
@@ -485,15 +488,7 @@ int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
         return -1;
     }
 
-    int err;
-    socklen_t errlen = sizeof(err);
-    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
-        PLOG(FATAL) << "Fail to getsockopt";
-        return -1;
-    }
-    if (err != 0) {
-        CHECK(err != EINPROGRESS);
-        errno = err;
+    if (is_connected(sockfd) != 0) {
         return -1;
     }
     return 0;
@@ -513,22 +508,19 @@ int tcp_connect(const EndPoint& server, int* self_port, int connect_timeout_ms)
     if (sockfd < 0) {
         return -1;
     }
-    int rc = 0;
-    if (connect_timeout_ms <= 0) {
-        if (bthread_connect != NULL) {
-            rc = bthread_connect(sockfd, (struct sockaddr*)&serv_addr, serv_addr_size);
-        } else {
-            rc = ::connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size);
-        }
+    timespec abstime{};
+    timespec* abstime_ptr = NULL;
+    if (connect_timeout_ms > 0) {
+        abstime = butil::milliseconds_from_now(connect_timeout_ms);
+        abstime_ptr = &abstime;
+    }
+    int rc;
+    if (bthread_timed_connect != NULL) {
+        rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
+                                   serv_addr_size, abstime_ptr);
     } else {
-        timespec abstime = butil::milliseconds_from_now(connect_timeout_ms);
-        if (bthread_timed_connect != NULL) {
-            rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
-                                       serv_addr_size, &abstime);
-        } else {
-            rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
-                                       serv_addr_size, &abstime);
-        }
+        rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
+                                   serv_addr_size, abstime_ptr);
     }
     if (rc < 0) {
         return -1;
diff --git a/src/butil/fd_utility.cpp b/src/butil/fd_utility.cpp
index 45577769..52410f8c 100644
--- a/src/butil/fd_utility.cpp
+++ b/src/butil/fd_utility.cpp
@@ -17,11 +17,17 @@
 
 // Date: Mon. Nov 7 14:47:36 CST 2011
 
+#include "butil/build_config.h"
 #include <fcntl.h>                   // fcntl()
 #include <netinet/in.h>              // IPPROTO_TCP
 #include <sys/types.h>
 #include <sys/socket.h>              // setsockopt
 #include <netinet/tcp.h>             // TCP_NODELAY
+#include <netinet/tcp.h>
+#if defined(OS_MACOSX)
+#include <netinet/tcp_fsm.h>        // TCPS_ESTABLISHED, TCP6S_ESTABLISHED
+#endif
+#include "butil/logging.h"
 
 namespace butil {
 
@@ -56,9 +62,50 @@ int make_close_on_exec(int fd) {
     return fcntl(fd, F_SETFD, FD_CLOEXEC);
 }
 
-int make_no_delay(int socket) {
+int make_no_delay(int sockfd) {
     int flag = 1;
-    return setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
+    return setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
+}
+
+int is_connected(int sockfd) {
+    errno = 0;
+    int err;
+    socklen_t errlen = sizeof(err);
+    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
+        PLOG(FATAL) << "Fail to getsockopt";
+        return -1;
+    }
+    if (err != 0) {
+        errno = err;
+        return -1;
+    }
+
+#if defined(OS_LINUX)
+    struct tcp_info ti{};
+    socklen_t len = sizeof(ti);
+    if(getsockopt(sockfd, SOL_TCP, TCP_INFO, &ti, &len) < 0) {
+        PLOG(FATAL) << "Fail to getsockopt";
+        return -1;
+    }
+    if (ti.tcpi_state != TCP_ESTABLISHED) {
+        errno = ENOTCONN;
+        return -1;
+    }
+#elif defined(OS_MACOSX)
+    struct tcp_connection_info ti{};
+    socklen_t len = sizeof(ti);
+    if (getsockopt(sockfd, IPPROTO_TCP, TCP_CONNECTION_INFO, &ti, &len) < 0) {
+        PLOG(FATAL) << "Fail to getsockopt";
+        return -1;
+    }
+    if (ti.tcpi_state != TCPS_ESTABLISHED &&
+        ti.tcpi_state != TCP6S_ESTABLISHED) {
+        errno = ENOTCONN;
+        return -1;
+    }
+#endif
+
+    return 0;
 }
 
 }  // namespace butil
diff --git a/src/butil/fd_utility.h b/src/butil/fd_utility.h
index 8d93363d..5c920eb3 100644
--- a/src/butil/fd_utility.h
+++ b/src/butil/fd_utility.h
@@ -41,7 +41,10 @@ int make_close_on_exec(int fd);
 
 // Disable nagling on file descriptor |socket|.
 // Returns 0 on success, -1 when error and errno is set (by setsockopt)
-int make_no_delay(int socket);
+int make_no_delay(int sockfd);
+
+// Returns 0 if |sockfd| is connected; otherwise returns -1 and sets errno.
+int is_connected(int sockfd);
 
 }  // namespace butil
 
diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp
index ec94f79f..ec5df536 100644
--- a/test/bthread_fd_unittest.cpp
+++ b/test/bthread_fd_unittest.cpp
@@ -34,9 +34,11 @@
 #include "bthread/interrupt_pthread.h"
 #include "bthread/bthread.h"
 #include "bthread/unstable.h"
+#include <netinet/tcp.h>
 #if defined(OS_MACOSX)
 #include <sys/types.h>                           // struct kevent
 #include <sys/event.h>                           // kevent(), kqueue()
+#include <netinet/tcp_fsm.h>
 #endif
 
 #ifndef NDEBUG
@@ -594,4 +596,52 @@ TEST(FDTest, bthread_connect) {
     }
 }
 
+void TestConnectInterruptImpl(bool timed) {
+    butil::EndPoint ep;
+    ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep));
+    struct sockaddr_storage serv_addr{};
+    socklen_t serv_addr_size = 0;
+    ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+    butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+    ASSERT_GE(sockfd, 0);
+
+    int rc;
+    if (timed) {
+        int64_t start_ms = butil::cpuwide_time_ms();
+        butil::tcp_connect(ep, NULL);
+        int64_t connect_ms = butil::cpuwide_time_ms() - start_ms;
+        LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms";
+
+        timespec abstime = butil::milliseconds_from_now(connect_ms + 100);
+        rc = bthread_timed_connect(
+            sockfd, (struct sockaddr*) &serv_addr,
+            serv_addr_size, &abstime);
+    } else {
+        rc = bthread_timed_connect(
+            sockfd, (struct sockaddr*) &serv_addr,
+            serv_addr_size, NULL);
+    }
+    ASSERT_EQ(0, rc) << "errno=" << errno;
+    ASSERT_EQ(0, butil::is_connected(sockfd));
+
+}
+
+void* ConnectThread(void* arg) {
+    bool timed = *(bool*)arg;
+    TestConnectInterruptImpl(timed);
+    return NULL;
+}
+
+void TestConnectInterrupt(bool timed) {
+    bthread_t tid;
+    ASSERT_EQ(0, bthread_start_background(&tid, NULL, ConnectThread, &timed));
+    ASSERT_EQ(0, bthread_stop(tid));
+    ASSERT_EQ(0, bthread_join(tid, NULL));
+}
+
+TEST(FDTest, interrupt) {
+    TestConnectInterrupt(false);
+    TestConnectInterrupt(true);
+}
+
 } // namespace
diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp
index 14d150a7..afece46e 100644
--- a/test/endpoint_unittest.cpp
+++ b/test/endpoint_unittest.cpp
@@ -23,6 +23,10 @@
 #include "butil/logging.h"
 #include "butil/containers/flat_map.h"
 #include "butil/details/extended_endpoint.hpp"
+#include <netinet/tcp.h>
+#if defined(OS_MACOSX)
+#include <netinet/tcp_fsm.h>
+#endif
 
 namespace butil {
 int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
@@ -528,4 +532,70 @@ TEST(EndPointTest, tcp_connect) {
     }
 }
 
+bool g_connect_startd = false;
+
+void TestConnectInterruptImpl(bool timed) {
+    butil::EndPoint ep;
+    ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep));
+
+    struct sockaddr_storage serv_addr{};
+    socklen_t serv_addr_size = 0;
+    ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+    butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+    ASSERT_LE(0, sockfd);
+
+    int rc;
+    if (timed) {
+        int64_t start_ms = butil::cpuwide_time_ms();
+        butil::tcp_connect(ep, NULL);
+        int64_t connect_ms = butil::cpuwide_time_ms() - start_ms;
+        LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms";
+
+        timespec abstime = butil::milliseconds_from_now(connect_ms + 1);
+        rc = butil::pthread_timed_connect(
+            sockfd, (struct sockaddr*) &serv_addr,
+            serv_addr_size, &abstime);
+    } else {
+        rc = butil::pthread_timed_connect(
+            sockfd, (struct sockaddr*) &serv_addr,
+            serv_addr_size, NULL);
+    }
+    ASSERT_EQ(0, rc) << "errno=" << errno;
+    ASSERT_EQ(0, butil::is_connected(sockfd));
+}
+
+void* ConnectThread(void* arg) {
+    bool timed = *(bool*)arg;
+    TestConnectInterruptImpl(timed);
+    return NULL;
+}
+
+void sig_handler(int sig) {
+    LOG(INFO) << "sig=" << sig;
+}
+
+void register_sigurg() {
+    signal(SIGURG, sig_handler);
+}
+
+void TestConnectInterrupt(bool timed) {
+    g_connect_startd = false;
+    pthread_t tid;
+    ASSERT_EQ(0, pthread_create(&tid, NULL, ConnectThread, &timed));
+
+    while (g_connect_startd) {
+        usleep(1000);
+    }
+
+    ASSERT_EQ(0, pthread_kill(tid, SIGURG));
+
+    pthread_join(tid, NULL);
+}
+
+TEST(EndPointTest, interrupt) {
+    register_sigurg();
+    TestConnectInterrupt(false);
+    TestConnectInterrupt(true);
+}
+
 } // end of namespace
diff --git a/test/run_tests.sh b/test/run_tests.sh
index ebd64840..14297797 100755
--- a/test/run_tests.sh
+++ b/test/run_tests.sh
@@ -43,7 +43,7 @@ print_bt () {
     COREFILE=$(find . -name "core*" -type f -printf "%T@ %p\n" | sort -k 1 -n | cut -d' ' -f 2- | tail -n 1)
     if [ ! -z "$COREFILE" ]; then
         >&2 echo "corefile=$COREFILE prog=$1"
-        gdb -c "$COREFILE" $1 -ex "thread apply all bt" -ex "set pagination 0" -batch;
+        gdb -c "$COREFILE" $1 -ex "bt" -ex "thread apply all bt" -ex "set pagination 0" -batch;
     fi
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to