This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 9680d31e Support timedlock of fast/hook pthread and bthread::Mutex (#2760)
9680d31e is described below

commit 9680d31edb0f92fcfe37fca44b251a150e167fd8
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Wed Oct 9 12:36:00 2024 +0800

    Support timedlock of fast/hook pthread and bthread::Mutex (#2760)
    
    * Support timedlock of fast/hook pthread and bthread::Mutex
    
    * Disable bthread sche safety debug by default
---
 .github/workflows/ci-linux.yml   |  12 +--
 BUILD.bazel                      |   3 +
 CMakeLists.txt                   |   8 +-
 bazel/config/BUILD.bazel         |   6 ++
 config_brpc.sh                   |   6 +-
 src/bthread/butex.cpp            |   5 --
 src/bthread/butex.h              |   5 ++
 src/bthread/mutex.cpp            | 173 +++++++++++++++++++++++++++++++--------
 src/bthread/mutex.h              |  12 ++-
 src/butil/synchronization/lock.h |  13 ++-
 test/bthread_mutex_unittest.cpp  |  62 ++++++++++++++
 11 files changed, 254 insertions(+), 51 deletions(-)

diff --git a/.github/workflows/ci-linux.yml b/.github/workflows/ci-linux.yml
index 892ce978..99c0bd1b 100644
--- a/.github/workflows/ci-linux.yml
+++ b/.github/workflows/ci-linux.yml
@@ -61,7 +61,7 @@ jobs:
     - uses: ./.github/actions/install-all-dependences
     - uses: ./.github/actions/init-make-config
       with:
-       options: --cc=gcc --cxx=g++ --with-thrift --with-glog --with-rdma
+       options: --cc=gcc --cxx=g++ --with-thrift --with-glog --with-rdma  --with-debug-bthread-sche-safety
     - name: compile
       run: |
         make -j ${{env.proc_num}}
@@ -76,7 +76,7 @@ jobs:
            export CC=gcc && export CXX=g++
            mkdir build
            cd build
-           cmake -DWITH_MESALINK=OFF -DWITH_GLOG=ON -DWITH_THRIFT=ON -DWITH_RDMA=ON ..
+           cmake -DWITH_MESALINK=OFF -DWITH_GLOG=ON -DWITH_THRIFT=ON -DWITH_RDMA=ON -DWITH_DEBUG_BTHREAD_SCHE_SAFETY=ON ..
     - name: compile
       run: |
            cd build
@@ -86,7 +86,7 @@ jobs:
     runs-on: ubuntu-20.04
     steps:
     - uses: actions/checkout@v2
-    - run: bazel test --verbose_failures --define with_mesalink=false --define with_glog=true --define with_thrift=true -- //... -//example/...
+    - run: bazel test --verbose_failures --define with_mesalink=false --define with_glog=true --define with_thrift=true --define with_debug_bthread_sche_safety=true -- //... -//example/...
 
   clang-compile-with-make:
     runs-on: ubuntu-20.04
@@ -135,7 +135,7 @@ jobs:
     - uses: ./.github/actions/install-all-dependences
     - uses: ./.github/actions/init-make-config
       with:
-        options: --cc=clang --cxx=clang++ --with-thrift --with-glog --with-rdma
+        options: --cc=clang --cxx=clang++ --with-thrift --with-glog --with-rdma  --with-debug-bthread-sche-safety
     - name: compile
       run: |
            make -j ${{env.proc_num}}
@@ -150,7 +150,7 @@ jobs:
            export CC=clang && export CXX=clang++
            mkdir build
            cd build
-           cmake -DWITH_MESALINK=OFF -DWITH_GLOG=ON -DWITH_THRIFT=ON -DWITH_RDMA=ON ..
+           cmake -DWITH_MESALINK=OFF -DWITH_GLOG=ON -DWITH_THRIFT=ON -DWITH_RDMA=ON -DWITH_DEBUG_BTHREAD_SCHE_SAFETY=ON ..
     - name: compile
       run: |
            cd build
@@ -160,7 +160,7 @@ jobs:
     runs-on: ubuntu-20.04
     steps:
     - uses: actions/checkout@v2
-    - run: bazel build --verbose_failures --action_env=CC=clang-12 --define with_mesalink=false --define with_glog=true --define with_thrift=true -- //... -//example/...
+    - run: bazel build --verbose_failures --action_env=CC=clang-12 --define with_mesalink=false --define with_glog=true --define with_thrift=true --define with_debug_bthread_sche_safety=true -- //... -//example/...
 
   clang-unittest:
     runs-on: ubuntu-20.04
diff --git a/BUILD.bazel b/BUILD.bazel
index 905dca16..19d04a93 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -45,6 +45,9 @@ COPTS = [
 }) + select({
     "//bazel/config:brpc_with_rdma": ["-DBRPC_WITH_RDMA=1"],
     "//conditions:default": [""],
+}) + select({
+    "//bazel/config:brpc_with_debug_bthread_sche_safety": 
["-DBRPC_DEBUG_BTHREAD_SCHE_SAFETY=1"],
+    "//conditions:default": ["-DBRPC_DEBUG_BTHREAD_SCHE_SAFETY=0"],
 })
 
 LINKOPTS = [
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9b354d42..c8fa7716 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -26,6 +26,7 @@ option(WITH_DEBUG_SYMBOLS "With debug symbols" ON)
 option(WITH_THRIFT "With thrift framed protocol supported" OFF)
 option(WITH_SNAPPY "With snappy" OFF)
 option(WITH_RDMA "With RDMA" OFF)
+option(WITH_DEBUG_BTHREAD_SCHE_SAFETY "With debugging bthread sche safety" OFF)
 option(BUILD_UNIT_TESTS "Whether to build unit tests" OFF)
 option(BUILD_FUZZ_TESTS "Whether to build fuzz tests" OFF)
 option(BUILD_BRPC_TOOLS "Whether to build brpc tools" ON)
@@ -79,6 +80,11 @@ if(WITH_RDMA)
     set(WITH_RDMA_VAL "1")
 endif()
 
+set(WITH_DEBUG_BTHREAD_SCHE_SAFETY_VAL "0")
+if(WITH_DEBUG_BTHREAD_SCHE_SAFETY)
+    set(WITH_DEBUG_BTHREAD_SCHE_SAFETY_VAL "1")
+endif()
+
 include(GNUInstallDirs)
 
 configure_file(${PROJECT_SOURCE_DIR}/config.h.in 
${PROJECT_SOURCE_DIR}/src/butil/config.h @ONLY)
@@ -117,7 +123,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
     set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} -Wno-deprecated-declarations -Wno-inconsistent-missing-override")
 endif()
 
-set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} ${DEFINE_CLOCK_GETTIME} -DBRPC_WITH_GLOG=${WITH_GLOG_VAL} -DBRPC_WITH_RDMA=${WITH_RDMA_VAL} -DGFLAGS_NS=${GFLAGS_NS}")
+set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} ${DEFINE_CLOCK_GETTIME} -DBRPC_WITH_GLOG=${WITH_GLOG_VAL} -DBRPC_WITH_RDMA=${WITH_RDMA_VAL} -DGFLAGS_NS=${GFLAGS_NS} -DBRPC_DEBUG_BTHREAD_SCHE_SAFETY=${WITH_DEBUG_BTHREAD_SCHE_SAFETY_VAL}")
 if(WITH_MESALINK)
     set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} -DUSE_MESALINK")
 endif()
diff --git a/bazel/config/BUILD.bazel b/bazel/config/BUILD.bazel
index bed04d3b..d7c6d533 100644
--- a/bazel/config/BUILD.bazel
+++ b/bazel/config/BUILD.bazel
@@ -108,4 +108,10 @@ config_setting(
     name = "brpc_with_boringssl",
     define_values = {"BRPC_WITH_BORINGSSL": "true"},
     visibility = ["//visibility:public"],
+)
+
+config_setting(
+    name = "brpc_with_debug_bthread_sche_safety",
+    define_values = {"with_debug_bthread_sche_safety": "true"},
+    visibility = ["//visibility:public"],
 )
\ No newline at end of file
diff --git a/config_brpc.sh b/config_brpc.sh
index cf4544ed..9b6b188c 100755
--- a/config_brpc.sh
+++ b/config_brpc.sh
@@ -38,11 +38,12 @@ else
     LDD=ldd
 fi
 
-TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,with-thrift,with-rdma,with-mesalink,nodebugsymbols -n 'config_brpc' -- "$@"`
+TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,with-thrift,with-rdma,with-mesalink,with-debug-bthread-sche-safety,nodebugsymbols -n 'config_brpc' -- "$@"`
 WITH_GLOG=0
 WITH_THRIFT=0
 WITH_RDMA=0
 WITH_MESALINK=0
+BRPC_DEBUG_BTHREAD_SCHE_SAFETY=0
 DEBUGSYMBOLS=-g
 
 if [ $? != 0 ] ; then >&2 $ECHO "Terminating..."; exit 1 ; fi
@@ -67,6 +68,7 @@ while true; do
         --with-thrift) WITH_THRIFT=1; shift 1 ;;
         --with-rdma) WITH_RDMA=1; shift 1 ;;
         --with-mesalink) WITH_MESALINK=1; shift 1 ;;
+        --with-debug-bthread-sche-safety ) BRPC_DEBUG_BTHREAD_SCHE_SAFETY=1; 
shift 1 ;;
         --nodebugsymbols ) DEBUGSYMBOLS=; shift 1 ;;
         -- ) shift; break ;;
         * ) break ;;
@@ -407,7 +409,7 @@ append_to_output "STATIC_LINKINGS=$STATIC_LINKINGS"
 append_to_output "DYNAMIC_LINKINGS=$DYNAMIC_LINKINGS"
 
 # CPP means C PreProcessing, not C PlusPlus
-CPPFLAGS="-DBRPC_WITH_GLOG=$WITH_GLOG -DGFLAGS_NS=$GFLAGS_NS"
+CPPFLAGS="-DBRPC_WITH_GLOG=$WITH_GLOG -DGFLAGS_NS=$GFLAGS_NS -DBRPC_DEBUG_BTHREAD_SCHE_SAFETY=$BRPC_DEBUG_BTHREAD_SCHE_SAFETY"
 
 # Avoid over-optimizations of TLS variables by GCC>=4.8
 # See: https://github.com/apache/brpc/issues/1693
diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index 4a0f9c37..1cc8923a 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -68,11 +68,6 @@ inline bvar::Adder<int64_t>& butex_waiter_count() {
 }
 #endif
 
-// If a thread would suspend for less than so many microseconds, return
-// ETIMEDOUT directly.
-// Use 1: sleeping for less than 2 microsecond is inefficient and useless.
-static const int64_t MIN_SLEEP_US = 2; 
-
 enum WaiterState {
     WAITER_STATE_NONE,
     WAITER_STATE_READY,
diff --git a/src/bthread/butex.h b/src/bthread/butex.h
index 2786ef68..bf86611e 100644
--- a/src/bthread/butex.h
+++ b/src/bthread/butex.h
@@ -29,6 +29,11 @@
 
 namespace bthread {
 
+// If a thread would suspend for less than so many microseconds, return
+// ETIMEDOUT directly.
+// Use 2: sleeping for less than 2 microseconds is inefficient and useless.
+static const int64_t MIN_SLEEP_US = 2;
+
 // Create a butex which is a futex-like 32-bit primitive for synchronizing
 // bthreads/pthreads.
 // Returns a pointer to 32-bit data, NULL on failure.
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index fa2f91c6..30872561 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -19,6 +19,7 @@
 
 // Date: Sun Aug  3 12:46:15 CST 2014
 
+#include <sys/cdefs.h>
 #include <pthread.h>
 #include <dlfcn.h>                               // dlsym
 #include <fcntl.h>                               // O_RDONLY
@@ -47,9 +48,9 @@
 #include "bthread/processor.h"
 #include "bthread/task_group.h"
 
-extern "C" {
+__BEGIN_DECLS
 extern void* BAIDU_WEAK _dl_sym(void* handle, const char* symbol, void* 
caller);
-}
+__END_DECLS
 
 namespace bthread {
 
@@ -389,6 +390,13 @@ int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex);
 static MutexOp sys_pthread_mutex_lock = first_sys_pthread_mutex_lock;
 static MutexOp sys_pthread_mutex_trylock = first_sys_pthread_mutex_trylock;
 static MutexOp sys_pthread_mutex_unlock = first_sys_pthread_mutex_unlock;
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+typedef int (*TimedMutexOp)(pthread_mutex_t*, const struct timespec*);
+int first_sys_pthread_mutex_timedlock(pthread_mutex_t* mutex,
+                                      const struct timespec* __abstime);
+static TimedMutexOp sys_pthread_mutex_timedlock = 
first_sys_pthread_mutex_timedlock;
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 static pthread_once_t init_sys_mutex_lock_once = PTHREAD_ONCE_INIT;
 
 // dlsym may call malloc to allocate space for dlerror and causes contention
@@ -436,11 +444,18 @@ static void init_sys_mutex_lock() {
             RTLD_NEXT, "pthread_mutex_unlock", (void*)init_sys_mutex_lock);
         sys_pthread_mutex_trylock = (MutexOp)_dl_sym(
             RTLD_NEXT, "pthread_mutex_trylock", (void*)init_sys_mutex_lock);
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+        sys_pthread_mutex_timedlock = (TimedMutexOp)_dl_sym(
+            RTLD_NEXT, "pthread_mutex_timedlock", (void*)init_sys_mutex_lock);
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
     } else {
         // _dl_sym may be undefined reference in some system, fallback to dlsym
         sys_pthread_mutex_lock = (MutexOp)dlsym(RTLD_NEXT, 
"pthread_mutex_lock");
         sys_pthread_mutex_unlock = (MutexOp)dlsym(RTLD_NEXT, 
"pthread_mutex_unlock");
         sys_pthread_mutex_trylock = (MutexOp)dlsym(RTLD_NEXT, 
"pthread_mutex_trylock");
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+        sys_pthread_mutex_timedlock = (TimedMutexOp)dlsym(RTLD_NEXT, 
"pthread_mutex_timedlock");
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
     }
 #elif defined(OS_MACOSX)
     // TODO: look workaround for dlsym on mac
@@ -463,6 +478,14 @@ int first_sys_pthread_mutex_trylock(pthread_mutex_t* 
mutex) {
     return sys_pthread_mutex_trylock(mutex);
 }
 
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+int first_sys_pthread_mutex_timedlock(pthread_mutex_t* mutex,
+                                      const struct timespec* abstime) {
+    pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
+    return sys_pthread_mutex_timedlock(mutex, abstime);
+}
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) {
     pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
     return sys_pthread_mutex_unlock(mutex);
@@ -482,11 +505,17 @@ static __thread bool tls_inside_lock = false;
 // to avoid deadlock in malloc call stack.
 static __thread bool tls_warn_up = false;
 
+#if BRPC_DEBUG_BTHREAD_SCHE_SAFETY
 // ++tls_pthread_lock_count when pthread locking,
 // --tls_pthread_lock_count when pthread unlocking.
 // Only when it is equal to 0, it is safe for the bthread to be scheduled.
+// Note: If a mutex is locked and unlocked in different threads,
+// `tls_pthread_lock_count' is inaccurate, so this feature cannot be used.
 static __thread int tls_pthread_lock_count = 0;
 
+#define ADD_TLS_PTHREAD_LOCK_COUNT ++tls_pthread_lock_count
+#define SUB_TLS_PTHREAD_LOCK_COUNT --tls_pthread_lock_count
+
 void CheckBthreadScheSafety() {
     if (BAIDU_LIKELY(0 == tls_pthread_lock_count)) {
         return;
@@ -497,11 +526,16 @@ void CheckBthreadScheSafety() {
         true, butil::memory_order_relaxed))) {
         butil::debug::StackTrace trace(true);
         // It can only be checked once because the counter is messed up.
-        LOG(ERROR) << "bthread is suspended while holding"
+        LOG(ERROR) << "bthread is suspended while holding "
                    << tls_pthread_lock_count << " pthread locks."
                    << std::endl << trace.ToString();
     }
 }
+#else
+#define ADD_TLS_PTHREAD_LOCK_COUNT ((void)0)
+#define SUB_TLS_PTHREAD_LOCK_COUNT ((void)0)
+void CheckBthreadScheSafety() {}
+#endif // BRPC_DEBUG_BTHREAD_SCHE_SAFETY
 
 // Speed up with TLS:
 //   Most pthread_mutex are locked and unlocked in the same thread. Putting
@@ -608,28 +642,50 @@ void submit_contention(const bthread_contention_site_t& 
csite, int64_t now_ns) {
 
 namespace internal {
 #ifndef NO_PTHREAD_MUTEX_HOOK
-BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex) {
-    ++bthread::tls_pthread_lock_count;
-    return sys_pthread_mutex_lock(mutex);
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex,
+                                                   const struct timespec* 
abstime) {
+    int rc = NULL == abstime ?
+             sys_pthread_mutex_lock(mutex) :
+             sys_pthread_mutex_timedlock(mutex, abstime);
+    if (0 == rc) {
+        ADD_TLS_PTHREAD_LOCK_COUNT;
+    }
+    return rc;
 }
+#else
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex,
+                                                   const struct timespec*/* 
Not supported */) {
+    int rc = sys_pthread_mutex_lock(mutex);
+    if (0 == rc) {
+        ADD_TLS_PTHREAD_LOCK_COUNT;
+    }
+    return rc;
+}
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
 
 BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(pthread_mutex_t* mutex) {
     int rc = sys_pthread_mutex_trylock(mutex);
     if (0 == rc) {
-        ++tls_pthread_lock_count;
+        ADD_TLS_PTHREAD_LOCK_COUNT;
     }
     return rc;
 }
 
 BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(pthread_mutex_t* mutex) {
-    --tls_pthread_lock_count;
+    SUB_TLS_PTHREAD_LOCK_COUNT;
     return sys_pthread_mutex_unlock(mutex);
 }
-#endif
+#endif // NO_PTHREAD_MUTEX_HOOK
 
-BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex) {
-    mutex->lock();
-    return 0;
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex,
+                                                   const struct timespec* 
abstime) {
+    if (NULL == abstime) {
+        mutex->lock();
+        return 0;
+    } else {
+        return mutex->timed_lock(abstime) ? 0 : errno;
+    }
 }
 
 BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(FastPthreadMutex* mutex) 
{
@@ -642,13 +698,13 @@ BUTIL_FORCE_INLINE int 
pthread_mutex_unlock_internal(FastPthreadMutex* mutex) {
 }
 
 template <typename Mutex>
-BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
+BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex, const struct 
timespec* abstime) {
     // Don't change behavior of lock when profiler is off.
     if (!g_cp ||
         // collecting code including backtrace() and submit() may call
         // pthread_mutex_lock and cause deadlock. Don't sample.
         tls_inside_lock) {
-        return pthread_mutex_lock_internal(mutex);
+        return pthread_mutex_lock_internal(mutex, abstime);
     }
     // Don't slow down non-contended locks.
     int rc = pthread_mutex_trylock_internal(mutex);
@@ -671,16 +727,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* 
mutex) {
         csite = &entry.csite;
         if (!bvar::is_sampling_range_valid(sampling_range)) {
             make_contention_site_invalid(&entry.csite);
-            return pthread_mutex_lock_internal(mutex);
+            return pthread_mutex_lock_internal(mutex, abstime);
         }
     }
 #endif
     if (!bvar::is_sampling_range_valid(sampling_range)) {  // don't sample
-        return pthread_mutex_lock_internal(mutex);
+        return pthread_mutex_lock_internal(mutex, abstime);
     }
     // Lock and monitor the waiting time.
     const int64_t start_ns = butil::cpuwide_time_ns();
-    rc = pthread_mutex_lock_internal(mutex);
+    rc = pthread_mutex_lock_internal(mutex, abstime);
     if (!rc) { // Inside lock
         if (!csite) {
             csite = add_pthread_contention_site(mutex);
@@ -746,13 +802,20 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(Mutex* 
mutex) {
 
 #ifndef NO_PTHREAD_MUTEX_HOOK
 BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
-    return internal::pthread_mutex_lock_impl(mutex);
+    return internal::pthread_mutex_lock_impl(mutex, NULL);
 }
 
 BUTIL_FORCE_INLINE int pthread_mutex_trylock_impl(pthread_mutex_t* mutex) {
     return internal::pthread_mutex_trylock_impl(mutex);
 }
 
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+BUTIL_FORCE_INLINE int pthread_mutex_timedlock_impl(pthread_mutex_t* mutex,
+                                                    const struct timespec* 
abstime) {
+    return internal::pthread_mutex_lock_impl(mutex, abstime);
+}
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
     return internal::pthread_mutex_unlock_impl(mutex);
 }
@@ -777,8 +840,7 @@ BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),
 
 const int MAX_SPIN_ITER = 4;
 
-inline int mutex_lock_contended_impl(
-    bthread_mutex_t* m, const struct timespec* __restrict abstime) {
+inline int mutex_lock_contended_impl(bthread_mutex_t* m, const struct 
timespec* abstime) {
     // When a bthread first contends for a lock, active spinning makes sense.
     // Spin only few times and only if local `rq' is empty.
     TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
@@ -817,11 +879,29 @@ inline int mutex_lock_contended_impl(
 #ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
 namespace internal {
 
-int FastPthreadMutex::lock_contended() {
-    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
+int FastPthreadMutex::lock_contended(const struct timespec* abstime) {
+    int64_t abstime_us = 0;
+    if (NULL != abstime) {
+        abstime_us = butil::timespec_to_microseconds(*abstime);
+    }
+    auto whole = (butil::atomic<unsigned>*)&_futex;
     while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
-        if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0
-            && errno != EWOULDBLOCK) {
+        timespec* ptimeout = NULL;
+        timespec timeout{};
+        if (NULL != abstime) {
+            timeout = butil::microseconds_to_timespec(
+                abstime_us - butil::gettimeofday_us());
+            ptimeout = &timeout;
+        }
+        if (NULL == abstime  || abstime_us > MIN_SLEEP_US) {
+            if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, ptimeout) < 0
+                && errno != EWOULDBLOCK && errno != EINTR/*note*/) {
+                // A mutex lock should ignore interruptions in general since
+                // user code is unlikely to check the return value.
+                return errno;
+            }
+        } else {
+            errno = ETIMEDOUT;
             return errno;
         }
     }
@@ -829,46 +909,64 @@ int FastPthreadMutex::lock_contended() {
 }
 
 void FastPthreadMutex::lock() {
-    auto split = (bthread::MutexInternal*)&_futex;
-    if (split->locked.exchange(1, butil::memory_order_acquire)) {
-        (void)lock_contended();
+    if (try_lock()) {
+        return;
     }
-    ++tls_pthread_lock_count;
+
+    (void)lock_contended(NULL);
+    ADD_TLS_PTHREAD_LOCK_COUNT;
 }
 
 bool FastPthreadMutex::try_lock() {
     auto split = (bthread::MutexInternal*)&_futex;
     bool lock = !split->locked.exchange(1, butil::memory_order_acquire);
     if (lock) {
-        ++tls_pthread_lock_count;
+        ADD_TLS_PTHREAD_LOCK_COUNT;
     }
     return lock;
 }
 
+bool FastPthreadMutex::timed_lock(const struct timespec* abstime) {
+    if (try_lock()) {
+        return true;
+    }
+    int rc = lock_contended(abstime);
+    if (rc == 0) {
+        ADD_TLS_PTHREAD_LOCK_COUNT;
+    }
+    return rc == 0;
+}
+
 void FastPthreadMutex::unlock() {
+    SUB_TLS_PTHREAD_LOCK_COUNT;
     auto whole = (butil::atomic<unsigned>*)&_futex;
     const unsigned prev = whole->exchange(0, butil::memory_order_release);
     // CAUTION: the mutex may be destroyed, check comments before butex_create
     if (prev != BTHREAD_MUTEX_LOCKED) {
         futex_wake_private(whole, 1);
     }
-    --tls_pthread_lock_count;
 }
 
 } // namespace internal
 #endif // BTHREAD_USE_FAST_PTHREAD_MUTEX
 
 void FastPthreadMutex::lock() {
-    internal::pthread_mutex_lock_impl(&_mutex);
+    internal::pthread_mutex_lock_impl(&_mutex, NULL);
 }
 
 void FastPthreadMutex::unlock() {
     internal::pthread_mutex_unlock_impl(&_mutex);
 }
 
+#if defined(BTHREAD_USE_FAST_PTHREAD_MUTEX) || HAS_PTHREAD_MUTEX_TIMEDLOCK
+bool FastPthreadMutex::timed_lock(const struct timespec* abstime) {
+    return internal::pthread_mutex_lock_impl(&_mutex, abstime) == 0;
+}
+#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 } // namespace bthread
 
-extern "C" {
+__BEGIN_DECLS
 
 int bthread_mutex_init(bthread_mutex_t* __restrict m,
                        const bthread_mutexattr_t* __restrict attr) {
@@ -989,9 +1087,16 @@ int pthread_mutex_lock(pthread_mutex_t* __mutex) {
 int pthread_mutex_trylock(pthread_mutex_t* __mutex) {
     return bthread::pthread_mutex_trylock_impl(__mutex);
 }
+#if defined(OS_LINUX) && defined(OS_POSIX) && defined(__USE_XOPEN2K)
+int pthread_mutex_timedlock(pthread_mutex_t *__restrict __mutex,
+                                           const struct timespec *__restrict 
__abstime) {
+    return bthread::pthread_mutex_timedlock_impl(__mutex, __abstime);
+}
+#endif // OS_LINUX OS_POSIX __USE_XOPEN2K
 int pthread_mutex_unlock(pthread_mutex_t* __mutex) {
     return bthread::pthread_mutex_unlock_impl(__mutex);
 }
-#endif
+#endif // NO_PTHREAD_MUTEX_HOOK
+
 
-}  // extern "C"
+__END_DECLS
diff --git a/src/bthread/mutex.h b/src/bthread/mutex.h
index f1d1029b..d05d753c 100644
--- a/src/bthread/mutex.h
+++ b/src/bthread/mutex.h
@@ -61,8 +61,11 @@ public:
                                     "Mutex lock failed");
         }
     }
-    void unlock() { (bthread_mutex_unlock(&_mutex)); }
+    void unlock() { bthread_mutex_unlock(&_mutex); }
     bool try_lock() { return 0 == bthread_mutex_trylock(&_mutex); }
+    bool timed_lock(const struct timespec* abstime) {
+        return !bthread_mutex_timedlock(&_mutex, abstime);
+    }
     // TODO(chenzhangyi01): Complement interfaces for C++11
 private:
     DISALLOW_COPY_AND_ASSIGN(Mutex);
@@ -78,9 +81,10 @@ public:
     void lock();
     void unlock();
     bool try_lock();
+    bool timed_lock(const struct timespec* abstime);
 private:
     DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
-    int lock_contended();
+    int lock_contended(const struct timespec* abstime);
     unsigned _futex;
 };
 #else
@@ -97,6 +101,10 @@ public:
     void lock();
     void unlock();
     bool try_lock() { return _mutex.try_lock(); }
+#if defined(BTHREAD_USE_FAST_PTHREAD_MUTEX) || HAS_PTHREAD_MUTEX_TIMEDLOCK
+    bool timed_lock(const struct timespec* abstime);
+#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX  HAS_PTHREAD_MUTEX_TIMEDLOCK
+
 private:
     internal::FastPthreadMutex _mutex;
 };
diff --git a/src/butil/synchronization/lock.h b/src/butil/synchronization/lock.h
index b6f5215c..e62c76c4 100644
--- a/src/butil/synchronization/lock.h
+++ b/src/butil/synchronization/lock.h
@@ -23,7 +23,12 @@
 #include <windows.h>
 #elif defined(OS_POSIX)
 #include <pthread.h>
-#endif
+#if defined(OS_LINUX) && defined(__USE_XOPEN2K)
+#define HAS_PTHREAD_MUTEX_TIMEDLOCK 1
+#else
+#define HAS_PTHREAD_MUTEX_TIMEDLOCK 0
+#endif // OS_LINUX __USE_XOPEN2K
+#endif // OS_POSIX
 
 #include "butil/base_export.h"
 #include "butil/macros.h"
@@ -90,6 +95,12 @@ public:
 #endif
     }
 
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+    bool timed_lock(const struct timespec* abstime) {
+        return pthread_mutex_timedlock(&_native_handle, abstime) == 0;
+    }
+#endif // HAS_PTHREAD_MUTEX_TIMEDLOCK
+
     // Returns the underlying implementation-defined native handle object.
     NativeHandle* native_handle() { return &_native_handle; }
 
diff --git a/test/bthread_mutex_unittest.cpp b/test/bthread_mutex_unittest.cpp
index 21bd6044..b0802f9b 100644
--- a/test/bthread_mutex_unittest.cpp
+++ b/test/bthread_mutex_unittest.cpp
@@ -108,8 +108,12 @@ TEST(MutexTest, cpp_wrapper) {
     mutex.unlock();
     mutex.lock();
     mutex.unlock();
+    struct timespec t = { -2, 0 };
+    ASSERT_TRUE(mutex.timed_lock(&t));
+    mutex.unlock();
     {
         BAIDU_SCOPED_LOCK(mutex);
+        ASSERT_FALSE(mutex.timed_lock(&t));
     }
     {
         std::unique_lock<bthread::Mutex> lck1;
@@ -132,6 +136,8 @@ TEST(MutexTest, cpp_wrapper) {
     }
     ASSERT_TRUE(mutex.try_lock());
     mutex.unlock();
+    ASSERT_TRUE(mutex.timed_lock(&t));
+    mutex.unlock();
 }
 
 bool g_started = false;
@@ -268,6 +274,13 @@ TEST(MutexTest, mix_thread_types) {
     }
 }
 
+void* do_fast_pthread_timedlock(void *arg) {
+    struct timespec t = { -2, 0 };
+    EXPECT_FALSE(((bthread::FastPthreadMutex*)arg)->timed_lock(&t));
+    EXPECT_EQ(ETIMEDOUT, errno);
+    return NULL;
+}
+
 TEST(MutexTest, fast_pthread_mutex) {
     bthread::FastPthreadMutex mutex;
     ASSERT_TRUE(mutex.try_lock());
@@ -276,6 +289,12 @@ TEST(MutexTest, fast_pthread_mutex) {
     mutex.unlock();
     {
         BAIDU_SCOPED_LOCK(mutex);
+        struct timespec t = { -2, 0 };
+        ASSERT_FALSE(mutex.timed_lock(&t));
+        ASSERT_EQ(ETIMEDOUT, errno);
+        pthread_t th;
+        ASSERT_EQ(0, pthread_create(&th, NULL, do_fast_pthread_timedlock, 
&mutex));
+        ASSERT_EQ(0, pthread_join(th, NULL));
     }
     {
         std::unique_lock<bthread::FastPthreadMutex> lck1;
@@ -300,4 +319,47 @@ TEST(MutexTest, fast_pthread_mutex) {
     }
 }
 
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+void* do_pthread_timedlock(void *arg) {
+    struct timespec t = { -2, 0 };
+    EXPECT_EQ(ETIMEDOUT, pthread_mutex_timedlock((pthread_mutex_t*)arg, &t));
+    EXPECT_EQ(ETIMEDOUT, errno);
+    return NULL;
+}
+#endif
+
+TEST(MutexTest, pthread_mutex) {
+    pthread_mutex_t mutex;
+    ASSERT_EQ(0, pthread_mutex_init(&mutex, NULL));
+    ASSERT_EQ(0, pthread_mutex_trylock(&mutex));
+    ASSERT_EQ(0, pthread_mutex_unlock(&mutex));
+    ASSERT_EQ(0, pthread_mutex_lock(&mutex));
+    ASSERT_EQ(0, pthread_mutex_unlock(&mutex));
+    {
+        BAIDU_SCOPED_LOCK(mutex);
+#if HAS_PTHREAD_MUTEX_TIMEDLOCK
+        LOG(INFO) << "pthread_mutex_timedlock is available";
+        struct timespec t = { -2, 0 };
+        ASSERT_EQ(ETIMEDOUT, pthread_mutex_timedlock(&mutex, &t));
+        pthread_t th;
+        ASSERT_EQ(0, pthread_create(&th, NULL, do_fast_pthread_timedlock, 
&mutex));
+        ASSERT_EQ(0, pthread_join(th, NULL));
+#endif
+    }
+    ASSERT_EQ(0, pthread_mutex_trylock(&mutex));
+    ASSERT_EQ(0, pthread_mutex_unlock(&mutex));
+
+    const int N = 16;
+    pthread_t pthreads[N];
+    for (int i = 0; i < N; ++i) {
+        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
+            loop_until_stopped<pthread_mutex_t>, &mutex));
+    }
+    bthread_usleep(1000L * 1000);
+    g_stopped = true;
+    for (int i = 0; i < N; ++i) {
+        pthread_join(pthreads[i], NULL);
+    }
+}
+
 } // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to