erik.pilkington created this revision.
Herald added a reviewer: EricWF.

This patch adds a simple work-stealing scheduler meant for use as a fallback 
implementation for the C++17 parallel STL algorithms. The scheduler exposes a 
very simple fork/join API that should be easy to map onto different underlying 
concurrency implementations where they are available, and that par and 
par_unseq can be implemented on top of; a rough usage sketch follows below.
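
To make the intended usage concrete, here is a rough sketch (not part of this
patch) of how a divide-and-conquer for_each for par could be built on top of
the __task API. The names __parallel_for_each and for_each_par and the grain
size are made up for illustration:

  #include <experimental/execution>
  #include <cstddef>

  using std::experimental::__task;
  using std::experimental::__evaluate_parallel_task;

  // Run __f on [__first, __last): fork the left half of each split, keep
  // working on the right half, and finally join all of the forked children.
  template <class _RandIt, class _Func>
  void __parallel_for_each(_RandIt __first, _RandIt __last, _Func __f,
                           __task __t) {
    const std::ptrdiff_t __grain = 512; // made-up serial cutoff
    while (__last - __first > __grain) {
      _RandIt __mid = __first + (__last - __first) / 2;
      __t.__fork([=](__task __child) {
        __parallel_for_each(__first, __mid, __f, __child);
      });
      __first = __mid;
    }
    for (; __first != __last; ++__first)
      __f(*__first);
    __t.__join(); // Wait for every child forked above to finish.
  }

  template <class _RandIt, class _Func>
  void for_each_par(_RandIt __first, _RandIt __last, _Func __f) {
    __evaluate_parallel_task([&](__task __t) {
      __parallel_for_each(__first, __last, __f, __t);
    });
  }

The nice property of pairing this shape with a work-stealing scheduler is that
the forking worker stays busy on its own right half while idle workers steal
the deferred left halves, so an uncontended run stays close to a serial
traversal.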

I also tried this out with a lock-free deque [0], which provides a modest 
performance improvement and might be worth implementing later; a sketch of 
what that could look like is below. For the sake of doing this incrementally, 
this patch contains only a locking deque.
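
In case it helps review, below is roughly the shape I'd expect the lock-free
replacement to take: a minimal fixed-capacity sketch in the style of [0],
using the default sequentially consistent atomics throughout. A real
implementation would grow the buffer instead of failing when full, relax the
memory orderings, and be more careful about the racy buffer accesses, so
treat this as an outline rather than a drop-in:

  #include <array>
  #include <atomic>
  #include <cstddef>
  #include <utility>

  // The owning worker pushes and pops at the bottom; other workers steal
  // from the top, racing with each other (and with the owner on the last
  // element) via a compare-exchange on top_.
  template <class T>
  class lock_free_deque {
    static const std::size_t __cap = 1024; // made-up fixed capacity
    std::atomic<long> top_;                // next index to steal from
    std::atomic<long> bottom_;             // next index to push at
    std::array<T, __cap> buf_;

  public:
    lock_free_deque() : top_(0), bottom_(0) {}

    bool emplace_back(T input) { // owner thread only
      long b = bottom_.load(), t = top_.load();
      if (b - t >= static_cast<long>(__cap))
        return false; // full; a real version grows the buffer instead
      buf_[b % __cap] = std::move(input);
      bottom_.store(b + 1);
      return true;
    }

    bool pop_back(T& output) { // owner thread only
      long b = bottom_.load() - 1;
      bottom_.store(b); // claim the slot before re-reading top_
      long t = top_.load();
      if (t > b) { // the deque was empty
        bottom_.store(b + 1);
        return false;
      }
      output = buf_[b % __cap];
      if (t == b) { // last element: race any thieves for it
        bool won = top_.compare_exchange_strong(t, t + 1);
        bottom_.store(b + 1);
        return won;
      }
      return true;
    }

    bool steal(T& output) { // any thread
      long t = top_.load();
      long b = bottom_.load();
      if (t >= b)
        return false; // empty
      output = buf_[t % __cap];
      // If another thief (or the owner) advanced top_ first, the element
      // read above is discarded and the steal reports failure.
      return top_.compare_exchange_strong(t, t + 1);
    }
  };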

Please see the recent thread on cfe-dev for context: 
http://lists.llvm.org/pipermail/cfe-dev/2017-May/053841.html

I'm still pretty new to libc++ and to parallel stuff, so please feel free to 
tear this patch apart!
Thanks for taking a look,
Erik

[0]: 
https://pdfs.semanticscholar.org/3771/77bb82105c35e6e26ebad1698a20688473bd.pdf


https://reviews.llvm.org/D33977

Files:
  include/experimental/execution
  src/experimental/execution.cpp
  test/libcxx/experimental/execution/fork_join.pass.cpp

Index: test/libcxx/experimental/execution/fork_join.pass.cpp
===================================================================
--- /dev/null
+++ test/libcxx/experimental/execution/fork_join.pass.cpp
@@ -0,0 +1,61 @@
+//===----------------------------------------------------------------------===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is dual licensed under the MIT and the University of Illinois Open
+// Source Licenses. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+// UNSUPPORTED: libcpp-has-no-threads
+// UNSUPPORTED: c++98, c++03
+
+#include <experimental/execution>
+#include <atomic>
+#include <cassert>
+#include <future>
+
+using namespace std;
+using namespace experimental;
+
+// Fibonacci via fork/join: fork one recursive call, compute the other inline.
+int parallel_fibb(int n, __task t) {
+  if (n < 2)
+    return 1;
+
+  int lhs;
+  t.__fork([&](__task t) { lhs = parallel_fibb(n - 2, t); });
+  int rhs = parallel_fibb(n - 1, t);
+  t.__join();
+  return rhs + lhs;
+}
+
+// Fork n small tasks from a single parent task, then join them all at once.
+void fork_many(int n, __task t) {
+  atomic<int> count(0);
+  for (int i = 0; i < n; ++i)
+    t.__fork([&](__task) { ++count; });
+
+  t.__join();
+  assert(count.load() == n);
+}
+
+int main() {
+  {
+    int res;
+    __evaluate_parallel_task([&](__task t) { res = parallel_fibb(10, t); });
+    assert(res == 89);
+  }
+
+  {
+    __evaluate_parallel_task([&](__task t) { fork_many(100, t); });
+  }
+
+  {
+    auto f = async(launch::async, [] {
+      __evaluate_parallel_task([&](__task t) { fork_many(100, t); });
+    });
+    __evaluate_parallel_task([&](__task t) { fork_many(100, t); });
+    f.get();
+  }
+}
Index: src/experimental/execution.cpp
===================================================================
--- /dev/null
+++ src/experimental/execution.cpp
@@ -0,0 +1,250 @@
+// -*- C++ -*-
+//===------------------------ execution.cpp -------------------------------===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is dual licensed under the MIT and the University of Illinois Open
+// Source Licenses. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "experimental/__config"
+#ifndef _LIBCPP_HAS_NO_THREADS
+
+#include "experimental/execution"
+#include "atomic"
+#include "deque"
+#include "mutex"
+#include "vector"
+#include "thread"
+
+_LIBCPP_BEGIN_NAMESPACE_EXPERIMENTAL
+
+namespace {
+class task_group;
+class worker;
+class bound_task;
+
+// FIXME: replace this with a lock-free version.
+template <class T>
+class stealing_deque {
+  deque<T> queue_;
+  mutex lock_;
+
+public:
+  stealing_deque() = default;
+  stealing_deque(const stealing_deque& other) : queue_(other.queue_) {}
+
+  void emplace_back(T&& input) {
+    lock_guard<mutex> ul(lock_);
+    queue_.emplace_back(move(input));
+  }
+
+  bool pop_back(T& output) {
+    lock_guard<mutex> ul(lock_);
+    if (queue_.empty())
+      return false;
+    output = move(queue_.back());
+    queue_.pop_back();
+    return true;
+  }
+
+  bool steal(T& output) {
+    lock_guard<mutex> ul(lock_);
+    if (queue_.empty())
+      return false;
+    output = move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+};
+
+// Task that has been fork()'d, but is free to be stolen by another worker.
+struct unbound_task {
+  bound_task* parent_;
+  __callable_task task_;
+
+  unbound_task() = default;
+  unbound_task(const __callable_task& task) : parent_(nullptr), task_(task) {}
+  unbound_task(bound_task* parent, __callable_task&& task)
+      : parent_(parent), task_(move(task)) {}
+};
+
+class worker {
+  stealing_deque<unbound_task> queue_;
+  task_group& group_;
+  atomic<bool> dead_;
+
+  friend class task_group;
+
+  unsigned get_worker_id();
+
+public:
+  worker(task_group& group) : group_(group), dead_(true) {}
+  worker(const worker& other)
+      : queue_(other.queue_), group_(other.group_), dead_(true) {}
+
+  void defer(unbound_task&& task) { queue_.emplace_back(move(task)); }
+
+  template <class Fp>
+  void wait(const Fp& until);
+  void worker_main();
+};
+
+class task_group {
+  vector<worker> workers_;
+
+  mutex global_tasks_mutex_;
+  vector<__callable_task> global_tasks_;
+
+  // This task group could potentially be running many parallel algorithms. This
+  // keeps track of how many root tasks are currently being run. When it reaches
+  // 0, workers stop trying to steal work and return.
+  atomic<unsigned> parallel_users_{0};
+
+  friend class worker;
+
+  void activate_workers() {
+    for (worker& w : workers_) {
+      bool expected = true;
+      if (w.dead_.compare_exchange_strong(expected, false))
+        thread(&worker::worker_main, &w).detach();
+    }
+  }
+
+public:
+  task_group(unsigned threads = thread::hardware_concurrency())
+      : workers_(threads == 0 ? 1 : threads, worker(*this)) {}
+
+  // Start up a root task, and wait until it finishes.
+  void evaluate_top_level_task(const __callable_task& task) {
+    // If we're reviving (or starting) this task group, start the workers.
+    if (++parallel_users_ == 1)
+      activate_workers();
+
+    condition_variable cv;
+    mutex m;
+    bool done = false; // Guards against spurious wakeups.
+    unique_lock<mutex> guard(m);
+
+    {
+      lock_guard<mutex> lg(global_tasks_mutex_);
+      global_tasks_.emplace_back([&](__task t) {
+        task(t);
+        {
+          // Taking the lock also ensures that the caller is waiting.
+          lock_guard<mutex> signal_guard(m);
+          done = true;
+        }
+        cv.notify_one();
+      });
+    }
+
+    cv.wait(guard, [&] { return done; });
+    --parallel_users_;
+  }
+
+  bool steal(unbound_task& task, unsigned worker_id) {
+    unsigned size = workers_.size();
+    for (unsigned worker = (worker_id + 1) % size, count = 0; count < size;
+         ++count, worker = (worker + 1) % size)
+      if (workers_[worker].queue_.steal(task))
+        return true;
+    return false;
+  }
+
+  bool steal_global(unbound_task& task) {
+    lock_guard<mutex> lg(global_tasks_mutex_);
+    if (global_tasks_.empty())
+      return false;
+    task = unbound_task(global_tasks_.back());
+    global_tasks_.pop_back();
+    return true;
+  }
+
+  bool active() const { return parallel_users_.load(); }
+};
+
+// Task that is bound to a specific worker.
+class bound_task {
+  unbound_task bound_;
+  worker* worker_;
+  atomic<unsigned> ref_count_;
+
+  friend class worker;
+  void eval() {
+    bound_.task_(this);
+    if (bound_.parent_) // The root task has no parent.
+      --bound_.parent_->ref_count_;
+  }
+
+public:
+  bound_task(unbound_task&& binding, worker* worker)
+      : bound_(move(binding)), worker_(worker), ref_count_(0) {}
+
+  void fork(__callable_task&& child) {
+    ++ref_count_;
+    worker_->defer(unbound_task(this, move(child)));
+  }
+
+  void join() {
+    worker_->wait([this] { return ref_count_.load() == 0; });
+  }
+};
+} // namespace
+
+template <class Fp>
+inline void worker::wait(const Fp& until) {
+  while (!until()) {
+    unbound_task task;
+    if (!queue_.pop_back(task)) {
+      // queue_ can only be added to by this thread, so there is no point in
+      // checking it again now that we know it's empty. Spin looking for
+      // tasks elsewhere.
+      while (true) {
+        if (group_.steal_global(task) || group_.steal(task, get_worker_id()))
+          break;
+        if (until())
+          return;
+        this_thread::yield();
+      }
+    }
+
+    // We acquired a task by one means or another; evaluate it.
+    try {
+      bound_task bound(move(task), this);
+      bound.eval();
+      bound.join();
+    } catch (...) {
+      // All execution policies require terminate() to be called here.
+      terminate();
+    }
+  }
+}
+
+inline unsigned worker::get_worker_id() {
+  return static_cast<unsigned>(distance(&*group_.workers_.begin(), this));
+}
+
+inline void worker::worker_main() {
+  // Continue execution until all tasks have been completed.
+  wait([this] { return !group_.active(); });
+  dead_.store(true);
+}
+
+void __task::__fork(__callable_task&& work) {
+  static_cast<bound_task*>(__impl_)->fork(move(work));
+}
+
+void __task::__join() { static_cast<bound_task*>(__impl_)->join(); }
+
+void __evaluate_parallel_task(const __callable_task& work) {
+  static task_group parallel_task_group;
+  parallel_task_group.evaluate_top_level_task(work);
+}
+
+_LIBCPP_END_NAMESPACE_EXPERIMENTAL
+
+#endif // _LIBCPP_HAS_NO_THREADS
Index: include/experimental/execution
===================================================================
--- /dev/null
+++ include/experimental/execution
@@ -0,0 +1,95 @@
+// -*- C++ -*-
+//===-------------------------- execution ---------------------------------===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is dual licensed under the MIT and the University of Illinois Open
+// Source Licenses. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef _LIBCPP_EXPERIMENTAL_EXECUTION
+#define _LIBCPP_EXPERIMENTAL_EXECUTION
+
+/*
+    execution synopsis
+
+namespace std {
+
+template <class T>
+struct is_execution_policy;
+
+template <class T>
+constexpr bool is_execution_policy_v = is_execution_policy<T>::value;
+
+} // namespace std
+
+namespace std::execution {
+
+class sequenced_policy;
+class parallel_policy;
+class parallel_unsequenced_policy;
+
+constexpr sequenced_policy seq{unspecified};
+
+constexpr parallel_policy par{unspecified};
+
+constexpr parallel_unsequenced_policy par_unseq{unspecified};
+
+} // namespace std::execution
+
+*/
+
+#include <experimental/__config>
+#include <functional>
+
+_LIBCPP_BEGIN_NAMESPACE_EXPERIMENTAL
+
+namespace execution {
+class sequenced_policy {};
+class parallel_policy {};
+class parallel_unsequenced_policy {};
+
+constexpr sequenced_policy seq{};
+constexpr parallel_policy par{};
+constexpr parallel_unsequenced_policy par_unseq{};
+} // namespace execution
+
+// is_execution_policy
+
+template <class _Tp>
+struct is_execution_policy : false_type {};
+
+template <>
+struct is_execution_policy<execution::sequenced_policy> : true_type {};
+template <>
+struct is_execution_policy<execution::parallel_policy> : true_type {};
+template <>
+struct is_execution_policy<execution::parallel_unsequenced_policy> : true_type {
+};
+
+#if _LIBCPP_STD_VER >= 14
+template <class _Tp>
+constexpr bool is_execution_policy_v = is_execution_policy<_Tp>::value;
+#endif
+
+// The API that the parallel algorithms will be built on top of.
+
+class __task;
+typedef function<void(__task)> __callable_task;
+
+class _LIBCPP_TYPE_VIS __task {
+  void* __impl_;
+
+public:
+  __task(void* __impl) : __impl_(__impl) {}
+
+  void __fork(__callable_task&& __work);
+  void __join();
+};
+
+void _LIBCPP_FUNC_VIS __evaluate_parallel_task(const __callable_task& __t);
+
+_LIBCPP_END_NAMESPACE_EXPERIMENTAL
+
+#endif // _LIBCPP_EXPERIMENTAL_EXECUTION