This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bade50db566 [chore](test) Add testing util sync point (#28924)
bade50db566 is described below

commit bade50db566f77ce88e541f8845147ab8aacbff7
Author: Gavin Chou <[email protected]>
AuthorDate: Sun Dec 24 21:59:11 2023 +0800

    [chore](test) Add testing util sync point (#28924)
---
 be/src/common/sync_point.cpp | 236 ++++++++++++++++++++++++++++++++++++++++++
 be/src/common/sync_point.h   | 240 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 476 insertions(+)

diff --git a/be/src/common/sync_point.cpp b/be/src/common/sync_point.cpp
new file mode 100644
index 00000000000..816c5a82a94
--- /dev/null
+++ b/be/src/common/sync_point.cpp
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Most code of this file is copied from rocksdb SyncPoint.
+// https://github.com/facebook/rocksdb
+
+// clang-format off
+#include "sync_point.h"
+
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <random>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace doris {
+
+// Hidden implementation of SyncPoint. Every member below is read and written
+// with mutex_ held, except enabled_, which is atomic so that process() can
+// return early without taking the lock when processing is disabled.
+struct SyncPoint::Data { // impl
+  Data() : enabled_(false) {}
+  // Virtual so the implementation can be subclassed (Data is public in
+  // SyncPoint for that reason).
+  virtual ~Data() = default;
+  void process(const std::string& point, std::vector<std::any>&& cb_args);
+  void load_dependency(const std::vector<SyncPointPair>& dependencies);
+  void load_dependency_and_markers(
+                                const std::vector<SyncPointPair>& dependencies,
+                                const std::vector<SyncPointPair>& markers);
+  // Does not lock; only called from process() while mutex_ is held.
+  bool predecessors_all_cleared(const std::string& point);
+  void set_call_back(const std::string& point,
+                     const std::function<void(std::vector<std::any>&&)>& callback);
+  void clear_call_back(const std::string& point);
+  void clear_all_call_backs();
+  void enable_processing();
+  void disable_processing();
+  void clear_trace();
+
+private:
+  // Does not lock; only called from process() while mutex_ is held.
+  bool disable_by_marker(const std::string& point, std::thread::id thread_id);
+
+  // successor/predecessor maps loaded from load_dependency*.
+  // NOTE(review): successors_ is populated but never read in this file.
+  std::unordered_map<std::string, std::vector<std::string>> successors_;
+  std::unordered_map<std::string, std::vector<std::string>> predecessors_;
+  // Callback to run when a given point is processed.
+  std::unordered_map<std::string, std::function<void(std::vector<std::any>&&)>> callbacks_;
+  // marker point -> successors it binds to the thread that passes it.
+  std::unordered_map<std::string, std::vector<std::string>> markers_;
+  // marked successor -> thread allowed to process it.
+  std::unordered_map<std::string, std::thread::id> marked_thread_id_;
+  std::mutex mutex_;
+  // Signalled whenever the dependency graph changes or a point is cleared.
+  std::condition_variable cv_;
+  // sync points that have been passed through
+  std::unordered_set<std::string> cleared_points_;
+  std::atomic<bool> enabled_;
+  // NOTE(review): maintained by process() but not read anywhere in this file.
+  int num_callbacks_running_ = 0;
+};
+
+// Returns the process-wide SyncPoint; the function-local static is created
+// on first call (initialization is thread-safe since C++11).
+SyncPoint* SyncPoint::get_instance() {
+  static SyncPoint instance;
+  return &instance;
+}
+SyncPoint::SyncPoint() : 
+  impl_(new Data) {
+}
+SyncPoint:: ~SyncPoint() {
+  delete impl_;
+}
+// The public SyncPoint API is a thin facade: each call is forwarded
+// unchanged to the hidden implementation object.
+void SyncPoint::load_dependency(const std::vector<SyncPointPair>& dependencies) {
+  impl_->load_dependency(dependencies);
+}
+
+void SyncPoint::load_dependency_and_markers(
+    const std::vector<SyncPointPair>& dependencies,
+    const std::vector<SyncPointPair>& markers) {
+  impl_->load_dependency_and_markers(dependencies, markers);
+}
+
+void SyncPoint::set_call_back(
+    const std::string& point,
+    const std::function<void(std::vector<std::any>&&)>& callback) {
+  impl_->set_call_back(point, callback);
+}
+
+void SyncPoint::clear_call_back(const std::string& point) {
+  impl_->clear_call_back(point);
+}
+
+void SyncPoint::clear_all_call_backs() { impl_->clear_all_call_backs(); }
+
+void SyncPoint::enable_processing() { impl_->enable_processing(); }
+
+void SyncPoint::disable_processing() { impl_->disable_processing(); }
+
+void SyncPoint::clear_trace() { impl_->clear_trace(); }
+
+void SyncPoint::process(const std::string& point,
+                        std::vector<std::any>&& cb_arg) {
+  impl_->process(point, std::move(cb_arg));
+}
+
+// =============================================================================
+// SyncPoint implementation
+// =============================================================================
+
+// Replaces the dependency graph with `dependencies` and forgets which points
+// have already been passed. Markers are left untouched. Wakes every thread
+// blocked in process() so it re-evaluates against the new graph.
+void SyncPoint::Data::load_dependency(
+    const std::vector<SyncPointPair>& dependencies) {
+  std::lock_guard guard(mutex_);
+  successors_.clear();
+  predecessors_.clear();
+  cleared_points_.clear();
+  for (const auto& [pred, succ] : dependencies) {
+    successors_[pred].push_back(succ);
+    predecessors_[succ].push_back(pred);
+  }
+  cv_.notify_all();
+}
+
+/**
+ * Like load_dependency(), but also resets and installs markers.
+ * Each marker pair is added as an ordinary dependency edge and is also
+ * recorded in markers_, so that passing the marker point binds its
+ * successors to the passing thread (see process()).
+ */
+void SyncPoint::Data::load_dependency_and_markers(
+    const std::vector<SyncPointPair>& dependencies,
+    const std::vector<SyncPointPair>& markers) {
+  std::lock_guard guard(mutex_);
+  successors_.clear();
+  predecessors_.clear();
+  cleared_points_.clear();
+  markers_.clear();
+  marked_thread_id_.clear();
+  auto add_edge = [this](const SyncPointPair& edge) {
+    successors_[edge.predecessor].push_back(edge.successor);
+    predecessors_[edge.successor].push_back(edge.predecessor);
+  };
+  for (const auto& dep : dependencies) {
+    add_edge(dep);
+  }
+  for (const auto& marker : markers) {
+    add_edge(marker);
+    markers_[marker.predecessor].push_back(marker.successor);
+  }
+  cv_.notify_all();
+}
+
+// Returns true if every predecessor of `point` is in cleared_points_; a point
+// with no registered predecessors is trivially cleared. Does not lock: the
+// only caller, process(), holds mutex_.
+// Uses find() rather than operator[] so that merely querying a point does not
+// insert an empty entry into predecessors_.
+bool SyncPoint::Data::predecessors_all_cleared(const std::string& point) {
+  auto it = predecessors_.find(point);
+  if (it == predecessors_.end()) {
+    return true;
+  }
+  for (const auto& pred : it->second) {
+    if (cleared_points_.count(pred) == 0) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Unregisters the callback for `point`, if any. A callback that process() is
+// already executing keeps running, because process() invokes a copy of it.
+void SyncPoint::Data::clear_call_back(const std::string& point) {
+  std::lock_guard guard(mutex_);
+  callbacks_.erase(point);
+}
+
+// Unregisters every callback. Callbacks already executing are unaffected.
+void SyncPoint::Data::clear_all_call_backs() {
+  std::lock_guard guard(mutex_);
+  callbacks_.clear();
+}
+
+// Entry point behind TEST_SYNC_POINT and friends. When processing is enabled:
+//  1. if `point` is a marker, binds each of its successors to this thread;
+//  2. returns immediately if `point` is bound to a different thread;
+//  3. blocks until every predecessor of `point` has been passed;
+//  4. runs the registered callback (if any) with `cb_arg`, without holding
+//     mutex_;
+//  5. records `point` as passed and wakes all waiting threads.
+// If the callback throws, the exception propagates to the caller, `point` is
+// NOT recorded as passed, and mutex_ is released during unwinding.
+void SyncPoint::Data::process(const std::string& point,
+                              std::vector<std::any>&& cb_arg) {
+  if (!enabled_) {
+    return;
+  }
+  std::unique_lock lock(mutex_);
+  auto thread_id = std::this_thread::get_id();
+  auto marker_iter = markers_.find(point);
+  // If the current sync point is a marker, bind all of its successors to this
+  // thread. emplace() keeps an existing binding, so the first thread to pass
+  // the marker wins.
+  if (marker_iter != markers_.end()) {
+    for (auto& marked_point : marker_iter->second) {
+      marked_thread_id_.emplace(marked_point, thread_id);
+    }
+  }
+  // Skip points that a marker has bound to another thread.
+  if (disable_by_marker(point, thread_id)) {
+    return;
+  }
+  while (!predecessors_all_cleared(point)) {
+    cv_.wait(lock);
+    // A marker may have been passed by another thread while we waited.
+    if (disable_by_marker(point, thread_id)) {
+      return;
+    }
+  }
+  auto callback_pair = callbacks_.find(point);
+  if (callback_pair != callbacks_.end()) {
+    num_callbacks_running_++;
+    // Copy the callback so it stays valid even if it is cleared or replaced
+    // while it runs unlocked.
+    auto callback = callback_pair->second;
+    // Release through `lock` (not mutex_ directly) so the unique_lock tracks
+    // ownership. Unlocking mutex_ behind its back would make ~unique_lock
+    // unlock an unowned mutex (undefined behavior) if the callback throws,
+    // e.g. a bad_any_cast rethrown by try_any_cast().
+    lock.unlock();
+    try {
+      callback(std::move(cb_arg));
+    } catch (...) {
+      lock.lock();
+      num_callbacks_running_--;
+      throw;
+    }
+    lock.lock();
+    num_callbacks_running_--;
+  }
+  cleared_points_.insert(point);
+  cv_.notify_all();
+}
+
+// True when a marker has bound `point` to some thread and the caller is not
+// that thread; process() skips such a point entirely.
+bool SyncPoint::Data::disable_by_marker(const std::string& point,
+                                        std::thread::id thread_id) {
+  auto bound = marked_thread_id_.find(point);
+  if (bound == marked_thread_id_.end()) {
+    return false; // not bound (not a marked successor, or marker not passed)
+  }
+  return bound->second != thread_id;
+}
+
+// Registers the callback run when `point` is processed, replacing any
+// callback previously registered for the same point.
+void SyncPoint::Data::set_call_back(
+    const std::string& point,
+    const std::function<void(std::vector<std::any>&&)>& callback) {
+  std::lock_guard guard(mutex_);
+  callbacks_.insert_or_assign(point, callback);
+}
+
+// Forgets which points have been passed; the dependency graph, markers and
+// callbacks are kept.
+void SyncPoint::Data::clear_trace() {
+  std::lock_guard guard(mutex_);
+  cleared_points_.clear();
+}
+
+// Toggle the atomic flag checked at the top of process(); no lock needed.
+void SyncPoint::Data::enable_processing() { enabled_.store(true); }
+
+void SyncPoint::Data::disable_processing() { enabled_.store(false); }
+
+} // namespace doris
+// clang-format on
+// vim: et tw=80 ts=2 sw=2 cc=80:
diff --git a/be/src/common/sync_point.h b/be/src/common/sync_point.h
new file mode 100644
index 00000000000..18b3a63c05e
--- /dev/null
+++ b/be/src/common/sync_point.h
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Most code of this file is copied from rocksdb SyncPoint.
+// https://github.com/facebook/rocksdb
+
+#pragma once
+// clang-format off
+#include <any>
+#include <functional>
+#include <iostream>
+#include <string>
+#include <type_traits>
+#include <typeinfo>
+#include <utility>
+#include <vector>
+
+namespace doris {
+
+// Evaluates `expr` and yields its value, but first lets the sync point
+// `point_name` override the result via TEST_SYNC_POINT_RETURN_WITH_VALUE
+// (which expands to nothing unless BE_TEST is defined).
+// The default value is built from std::decay_t<decltype(expr)> rather than
+// decltype((expr)): the latter is a reference type whenever `expr` is an
+// lvalue, and a reference cannot be value-initialized with `{}`. The decayed
+// type also matches the lambda's deduced return type.
+#define SYNC_POINT_HOOK_RETURN_VALUE(expr, point_name, ...)                    \
+    [&]() mutable {                                                            \
+        TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name,                          \
+            std::decay_t<decltype(expr)> {}, __VA_ARGS__);                     \
+        return (expr);                                                         \
+    }()
+
+// This class provides a facility to reproduce race conditions
+// deterministically in unit tests.
+// Developers place sync points in the code base via TEST_SYNC_POINT (and the
+// related TEST_* macros). Each sync point represents a position in the
+// execution stream of a thread. In a unit test, 'happens after' relationships
+// among sync points are set up via SyncPoint::load_dependency to reproduce a
+// desired interleaving of thread execution.
+// Processing is disabled until enable_processing() is called.
+class SyncPoint {
+public:
+  // Returns the process-wide instance.
+  static SyncPoint* get_instance();
+  SyncPoint(const SyncPoint&) = delete;
+  SyncPoint& operator=(const SyncPoint&) = delete;
+  ~SyncPoint();
+  // A 'happens after' edge: `successor` does not proceed until
+  // `predecessor` has been processed.
+  struct SyncPointPair {
+    std::string predecessor;
+    std::string successor;
+  };
+
+  // call once at the beginning of a test to setup the dependency between
+  // sync points
+  //
+  // Example:
+  // load_dependency({{"point1", "point2"},
+  //                  {"point2", "point3"},
+  //                  {"point3", "point4"}});
+  //
+  //    test case thread            thread for object being tested
+  //        |                                  |
+  //        |                                  |
+  //        | \-------------0-------------\    |
+  //        |                              \-> x  sync point1 set in code
+  //        |    /----------1----------------/ |
+  // point2 o <-/                          /-> x  sync point4 set in code
+  //        |                             /    |
+  //        z                            /     |
+  //        z     /---------2-----------/      |  there may be nothing
+  //        |    /                             |  between point1 point4
+  // point3 o --/                              |  they are for sync
+  //        |                                  |  between test case and object
+  //        v                                  v
+  //
+  // vertical arrow means the procedure of each thread, the running order will
+  // be:
+  // test case thread -> point1 -> point2 -> point3 -> point4 -> object being
+  // tested
+  //
+  // we may do a lot of things between point2 and point3, say, change the
+  // object's status, call another method, propagate data race and etc.
+  //
+  // Replaces any previously loaded dependencies and forgets which points
+  // have been passed; previously loaded markers are left in place.
+  void load_dependency(const std::vector<SyncPointPair>& dependencies);
+
+  // call once at the beginning of a test to setup the dependency between
+  // sync points and setup markers indicating the successor is only enabled
+  // when it is processed on the same thread as the predecessor.
+  // When adding a marker, it implicitly adds a dependency for the marker pair.
+  // The first thread to pass a marker point claims its successors; on any
+  // other thread those successors are skipped (no wait, no callback).
+  // Replaces previously loaded dependencies and markers.
+  void load_dependency_and_markers(
+                                const std::vector<SyncPointPair>& dependencies,
+                                const std::vector<SyncPointPair>& markers);
+
+  // Registers `callback` for `point`, replacing any previous one. The callback
+  // receives the arguments passed to TEST_SYNC_POINT_CALLBACK() as a
+  // std::vector<std::any>; the vector is empty if TEST_SYNC_POINT or
+  // TEST_IDX_SYNC_POINT was used. For the RETURN_WITH_* macros its last
+  // element is the result slot (see try_any_cast_ret). The callback runs
+  // without the internal lock held.
+  void set_call_back(const std::string& point,
+                     const std::function<void(std::vector<std::any>&&)>& callback);
+
+  // Clear callback function by point. A callback that is already running is
+  // not interrupted.
+  void clear_call_back(const std::string& point);
+
+  // Clear all call back functions.
+  void clear_all_call_backs();
+
+  // Enable sync point processing (disabled on startup)
+  void enable_processing();
+
+  // Disable sync point processing
+  void disable_processing();
+
+  // Remove the execution trace of all sync points
+  void clear_trace();
+
+  // Triggered by TEST_SYNC_POINT. Returns immediately when processing is
+  // disabled. Otherwise blocks until all predecessors of `point` have been
+  // processed, then calls the registered callback (if any) with `cb_args` and
+  // records `point` as processed. A point that a marker bound to another
+  // thread is skipped.
+  void process(const std::string& point, std::vector<std::any>&& cb_args = {});
+
+  // TODO: it might be useful to provide a function that blocks until all
+  //       sync points are cleared.
+  // We want this to be public so we can subclass the implementation
+  struct Data;
+
+private:
+  // Singleton
+  SyncPoint();
+  Data* impl_; // implementation, hidden in the .cpp file; owned by this object
+};
+
+// Casts `a` to T like std::any_cast<T>. On a type mismatch it first prints the
+// expected and actual type names (as reported by type_info::name(), possibly
+// implementation-mangled) to std::cerr, then rethrows the original
+// std::bad_any_cast.
+template <class T>
+T try_any_cast(const std::any& a) {
+  try {
+    return std::any_cast<T>(a);
+  } catch (const std::bad_any_cast& e) {
+    std::cerr << e.what() << " expected=" << typeid(T).name()
+              << " actual=" << a.type().name() << std::endl;
+    throw; // rethrow the original exception object rather than a copy
+  }
+}
+
+// Returns the result slot that SYNC_POINT_RETURN_WITH_VALUE appends as the last
+// callback argument: a pointer to a std::pair<T, bool> whose `first` is the
+// value to return and whose `second` says whether to return it.
+// Precondition: `any` is non-empty (back() on an empty vector is undefined).
+template <typename T>
+auto try_any_cast_ret(std::vector<std::any>& any) {
+  return try_any_cast<std::pair<T, bool>*>(any.back());
+}
+
+} // namespace doris
+
+// Core sync point macros; always active when used directly. Code under test
+// normally uses the TEST_* / TEST_INJECTION_* wrappers, which compile away
+// when the corresponding feature is off.
+#define SYNC_POINT(x) doris::SyncPoint::get_instance()->process(x)
+// Processes the point named `x` followed by the decimal form of `index`.
+#define IDX_SYNC_POINT(x, index) \
+    doris::SyncPoint::get_instance()->process((x) + std::to_string(index))
+// Processes `x`, handing the variadic arguments to its callback as a
+// std::vector<std::any>.
+#define SYNC_POINT_CALLBACK(x, ...) \
+    doris::SyncPoint::get_instance()->process(x, {__VA_ARGS__})
+// Lets a callback force the enclosing function to return early. The callback
+// receives the variadic arguments followed by a std::pair<T, bool>* (T is
+// deduced from default_ret_val; see try_any_cast_ret). Setting `second` to
+// true makes the enclosing function return `first`.
+#define SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) \
+{ \
+  std::pair ret {default_ret_val, false}; \
+  std::vector<std::any> args {__VA_ARGS__}; \
+  args.push_back(&ret); \
+  doris::SyncPoint::get_instance()->process(x, std::move(args)); \
+  if (ret.second) return std::move(ret.first); \
+}
+// Void-function variant: the callback receives a trailing bool*; setting it
+// to true makes the enclosing function return.
+#define SYNC_POINT_RETURN_WITH_VOID(x, ...) \
+{ \
+  bool pred = false; \
+  std::vector<std::any> args {__VA_ARGS__}; \
+  args.push_back(&pred); \
+  doris::SyncPoint::get_instance()->process(x, std::move(args)); \
+  if (pred) return; \
+}
+// Forces construction of the SyncPoint singleton without processing a point.
+#define SYNC_POINT_SINGLETON() (void)doris::SyncPoint::get_instance()
+
+// TEST_SYNC_POINT and friends are no-ops unless BE_TEST is defined; in other
+// builds they expand to nothing. Turn this feature on by defining BE_TEST.
+#ifndef BE_TEST
+# define TEST_SYNC_POINT(x)
+# define TEST_IDX_SYNC_POINT(x, index)
+# define TEST_SYNC_POINT_CALLBACK(x, ...)
+# define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)
+# define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...)
+// seldom called
+# define TEST_SYNC_POINT_SINGLETON()
+#else
+// Use TEST_SYNC_POINT to specify sync points inside the code base.
+// Sync points can have happens-after dependencies on other sync points,
+// configured at runtime via SyncPoint::load_dependency. This can be used to
+// reproduce race conditions between threads.
+# define TEST_SYNC_POINT(x) SYNC_POINT(x)
+# define TEST_IDX_SYNC_POINT(x, index) IDX_SYNC_POINT(x, index)
+# define TEST_SYNC_POINT_CALLBACK(x, ...) SYNC_POINT_CALLBACK(x, __VA_ARGS__)
+# define TEST_SYNC_POINT_SINGLETON() SYNC_POINT_SINGLETON()
+
+/**
+ * Inject return points for testing.
+ *
+ * Currently we can only insert more points to get context from the tested
+ * thread and process it in the testing thread, e.g.
+ *
+ * tested thread:
+ * ...
+ * TEST_SYNC_POINT_RETURN_WITH_VALUE("point_ret", int(0), ctx0);
+ * ...
+ *
+ * testing thread:
+ * sync_point->set_call_back("point_ret", [](auto&& args) {
+ *     auto ctx0 = try_any_cast<bool>(args[0]);
+ *     auto pair = try_any_cast<std::pair<int, bool>*>(args.back());
+ *     pair->first = ...;
+ *     pair->second = ctx0; });
+ *
+ * See sync_point_test.cpp for more details.
+ */
+// NOTE(review): this pragma is not scoped with push/pop, so it disables
+// -Waddress for the rest of every translation unit that includes this header
+// in BE_TEST builds. Presumably it silences warnings from the macro
+// expansions below — confirm which expression triggers it.
+#pragma GCC diagnostic ignored "-Waddress"
+# define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) \
+    SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__)
+# define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...) \
+    SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__)
+
+#endif // BE_TEST
+
+// Injection points: like the TEST_SYNC_POINT* macros, but compiled in only
+// when ENABLE_INJECTION_POINT is defined, and then active only while
+// doris::config::enable_injection_point is true at runtime.
+// TODO: define injection points for the production environment; the `if`
+//       condition could be a live configuration of the application.
+// NOTE: when enabled, each macro expands to a braced `if` with no `else`, so
+// do not use one as the sole body of an unbraced if/else (dangling else).
+#ifndef ENABLE_INJECTION_POINT
+# define TEST_INJECTION_POINT(x)
+# define TEST_IDX_INJECTION_POINT(x, index)
+# define TEST_INJECTION_POINT_CALLBACK(x, ...)
+# define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)
+# define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...)
+# define TEST_INJECTION_POINT_SINGLETON()
+#else
+namespace doris::config {
+extern bool enable_injection_point;
+}
+# define TEST_INJECTION_POINT(x) \
+    if (doris::config::enable_injection_point) { SYNC_POINT(x); }
+# define TEST_IDX_INJECTION_POINT(x, index) \
+    if (doris::config::enable_injection_point) { IDX_SYNC_POINT(x, index); }
+# define TEST_INJECTION_POINT_CALLBACK(x, ...) \
+    if (doris::config::enable_injection_point) { \
+      SYNC_POINT_CALLBACK(x, __VA_ARGS__); }
+# define TEST_INJECTION_POINT_SINGLETON() \
+    if (doris::config::enable_injection_point) { SYNC_POINT_SINGLETON(); }
+# define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) \
+    if (doris::config::enable_injection_point) { \
+      SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__); }
+# define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...) \
+    if (doris::config::enable_injection_point) { \
+      SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__); }
+#endif // ENABLE_INJECTION_POINT
+
+// clang-format on
+// vim: et tw=80 ts=2 sw=2 cc=80:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to