tberghammer created this revision. tberghammer added reviewers: labath, clayborg, vharron, zturner. tberghammer added a subscriber: lldb-commits. Herald added a subscriber: iancottrell.
Add a new task pool class to LLDB to make it easy to execute tasks in parallel Basic design goals: * Have a very lightweight and easy to use interface where a list of lambdas can be executed in parallel * Use a global thread pool to limit the number of threads used (std::async don't do it on Linux) and to eliminate the thread creation overhead Possible future improvements (please weight in about priorities about these and add what additional features you want to see): * Possibility to cancel already added, but not yet started tasks * Lazy creation of the worker threads * Removing unused worker threads after some time * Parallel for_each implementation The first user of the thread pool will be the dwarf parsing code. An example of how it will be used is available at http://reviews.llvm.org/D13662 (diff 2) http://reviews.llvm.org/D13727 Files: include/lldb/Utility/TaskPool.h source/Utility/CMakeLists.txt source/Utility/TaskPool.cpp
Index: source/Utility/TaskPool.cpp =================================================================== --- /dev/null +++ source/Utility/TaskPool.cpp @@ -0,0 +1,59 @@ +//===--------------------- TaskPool.cpp -------------------------*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include "lldb/Utility/TaskPool.h" + +TaskPool::TaskPoolImpl& +TaskPool::GetImplementation() +{ + static TaskPool::TaskPoolImpl g_task_pool_impl(std::thread::hardware_concurrency()); + return g_task_pool_impl; +} + +TaskPool::TaskPoolImpl::TaskPoolImpl(uint32_t num_threads) : + m_stop(false) +{ + for (uint32_t i = 0; i < num_threads; ++i) + m_threads.emplace_back(Worker, this); +} + +TaskPool::TaskPoolImpl::~TaskPoolImpl() +{ + Stop(); +} + +void +TaskPool::TaskPoolImpl::Stop() +{ + std::unique_lock<std::mutex> lock(m_tasks_mutex); + m_stop = true; + lock.unlock(); + m_tasks_cv.notify_all(); + for (auto& t : m_threads) + t.join(); +} + +void +TaskPool::TaskPoolImpl::Worker(TaskPoolImpl* pool) +{ + while (true) + { + std::unique_lock<std::mutex> lock(pool->m_tasks_mutex); + if (pool->m_tasks.empty()) + pool->m_tasks_cv.wait(lock, [pool](){ return !pool->m_tasks.empty() || pool->m_stop; }); + if (pool->m_tasks.empty()) + break; + + std::function<void()> f = pool->m_tasks.front(); + pool->m_tasks.pop(); + lock.unlock(); + + f(); + } +} Index: source/Utility/CMakeLists.txt =================================================================== --- source/Utility/CMakeLists.txt +++ source/Utility/CMakeLists.txt @@ -14,6 +14,7 @@ StringExtractor.cpp StringExtractorGDBRemote.cpp StringLexer.cpp + TaskPool.cpp TimeSpecTimeout.cpp UriParser.cpp ) Index: include/lldb/Utility/TaskPool.h =================================================================== --- /dev/null +++ include/lldb/Utility/TaskPool.h @@ -0,0 +1,210 @@ +//===--------------------- TaskPool.h ---------------------------*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include <cassert> +#include <cstdint> +#include <future> +#include <list> +#include <queue> +#include <thread> +#include <vector> + +class TaskPool +{ +public: + TaskPool() = delete; + + // Add a new task to the thread pool and return a std::future belongs for the newly created task. + // The caller of this function have to wait on the future for this task to complete. + template<typename F, typename... Args> + static std::future<typename std::result_of<F(Args...)>::type> + AddTask(F&& f, Args&&... args); + + // Run all of the specified tasks on the thread pool and wait until all of them are finished + // before returning + template<typename... T> + static void + RunTasks(T&&... t); + +private: + template<typename... T> + struct RunTaskImpl; + + class TaskPoolImpl + { + public: + TaskPoolImpl(uint32_t num_threads); + + ~TaskPoolImpl(); + + template<typename F, typename... Args> + std::future<typename std::result_of<F(Args...)>::type> + AddTask(F&& f, Args&&... args); + + void + Stop(); + + private: + static void + Worker(TaskPoolImpl* pool); + + std::queue<std::function<void()>> m_tasks; + std::mutex m_tasks_mutex; + std::condition_variable m_tasks_cv; + bool m_stop; + std::vector<std::thread> m_threads; + }; + + static TaskPoolImpl& + GetImplementation(); +}; + +template <typename T> +class TaskRunner +{ +public: + template<typename F, typename... Args> + void + AddTask(F&& f, Args&&... args); + + std::future<T> + WaitForNextCompletedTask(); + + void + WaitForAllTask(); + +private: + std::list<std::future<T>> m_ready; + std::list<std::future<T>> m_pending; + std::mutex m_mutex; + std::condition_variable m_cv; +}; + +template<typename F, typename... Args> +std::future<typename std::result_of<F(Args...)>::type> +TaskPool::AddTask(F&& f, Args&&... args) +{ + return GetImplementation().AddTask(std::forward<F>(f), std::forward<Args>(args)...); +} + +template<typename... T> +void +TaskPool::RunTasks(T&&... t) +{ + RunTaskImpl<T...>::Run(std::forward<T>(t)...); +} + +template<typename H, typename... T> +struct TaskPool::RunTaskImpl<H, T...> +{ + static void + Run(H&& h, T&&... t) + { + auto f = AddTask(std::forward<H>(h)); + RunTaskImpl<T...>::Run(std::forward<T>(t)...); + f.wait(); + } +}; + +template<> +struct TaskPool::RunTaskImpl<> +{ + static void + Run() {} +}; + +template<typename F, typename... Args> +std::future<typename std::result_of<F(Args...)>::type> +TaskPool::TaskPoolImpl::AddTask(F&& f, Args&&... args) +{ + auto task = std::make_shared<std::packaged_task<typename std::result_of<F(Args...)>::type()>>( + std::bind(std::forward<F>(f), std::forward<Args>(args)...)); + + std::unique_lock<std::mutex> lock(m_tasks_mutex); + assert(!m_stop && "Can't add task to TaskPool after it is stopped"); + m_tasks.emplace([task](){ (*task)(); }); + lock.unlock(); + m_tasks_cv.notify_one(); + + return task->get_future(); +} + +template <typename T> +template<typename F, typename... Args> +void +TaskRunner<T>::AddTask(F&& f, Args&&... args) +{ + std::unique_lock<std::mutex> lock(m_mutex); + auto it = m_pending.emplace(m_pending.end()); + *it = std::move(TaskPool::AddTask( + [this, it](F&& f, Args&&... args) + { + T&& r = f(args...); + + std::unique_lock<std::mutex> lock(this->m_mutex); + this->m_ready.emplace_back(std::move(*it)); + this->m_pending.erase(it); + lock.unlock(); + + this->m_cv.notify_one(); + return r; + }, + std::forward<F>(f), + std::forward<Args>(args)...)); +} + +template <> +template<typename F, typename... Args> +void +TaskRunner<void>::AddTask(F&& f, Args&&... args) +{ + std::unique_lock<std::mutex> lock(m_mutex); + auto it = m_pending.emplace(m_pending.end()); + *it = std::move(TaskPool::AddTask( + [this, it](F&& f, Args&&... args) + { + f(args...); + + std::unique_lock<std::mutex> lock(this->m_mutex); + this->m_ready.emplace_back(std::move(*it)); + this->m_pending.erase(it); + lock.unlock(); + + this->m_cv.notify_one(); + }, + std::forward<F>(f), + std::forward<Args>(args)...)); +} + +template <typename T> +std::future<T> +TaskRunner<T>::WaitForNextCompletedTask() +{ + std::unique_lock<std::mutex> lock(m_mutex); + if (m_ready.empty() && m_pending.empty()) + return std::future<T>(); // No more tasks + + if (m_ready.empty()) + m_cv.wait(lock, [this](){ return !this->m_ready.empty(); }); + + std::future<T> res = std::move(m_ready.front()); + m_ready.pop_front(); + + lock.unlock(); + res.wait(); + + return std::move(res); +} + +template <typename T> +void +TaskRunner<T>::WaitForAllTask() +{ + while (WaitForNextCompletedTask().valid()); +}
_______________________________________________ lldb-commits mailing list lldb-commits@lists.llvm.org http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits