The current std::lock algorithm is the one called "persistent" in Howard Hinnant's https://howardhinnant.github.io/dining_philosophers.html post. While it tends to perform acceptably fast, it wastes a lot of CPU cycles by continuously locking and unlocking the uncontended mutexes. Effectively, it's a spin lock with no back-off.
This replaces it with the one Howard calls "smart and polite". It's smart, because when a Mi.try_lock() call fails because mutex Mi is contended, the algorithm reorders the mutexes until Mi is first, then calls Mi.lock(), to block until Mi is no longer contended. It's polite because it uses std::this_thread::yield() between the failed Mi.try_lock() call and the Mi.lock() call. (In reality it uses __gthread_yield() directly, because using this_thread::yield() would require shuffling code around to avoid a circular dependency.) This version of the algorithm is inspired by some hints from Howard, so that it has strictly bounded stack usage. As the comment in the code says: // This function can recurse up to N levels deep, for N = 1+sizeof...(L1). // On each recursion the lockables are rotated left one position, // e.g. depth 0: l0, l1, l2; depth 1: l1, l2, l0; depth 2: l2, l0, l1. // When a call to l_i.try_lock() fails it recurses/returns to depth=i // so that l_i is the first argument, and then blocks until l_i is locked. The 'i' parameter is the desired permuation of the lockables, and the 'depth' parameter is the depth in the call stack of the current instantiation of the function template. If i == depth then the function calls l0.lock() and then l1.try_lock()... for each lockable in the parameter pack l1. If i > depth then the function rotates the lockables to the left one place, and calls itself again to go one level deeper. Finally, if i < depth then the function returns to a shallower depth, equivalent to a right rotate of the lockables. When a call to try_lock() fails, i is set to the index of the contended lockable, so that the next call to l0.lock() will use the contended lockable as l0. This commit also replaces the std::try_lock implementation details. The new code is identical in behaviour, but uses a pair of constrained function templates. This avoids instantiating a class template, and is a litle simpler to call where used in std::__detail::__lock_impl and std::try_lock. Signed-off-by: Jonathan Wakely <jwak...@redhat.com> libstdc++-v3/ChangeLog: * include/std/mutex (__try_to_lock): Move to __detail namespace. (struct __try_lock_impl): Replace with ... (__detail::__try_lock_impl<Idx>(tuple<Lockables...>&)): New function templates to implement std::try_lock. (try_lock): Use new __try_lock_impl. (__detail::__lock_impl(int, int&, L0&, L1&...)): New function template to implement std::lock. (lock): Use __lock_impl. Tested powerpc64le-linux. Committed to trunk.
commit 6cf0040fff78a665db31a6a8dee60b12eef2e590 Author: Jonathan Wakely <jwak...@redhat.com> Date: Mon Jun 21 13:35:18 2021 libstdc++: Improve std::lock algorithm The current std::lock algorithm is the one called "persistent" in Howard Hinnant's https://howardhinnant.github.io/dining_philosophers.html post. While it tends to perform acceptably fast, it wastes a lot of CPU cycles by continuously locking and unlocking the uncontended mutexes. Effectively, it's a spin lock with no back-off. This replaces it with the one Howard calls "smart and polite". It's smart, because when a Mi.try_lock() call fails because mutex Mi is contended, the algorithm reorders the mutexes until Mi is first, then calls Mi.lock(), to block until Mi is no longer contended. It's polite because it uses std::this_thread::yield() between the failed Mi.try_lock() call and the Mi.lock() call. (In reality it uses __gthread_yield() directly, because using this_thread::yield() would require shuffling code around to avoid a circular dependency.) This version of the algorithm is inspired by some hints from Howard, so that it has strictly bounded stack usage. As the comment in the code says: // This function can recurse up to N levels deep, for N = 1+sizeof...(L1). // On each recursion the lockables are rotated left one position, // e.g. depth 0: l0, l1, l2; depth 1: l1, l2, l0; depth 2: l2, l0, l1. // When a call to l_i.try_lock() fails it recurses/returns to depth=i // so that l_i is the first argument, and then blocks until l_i is locked. The 'i' parameter is the desired permuation of the lockables, and the 'depth' parameter is the depth in the call stack of the current instantiation of the function template. If i == depth then the function calls l0.lock() and then l1.try_lock()... for each lockable in the parameter pack l1. If i > depth then the function rotates the lockables to the left one place, and calls itself again to go one level deeper. Finally, if i < depth then the function returns to a shallower depth, equivalent to a right rotate of the lockables. When a call to try_lock() fails, i is set to the index of the contended lockable, so that the next call to l0.lock() will use the contended lockable as l0. This commit also replaces the std::try_lock implementation details. The new code is identical in behaviour, but uses a pair of constrained function templates. This avoids instantiating a class template, and is a litle simpler to call where used in std::__detail::__lock_impl and std::try_lock. Signed-off-by: Jonathan Wakely <jwak...@redhat.com> libstdc++-v3/ChangeLog: * include/std/mutex (__try_to_lock): Move to __detail namespace. (struct __try_lock_impl): Replace with ... (__detail::__try_lock_impl<Idx>(tuple<Lockables...>&)): New function templates to implement std::try_lock. (try_lock): Use new __try_lock_impl. (__detail::__lock_impl(int, int&, L0&, L1&...)): New function template to implement std::lock. (lock): Use __lock_impl. diff --git a/libstdc++-v3/include/std/mutex b/libstdc++-v3/include/std/mutex index d4c5d13f654..5f2d8f9ee7b 100644 --- a/libstdc++-v3/include/std/mutex +++ b/libstdc++-v3/include/std/mutex @@ -512,47 +512,44 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION #endif // _GLIBCXX_HAS_GTHREADS /// @cond undocumented - template<typename _Lock> - inline unique_lock<_Lock> - __try_to_lock(_Lock& __l) - { return unique_lock<_Lock>{__l, try_to_lock}; } + namespace __detail + { + template<typename _Lockable> + inline unique_lock<_Lockable> + __try_to_lock(_Lockable& __l) + { return unique_lock<_Lockable>{__l, try_to_lock}; } - template<int _Idx, bool _Continue = true> - struct __try_lock_impl - { - template<typename... _Lock> - static void - __do_try_lock(tuple<_Lock&...>& __locks, int& __idx) - { - __idx = _Idx; - auto __lock = std::__try_to_lock(std::get<_Idx>(__locks)); - if (__lock.owns_lock()) - { - constexpr bool __cont = _Idx + 2 < sizeof...(_Lock); - using __try_locker = __try_lock_impl<_Idx + 1, __cont>; - __try_locker::__do_try_lock(__locks, __idx); - if (__idx == -1) - __lock.release(); - } - } - }; + // Lock the last element of the tuple, after all previous ones are locked. + template<int _Idx, typename... _Lockables> + inline __enable_if_t<_Idx + 1 == sizeof...(_Lockables), int> + __try_lock_impl(tuple<_Lockables&...>& __lockables) + { + if (auto __lock = __detail::__try_to_lock(std::get<_Idx>(__lockables))) + { + __lock.release(); + return -1; + } + else + return _Idx; + } - template<int _Idx> - struct __try_lock_impl<_Idx, false> - { - template<typename... _Lock> - static void - __do_try_lock(tuple<_Lock&...>& __locks, int& __idx) - { - __idx = _Idx; - auto __lock = std::__try_to_lock(std::get<_Idx>(__locks)); - if (__lock.owns_lock()) - { - __idx = -1; - __lock.release(); - } - } - }; + // Lock tuple elements starting from _Idx. + template<int _Idx, typename... _Lockables> + inline __enable_if_t<_Idx + 1 != sizeof...(_Lockables), int> + __try_lock_impl(tuple<_Lockables&...>& __lockables) + { + if (auto __lock = __detail::__try_to_lock(std::get<_Idx>(__lockables))) + { + int __idx = __detail::__try_lock_impl<_Idx + 1>(__lockables); + if (__idx == -1) + __lock.release(); + return __idx; + } + else + return _Idx; + } + + } // namespace __detail /// @endcond /** @brief Generic try_lock. @@ -569,12 +566,52 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION int try_lock(_Lock1& __l1, _Lock2& __l2, _Lock3&... __l3) { - int __idx; - auto __locks = std::tie(__l1, __l2, __l3...); - __try_lock_impl<0>::__do_try_lock(__locks, __idx); - return __idx; + auto __lockables = std::tie(__l1, __l2, __l3...); + return __detail::__try_lock_impl<0>(__lockables); } + /// @cond undocumented + namespace __detail + { + // This function can recurse up to N levels deep, for N = 1+sizeof...(L1). + // On each recursion the lockables are rotated left one position, + // e.g. depth 0: l0, l1, l2; depth 1: l1, l2, l0; depth 2: l2, l0, l1. + // When a call to l_i.try_lock() fails it recurses/returns to depth=i + // so that l_i is the first argument, and then blocks until l_i is locked. + template<typename _L0, typename... _L1> + void + __lock_impl(int& __i, int __depth, _L0& __l0, _L1&... __l1) + { + while (__i >= __depth) + { + if (__i == __depth) + { + int __failed = 1; // index that couldn't be locked + { + unique_lock<_L0> __first(__l0); + auto __rest = std::tie(__l1...); + __failed += __detail::__try_lock_impl<0>(__rest); + if (!__failed) + { + __i = -1; // finished + __first.release(); + return; + } + } +#ifdef _GLIBCXX_USE_SCHED_YIELD + __gthread_yield(); +#endif + constexpr auto __n = 1 + sizeof...(_L1); + __i = (__depth + __failed) % __n; + } + else // rotate left until l_i is first. + __detail::__lock_impl(__i, __depth + 1, __l1..., __l0); + } + } + + } // namespace __detail + /// @endcond + /** @brief Generic lock. * @param __l1 Meets Lockable requirements (try_lock() may throw). * @param __l2 Meets Lockable requirements (try_lock() may throw). @@ -590,19 +627,8 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION void lock(_L1& __l1, _L2& __l2, _L3&... __l3) { - while (true) - { - using __try_locker = __try_lock_impl<0, sizeof...(_L3) != 0>; - unique_lock<_L1> __first(__l1); - int __idx; - auto __locks = std::tie(__l2, __l3...); - __try_locker::__do_try_lock(__locks, __idx); - if (__idx == -1) - { - __first.release(); - return; - } - } + int __i = 0; + __detail::__lock_impl(__i, 0, __l1, __l2, __l3...); } #if __cplusplus >= 201703L