The current std::lock algorithm is the one called "persistent" in Howard
Hinnant's https://howardhinnant.github.io/dining_philosophers.html post.
While it tends to perform acceptably fast, it wastes a lot of CPU cycles
by continuously locking and unlocking the uncontended mutexes.
Effectively, it's a spin lock with no back-off.
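
For illustration only, a minimal two-lockable sketch of what the "persistent"
strategy amounts to. The name lock_persistent is hypothetical and this is
not the libstdc++ code; it only mirrors the loop removed by this patch.

```cpp
#include <mutex>

// Hypothetical helper: "persistent" locking of two lockables.
// It always blocks on l0 first, then retries immediately if l1 is busy.
template<typename L0, typename L1>
void lock_persistent(L0& l0, L1& l1)
{
    while (true)
    {
        std::unique_lock<L0> first(l0);   // lock l0, usually uncontended
        if (l1.try_lock())
        {
            first.release();              // keep l0 locked for the caller
            return;
        }
        // first's destructor unlocks l0 here, then the loop retries at once,
        // which is the repeated lock/unlock traffic described above.
    }
}
```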

This replaces it with the one Howard calls "smart and polite". It's
smart, because when a Mi.try_lock() call fails because mutex Mi is
contended, the algorithm reorders the mutexes until Mi is first, then
calls Mi.lock(), to block until Mi is no longer contended.  It's
polite because it uses std::this_thread::yield() between the failed
Mi.try_lock() call and the Mi.lock() call. (In reality it uses
__gthread_yield() directly, because using this_thread::yield() would
require shuffling code around to avoid a circular dependency.)
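
As a rough sketch of the "smart and polite" idea for just two lockables,
assuming a hypothetical helper named lock_smart_polite (the patch itself
generalises this to N lockables and calls __gthread_yield() rather than
std::this_thread::yield()):

```cpp
#include <mutex>
#include <thread>

// Hypothetical helper: "smart and polite" locking of two lockables.
template<typename L0, typename L1>
void lock_smart_polite(L0& l0, L1& l1)
{
    while (true)
    {
        {
            std::unique_lock<L0> first(l0);   // block on l0
            if (l1.try_lock())
            {
                first.release();              // both locked, keep them
                return;
            }
        }                                     // l0 is unlocked here
        std::this_thread::yield();            // polite: let the owner run
        {
            std::unique_lock<L1> first(l1);   // smart: block on the busy one
            if (l0.try_lock())
            {
                first.release();
                return;
            }
        }                                     // l1 is unlocked here
        std::this_thread::yield();
    }
}
```

A caller would use it like std::lock(a, b) and later unlock both mutexes,
for example by adopting them into std::lock_guard objects with
std::adopt_lock.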

This version of the algorithm is inspired by some hints from Howard, so
that it has strictly bounded stack usage. As the comment in the code
says:

// This function can recurse up to N levels deep, for N = 1+sizeof...(L1).
// On each recursion the lockables are rotated left one position,
// e.g. depth 0: l0, l1, l2; depth 1: l1, l2, l0; depth 2: l2, l0, l1.
// When a call to l_i.try_lock() fails it recurses/returns to depth=i
// so that l_i is the first argument, and then blocks until l_i is locked.

The 'i' parameter is the desired permutation of the lockables, and the
'depth' parameter is the depth in the call stack of the current
instantiation of the function template. If i == depth then the function
calls l0.lock() and then l1.try_lock()... for each lockable in the
parameter pack l1.  If i > depth then the function rotates the lockables
to the left one place, and calls itself again to go one level deeper.
Finally, if i < depth then the function returns to a shallower depth,
equivalent to a right rotate of the lockables.  When a call to
try_lock() fails, i is set to the index of the contended lockable, so
that the next call to l0.lock() will use the contended lockable as l0.
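
The index arithmetic can be checked in isolation. The sketch below assumes a
hypothetical helper next_index, where 'failed' is the position of the
contended lockable within the rotated argument list at the current depth
(position 0 is l0), matching the expression (__depth + __failed) % __n in
the patch.

```cpp
// Hypothetical helper: which original lockable should become l0 next.
// depth  : current recursion depth, i.e. number of left rotations applied
// failed : position in the rotated list whose try_lock() failed (1..n-1)
// n      : total number of lockables
constexpr int next_index(int depth, int failed, int n)
{
    return (depth + failed) % n;
}

// With three lockables, depth 1 sees them as l1, l2, l0.
// If position 2 (which is l0) is contended, the target is index 0,
// so the function returns to depth 0 where l0 is the first argument.
static_assert(next_index(1, 2, 3) == 0, "return to depth 0");

// At depth 0 the order is l0, l1, l2. If position 1 (l1) is contended,
// the target is index 1, so the function recurses one level deeper.
static_assert(next_index(0, 1, 3) == 1, "recurse to depth 1");
```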

This commit also replaces the std::try_lock implementation details. The
new code is identical in behaviour, but uses a pair of constrained
function templates. This avoids instantiating a class template, and is a
little simpler to call where used in std::__detail::__lock_impl and
std::try_lock.
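
For reference, the behaviour that stays unchanged is the std::try_lock
contract: it returns -1 if every lockable was locked, otherwise the
zero-based index of the first one that could not be locked, after releasing
any it had already locked. A small single-threaded sketch, using a
hypothetical flag_lock type that is not part of the library:

```cpp
#include <mutex>

// Hypothetical single-threaded lockable, for illustration only.
struct flag_lock
{
    bool held = false;
    void lock() { held = true; }      // does not block and is not thread-safe
    bool try_lock()
    {
        if (held)
            return false;
        held = true;
        return true;
    }
    void unlock() { held = false; }
};

// Reports what std::try_lock returns when only b is already held.
inline int try_lock_with_b_held(flag_lock& a, flag_lock& b, flag_lock& c)
{
    b.held = true;
    // a is locked, b fails, a is released again; c is never tried.
    return std::try_lock(a, b, c);
}
```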

Signed-off-by: Jonathan Wakely <jwak...@redhat.com>

libstdc++-v3/ChangeLog:

        * include/std/mutex (__try_to_lock): Move to __detail namespace.
        (struct __try_lock_impl): Replace with ...
        (__detail::__try_lock_impl<Idx>(tuple<Lockables...>&)): New
        function templates to implement std::try_lock.
        (try_lock): Use new __try_lock_impl.
        (__detail::__lock_impl(int&, int, L0&, L1&...)): New function
        template to implement std::lock.
        (lock): Use __lock_impl.

Tested powerpc64le-linux. Committed to trunk.

commit 6cf0040fff78a665db31a6a8dee60b12eef2e590
Author: Jonathan Wakely <jwak...@redhat.com>
Date:   Mon Jun 21 13:35:18 2021

    libstdc++: Improve std::lock algorithm
    
    The current std::lock algorithm is the one called "persistent" in Howard
    Hinnant's https://howardhinnant.github.io/dining_philosophers.html post.
    While it tends to perform acceptably fast, it wastes a lot of CPU cycles
    by continuously locking and unlocking the uncontended mutexes.
    Effectively, it's a spin lock with no back-off.
    
    This replaces it with the one Howard calls "smart and polite". It's
    smart, because when a Mi.try_lock() call fails because mutex Mi is
    contended, the algorithm reorders the mutexes until Mi is first, then
    calls Mi.lock(), to block until Mi is no longer contended.  It's
    polite because it uses std::this_thread::yield() between the failed
    Mi.try_lock() call and the Mi.lock() call. (In reality it uses
    __gthread_yield() directly, because using this_thread::yield() would
    require shuffling code around to avoid a circular dependency.)
    
    This version of the algorithm is inspired by some hints from Howard, so
    that it has strictly bounded stack usage. As the comment in the code
    says:
    
    // This function can recurse up to N levels deep, for N = 1+sizeof...(L1).
    // On each recursion the lockables are rotated left one position,
    // e.g. depth 0: l0, l1, l2; depth 1: l1, l2, l0; depth 2: l2, l0, l1.
    // When a call to l_i.try_lock() fails it recurses/returns to depth=i
    // so that l_i is the first argument, and then blocks until l_i is locked.
    
    The 'i' parameter is the desired permutation of the lockables, and the
    'depth' parameter is the depth in the call stack of the current
    instantiation of the function template. If i == depth then the function
    calls l0.lock() and then l1.try_lock()... for each lockable in the
    parameter pack l1.  If i > depth then the function rotates the lockables
    to the left one place, and calls itself again to go one level deeper.
    Finally, if i < depth then the function returns to a shallower depth,
    equivalent to a right rotate of the lockables.  When a call to
    try_lock() fails, i is set to the index of the contended lockable, so
    that the next call to l0.lock() will use the contended lockable as l0.
    
    This commit also replaces the std::try_lock implementation details. The
    new code is identical in behaviour, but uses a pair of constrained
    function templates. This avoids instantiating a class template, and is a
    little simpler to call where used in std::__detail::__lock_impl and
    std::try_lock.
    
    Signed-off-by: Jonathan Wakely <jwak...@redhat.com>
    
    libstdc++-v3/ChangeLog:
    
            * include/std/mutex (__try_to_lock): Move to __detail namespace.
            (struct __try_lock_impl): Replace with ...
            (__detail::__try_lock_impl<Idx>(tuple<Lockables...>&)): New
            function templates to implement std::try_lock.
            (try_lock): Use new __try_lock_impl.
            (__detail::__lock_impl(int&, int, L0&, L1&...)): New function
            template to implement std::lock.
            (lock): Use __lock_impl.

diff --git a/libstdc++-v3/include/std/mutex b/libstdc++-v3/include/std/mutex
index d4c5d13f654..5f2d8f9ee7b 100644
--- a/libstdc++-v3/include/std/mutex
+++ b/libstdc++-v3/include/std/mutex
@@ -512,47 +512,44 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
 #endif // _GLIBCXX_HAS_GTHREADS
 
   /// @cond undocumented
-  template<typename _Lock>
-    inline unique_lock<_Lock>
-    __try_to_lock(_Lock& __l)
-    { return unique_lock<_Lock>{__l, try_to_lock}; }
+  namespace __detail
+  {
+    template<typename _Lockable>
+      inline unique_lock<_Lockable>
+      __try_to_lock(_Lockable& __l)
+      { return unique_lock<_Lockable>{__l, try_to_lock}; }
 
-  template<int _Idx, bool _Continue = true>
-    struct __try_lock_impl
-    {
-      template<typename... _Lock>
-       static void
-       __do_try_lock(tuple<_Lock&...>& __locks, int& __idx)
-       {
-          __idx = _Idx;
-          auto __lock = std::__try_to_lock(std::get<_Idx>(__locks));
-          if (__lock.owns_lock())
-            {
-             constexpr bool __cont = _Idx + 2 < sizeof...(_Lock);
-             using __try_locker = __try_lock_impl<_Idx + 1, __cont>;
-             __try_locker::__do_try_lock(__locks, __idx);
-              if (__idx == -1)
-                __lock.release();
-            }
-       }
-    };
+    // Lock the last element of the tuple, after all previous ones are locked.
+    template<int _Idx, typename... _Lockables>
+      inline __enable_if_t<_Idx + 1 == sizeof...(_Lockables), int>
+      __try_lock_impl(tuple<_Lockables&...>& __lockables)
+      {
+       if (auto __lock = __detail::__try_to_lock(std::get<_Idx>(__lockables)))
+         {
+           __lock.release();
+           return -1;
+         }
+       else
+         return _Idx;
+      }
 
-  template<int _Idx>
-    struct __try_lock_impl<_Idx, false>
-    {
-      template<typename... _Lock>
-       static void
-       __do_try_lock(tuple<_Lock&...>& __locks, int& __idx)
-       {
-          __idx = _Idx;
-          auto __lock = std::__try_to_lock(std::get<_Idx>(__locks));
-          if (__lock.owns_lock())
-            {
-              __idx = -1;
-              __lock.release();
-            }
-       }
-    };
+    // Lock tuple elements starting from _Idx.
+    template<int _Idx, typename... _Lockables>
+      inline __enable_if_t<_Idx + 1 != sizeof...(_Lockables), int>
+      __try_lock_impl(tuple<_Lockables&...>& __lockables)
+      {
+       if (auto __lock = __detail::__try_to_lock(std::get<_Idx>(__lockables)))
+         {
+           int __idx = __detail::__try_lock_impl<_Idx + 1>(__lockables);
+           if (__idx == -1)
+             __lock.release();
+           return __idx;
+         }
+       else
+         return _Idx;
+      }
+
+  } // namespace __detail
   /// @endcond
 
   /** @brief Generic try_lock.
@@ -569,12 +566,52 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
     int
     try_lock(_Lock1& __l1, _Lock2& __l2, _Lock3&... __l3)
     {
-      int __idx;
-      auto __locks = std::tie(__l1, __l2, __l3...);
-      __try_lock_impl<0>::__do_try_lock(__locks, __idx);
-      return __idx;
+      auto __lockables = std::tie(__l1, __l2, __l3...);
+      return __detail::__try_lock_impl<0>(__lockables);
     }
 
+  /// @cond undocumented
+  namespace __detail
+  {
+    // This function can recurse up to N levels deep, for N = 1+sizeof...(L1).
+    // On each recursion the lockables are rotated left one position,
+    // e.g. depth 0: l0, l1, l2; depth 1: l1, l2, l0; depth 2: l2, l0, l1.
+    // When a call to l_i.try_lock() fails it recurses/returns to depth=i
+    // so that l_i is the first argument, and then blocks until l_i is locked.
+    template<typename _L0, typename... _L1>
+      void
+      __lock_impl(int& __i, int __depth, _L0& __l0, _L1&... __l1)
+      {
+       while (__i >= __depth)
+         {
+           if (__i == __depth)
+             {
+               int __failed = 1; // index that couldn't be locked
+               {
+                 unique_lock<_L0> __first(__l0);
+                 auto __rest = std::tie(__l1...);
+                 __failed += __detail::__try_lock_impl<0>(__rest);
+                 if (!__failed)
+                   {
+                     __i = -1; // finished
+                     __first.release();
+                     return;
+                   }
+               }
+#ifdef _GLIBCXX_USE_SCHED_YIELD
+               __gthread_yield();
+#endif
+               constexpr auto __n = 1 + sizeof...(_L1);
+               __i = (__depth + __failed) % __n;
+             }
+           else // rotate left until l_i is first.
+             __detail::__lock_impl(__i, __depth + 1, __l1..., __l0);
+         }
+      }
+
+  } // namespace __detail
+  /// @endcond
+
   /** @brief Generic lock.
    *  @param __l1 Meets Lockable requirements (try_lock() may throw).
    *  @param __l2 Meets Lockable requirements (try_lock() may throw).
@@ -590,19 +627,8 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
     void
     lock(_L1& __l1, _L2& __l2, _L3&... __l3)
     {
-      while (true)
-        {
-          using __try_locker = __try_lock_impl<0, sizeof...(_L3) != 0>;
-          unique_lock<_L1> __first(__l1);
-          int __idx;
-          auto __locks = std::tie(__l2, __l3...);
-          __try_locker::__do_try_lock(__locks, __idx);
-          if (__idx == -1)
-            {
-              __first.release();
-              return;
-            }
-        }
+      int __i = 0;
+      __detail::__lock_impl(__i, 0, __l1, __l2, __l3...);
     }
 
 #if __cplusplus >= 201703L
