This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bade50db566 [chore](test) Add testing util sync point (#28924)
bade50db566 is described below
commit bade50db566f77ce88e541f8845147ab8aacbff7
Author: Gavin Chou <[email protected]>
AuthorDate: Sun Dec 24 21:59:11 2023 +0800
[chore](test) Add testing util sync point (#28924)
---
be/src/common/sync_point.cpp | 236 ++++++++++++++++++++++++++++++++++++++++++
be/src/common/sync_point.h | 240 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 476 insertions(+)
diff --git a/be/src/common/sync_point.cpp b/be/src/common/sync_point.cpp
new file mode 100644
index 00000000000..816c5a82a94
--- /dev/null
+++ b/be/src/common/sync_point.cpp
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Most code of this file is copied from rocksdb SyncPoint.
+// https://github.com/facebook/rocksdb
+
+// clang-format off
+#include "sync_point.h"
+
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <random>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace doris {
+
+struct SyncPoint::Data { // impl
+public:
+ Data() : enabled_(false) { }
+ virtual ~Data() {}
+ void process(const std::string& point, std::vector<std::any>&& cb_args);
+ void load_dependency(const std::vector<SyncPointPair>& dependencies);
+ void load_dependency_and_markers(
+ const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers);
+ bool predecessors_all_cleared(const std::string& point);
+ void set_call_back(const std::string& point,
+ const std::function<void(std::vector<std::any>&&)>&
callback);
+ void clear_call_back(const std::string& point);
+ void clear_all_call_backs();
+ void enable_processing();
+ void disable_processing();
+ void clear_trace();
+private:
+ bool disable_by_marker(const std::string& point, std::thread::id thread_id);
+private:
+ // successor/predecessor map loaded from load_dependency
+ std::unordered_map<std::string, std::vector<std::string>> successors_;
+ std::unordered_map<std::string, std::vector<std::string>> predecessors_;
+ std::unordered_map<std::string,
std::function<void(std::vector<std::any>&&)>> callbacks_;
+ std::unordered_map<std::string, std::vector<std::string>> markers_;
+ std::unordered_map<std::string, std::thread::id> marked_thread_id_;
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ // sync points that have been passed through
+ std::unordered_set<std::string> cleared_points_;
+ std::atomic<bool> enabled_;
+ int num_callbacks_running_ = 0;
+};
+
+SyncPoint* SyncPoint::get_instance() {
+ static SyncPoint sync_point;
+ return &sync_point;
+}
+SyncPoint::SyncPoint() :
+ impl_(new Data) {
+}
+SyncPoint:: ~SyncPoint() {
+ delete impl_;
+}
+void SyncPoint::load_dependency(const std::vector<SyncPointPair>&
dependencies) {
+ impl_->load_dependency(dependencies);
+}
+void SyncPoint::load_dependency_and_markers(
+ const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers) {
+ impl_->load_dependency_and_markers(dependencies, markers);
+}
+void SyncPoint::set_call_back(const std::string& point,
+ const
std::function<void(std::vector<std::any>&&)>& callback) {
+ impl_->set_call_back(point, callback);
+}
+void SyncPoint::clear_call_back(const std::string& point) {
+ impl_->clear_call_back(point);
+}
+void SyncPoint::clear_all_call_backs() {
+ impl_->clear_all_call_backs();
+}
+void SyncPoint::enable_processing() {
+ impl_->enable_processing();
+}
+void SyncPoint::disable_processing() {
+ impl_->disable_processing();
+}
+void SyncPoint::clear_trace() {
+ impl_->clear_trace();
+}
+void SyncPoint::process(const std::string& point, std::vector<std::any>&&
cb_arg) {
+ impl_->process(point, std::move(cb_arg));
+}
+
// =============================================================================
// SyncPoint implementation
// =============================================================================
+
+void SyncPoint::Data::load_dependency(
+ const std::vector<SyncPointPair>& dependencies)
{
+ std::lock_guard lock(mutex_);
+ successors_.clear();
+ predecessors_.clear();
+ cleared_points_.clear();
+ for (const auto& dependency : dependencies) {
+ successors_[dependency.predecessor].push_back(dependency.successor);
+ predecessors_[dependency.successor].push_back(dependency.predecessor);
+ }
+ cv_.notify_all();
+}
+
+/**
+ * Markers are also dependency descriptions
+ */
+void SyncPoint::Data::load_dependency_and_markers(
+ const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers) {
+ std::lock_guard lock(mutex_);
+ successors_.clear();
+ predecessors_.clear();
+ cleared_points_.clear();
+ markers_.clear();
+ marked_thread_id_.clear();
+ for (const auto& dependency : dependencies) {
+ successors_[dependency.predecessor].push_back(dependency.successor);
+ predecessors_[dependency.successor].push_back(dependency.predecessor);
+ }
+ for (const auto& marker : markers) {
+ successors_[marker.predecessor].push_back(marker.successor);
+ predecessors_[marker.successor].push_back(marker.predecessor);
+ markers_[marker.predecessor].push_back(marker.successor);
+ }
+ cv_.notify_all();
+}
+
+bool SyncPoint::Data::predecessors_all_cleared(const std::string& point) {
+ for (const auto& pred : predecessors_[point]) {
+ if (cleared_points_.count(pred) == 0) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void SyncPoint::Data::clear_call_back(const std::string& point) {
+ std::unique_lock lock(mutex_);
+ callbacks_.erase(point);
+}
+
+void SyncPoint::Data::clear_all_call_backs() {
+ std::unique_lock lock(mutex_);
+ callbacks_.clear();
+}
+
+void SyncPoint::Data::process(const std::string& point,
std::vector<std::any>&& cb_arg) {
+ if (!enabled_) {
+ return;
+ }
+ std::unique_lock lock(mutex_);
+ auto thread_id = std::this_thread::get_id();
+ auto marker_iter = markers_.find(point);
+ // if current sync point is a marker
+ // record it in marked_thread_id_ for all its successors
+ if (marker_iter != markers_.end()) {
+ for (auto& marked_point : marker_iter->second) {
+ marked_thread_id_.emplace(marked_point, thread_id);
+ }
+ }
+ // if current point is a marker's successor
+ if (disable_by_marker(point, thread_id)) {
+ return;
+ }
+ while (!predecessors_all_cleared(point)) {
+ cv_.wait(lock);
+ if (disable_by_marker(point, thread_id)) {
+ return;
+ }
+ }
+ auto callback_pair = callbacks_.find(point);
+ if (callback_pair != callbacks_.end()) {
+ num_callbacks_running_++;
+ auto callback = callback_pair->second;
+ mutex_.unlock();
+ callback(std::move(cb_arg));
+ mutex_.lock();
+ num_callbacks_running_--;
+ }
+ cleared_points_.insert(point);
+ cv_.notify_all();
+}
+
+bool SyncPoint::Data::disable_by_marker(const std::string& point,
+ std::thread::id thread_id) {
+ auto marked_point_iter = marked_thread_id_.find(point);
+ return marked_point_iter != marked_thread_id_.end() // is a successor
+ && thread_id != marked_point_iter->second;
+}
+
+void SyncPoint::Data::set_call_back(const std::string& point,
+ const
std::function<void(std::vector<std::any>&&)>& callback) {
+ std::lock_guard lock(mutex_);
+ callbacks_[point] = callback;
+}
+
+void SyncPoint::Data::clear_trace() {
+ std::lock_guard lock(mutex_);
+ cleared_points_.clear();
+}
+
+void SyncPoint::Data::enable_processing() {
+ enabled_ = true;
+}
+
+void SyncPoint::Data::disable_processing() {
+ enabled_ = false;
+}
+
+} // namespace doris
+// clang-format on
+// vim: et tw=80 ts=2 sw=2 cc=80:
diff --git a/be/src/common/sync_point.h b/be/src/common/sync_point.h
new file mode 100644
index 00000000000..18b3a63c05e
--- /dev/null
+++ b/be/src/common/sync_point.h
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Most code of this file is copied from rocksdb SyncPoint.
+// https://github.com/facebook/rocksdb
+
+#pragma once
+// clang-format off
#include <any>
#include <functional>
#include <iostream>
#include <string>
#include <type_traits>
#include <typeinfo>
#include <vector>
+
+namespace doris {
+
// Evaluates `expr` and yields its value (by value), but first offers a test
// the chance to short-circuit it through TEST_SYNC_POINT_RETURN_WITH_VALUE at
// `point_name`. The default result slot is a value-initialized object of
// expr's decayed type.
//
// std::decay_t is required: decltype((expr)) is an lvalue reference whenever
// `expr` is an lvalue (e.g. a plain variable or a function returning T&), and
// `T& {}` does not compile.
#define SYNC_POINT_HOOK_RETURN_VALUE(expr, point_name, ...)                                    \
    [&]() mutable {                                                                            \
        TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, std::decay_t<decltype((expr))> {},      \
                                          __VA_ARGS__);                                        \
        return (expr);                                                                         \
    }()
+
// SyncPoint lets unit tests reproduce race conditions deterministically.
//
// Code under test marks positions in a thread's execution stream with the
// TEST_SYNC_POINT* macros; each such sync point is identified by its name.
// A test then declares "happens after" relationships between sync points with
// SyncPoint::load_dependency, which forces the threads into a chosen
// interleaving. The design follows RocksDB's SyncPoint.
class SyncPoint {
public:
    // A "happens after" edge: processing of `successor` blocks until
    // `predecessor` has been processed.
    struct SyncPointPair {
        std::string predecessor;
        std::string successor;
    };

    // Returns the process-wide instance.
    static SyncPoint* get_instance();

    SyncPoint(const SyncPoint&) = delete;
    SyncPoint& operator=(const SyncPoint&) = delete;
    ~SyncPoint();

    // Call once at the beginning of a test to install the dependency graph
    // between sync points. Replaces previously loaded dependencies and forgets
    // which points have already been passed.
    //
    // Example:
    //   load_dependency({{"point1", "point2"},
    //                    {"point2", "point3"},
    //                    {"point3", "point4"}});
    //
    //    test case thread            thread for object being tested
    //        |                               |
    //        |                               |
    //        | \-------------0-------------\ |
    //        |                              \-> x  sync point1 set in code
    //        |    /----------1----------------/ |
    // point2 o <-/                          /-> x  sync point4 set in code
    //        |                             /    |
    //        z                            /     |
    //        z     /---------2-----------/      |  there may be nothing
    //        |    /                             |  between point1 and point4;
    // point3 o --/                              |  they only synchronize the
    //        |                                  |  test case and the object
    //        v                                  v
    //
    // Vertical arrows show each thread's progress; the resulting order is:
    //   test case thread -> point1 -> point2 -> point3 -> point4 -> object
    //   being tested
    //
    // Between point2 and point3 the test may do arbitrary work, e.g. change the
    // object's state, call another method, or provoke a data race.
    void load_dependency(const std::vector<SyncPointPair>& dependencies);

    // Like load_dependency, but also installs markers. A marker's successor is
    // only processed on the thread that processed the marker's predecessor;
    // on any other thread that successor is skipped. Each marker pair is also
    // added as an ordinary dependency.
    void load_dependency_and_markers(const std::vector<SyncPointPair>& dependencies,
                                     const std::vector<SyncPointPair>& markers);

    // Registers `callback` to run when `point` is processed, replacing any
    // callback already registered for that point. The callback receives the
    // extra arguments given to TEST_SYNC_POINT_CALLBACK. The
    // TEST_SYNC_POINT_RETURN_WITH_* macros append a pointer to their return
    // slot as the last element. TEST_SYNC_POINT and TEST_IDX_SYNC_POINT pass
    // an empty vector.
    void set_call_back(const std::string& point,
                       const std::function<void(std::vector<std::any>&&)>& callback);

    // Removes the callback registered for `point`.
    void clear_call_back(const std::string& point);

    // Removes every registered callback.
    void clear_all_call_backs();

    // Turns sync point processing on; it is off when the instance is created.
    void enable_processing();

    // Turns sync point processing off.
    void disable_processing();

    // Forgets which sync points have been passed.
    void clear_trace();

    // Invoked by the TEST_SYNC_POINT* macros. Blocks until every predecessor of
    // `point` has been processed, then runs the callback registered for
    // `point` (if any) with `cb_args`.
    void process(const std::string& point, std::vector<std::any>&& cb_args = {});

    // TODO: a function that blocks until all sync points are cleared might be
    // useful.

    // Implementation, defined in sync_point.cpp. Public so that it can be
    // subclassed.
    struct Data;

private:
    // Singleton: use get_instance().
    SyncPoint();

    // Owned implementation, allocated by the constructor and deleted by the
    // destructor.
    Data* impl_;
};
+
// Casts `a` to T. On mismatch, logs the expected and actual dynamic types to
// stderr, then propagates the std::bad_any_cast.
//
// @param a  a callback argument, typically one element of the vector passed to
//           a SyncPoint callback
// @return   std::any_cast<T>(a)
// @throws   std::bad_any_cast if `a` does not hold a T
template <class T>
T try_any_cast(const std::any& a) {
    try {
        return std::any_cast<T>(a);
    } catch (const std::bad_any_cast& e) {
        // std::cerr is unit-buffered, so no explicit flush is needed.
        std::cerr << e.what() << " expected=" << typeid(T).name()
                  << " actual=" << a.type().name() << '\n';
        // A bare rethrow propagates the original exception object instead of
        // throwing a fresh copy of the caught reference.
        throw;
    }
}
+
+template <typename T>
+auto try_any_cast_ret(std::vector<std::any>& any) {
+ return try_any_cast<std::pair<T, bool>*>(any.back());
+}
+
+} // namespace doris
+
// Unconditional sync-point macros. They always call into the SyncPoint
// singleton; the TEST_* and injection wrappers decide when they are compiled in.
//
// Locals declared by these macros use distinctive sync_point_* names. A
// caller argument that mentions its own variable named e.g. `ret`, `args` or
// `pred` must not be captured by the macro's declaration. With generic names,
// `args {args}` referred to the variable being initialized, and `&ret` silently
// meant the macro's own pair.

// Plain sync point: waits for the predecessors of `x`, then runs its callback
// with an empty argument vector.
#define SYNC_POINT(x) doris::SyncPoint::get_instance()->process(x)

// Sync point named `x` followed by the decimal `index`, e.g. ("p", 3) -> "p3".
// `x` is parenthesized so that an expression argument is fully evaluated
// before the string concatenation.
#define IDX_SYNC_POINT(x, index) \
    doris::SyncPoint::get_instance()->process((x) + std::to_string(index))

// Sync point whose callback receives the extra arguments, each wrapped in std::any.
#define SYNC_POINT_CALLBACK(x, ...) doris::SyncPoint::get_instance()->process(x, {__VA_ARGS__})

// Lets a callback force the enclosing function to return early. The callback
// receives the extra arguments plus, as the last element, a pointer to a
// std::pair{value, should_return} initialized to {default_ret_val, false}.
#define SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)                   \
    {                                                                           \
        std::pair sync_point_ret_slot {default_ret_val, false};                 \
        std::vector<std::any> sync_point_cb_args {__VA_ARGS__};                 \
        sync_point_cb_args.push_back(&sync_point_ret_slot);                     \
        doris::SyncPoint::get_instance()->process(x, std::move(sync_point_cb_args)); \
        if (sync_point_ret_slot.second) return std::move(sync_point_ret_slot.first); \
    }

// Void-function variant: the last callback argument is a bool* that, when set
// to true, makes the enclosing function return.
#define SYNC_POINT_RETURN_WITH_VOID(x, ...)                                     \
    {                                                                           \
        bool sync_point_should_return = false;                                  \
        std::vector<std::any> sync_point_cb_args {__VA_ARGS__};                 \
        sync_point_cb_args.push_back(&sync_point_should_return);                \
        doris::SyncPoint::get_instance()->process(x, std::move(sync_point_cb_args)); \
        if (sync_point_should_return) return;                                   \
    }

// Touches the singleton, forcing its construction.
#define SYNC_POINT_SINGLETON() (void)doris::SyncPoint::get_instance()
+
// The TEST_SYNC_POINT* macros are live only in builds that define BE_TEST.
// In every other build they expand to nothing, so sync points cost nothing in
// release binaries.
#ifdef BE_TEST
// TEST_SYNC_POINT marks a sync point inside the code base. Sync points can be
// made to happen after other sync points, configured at runtime through
// SyncPoint::load_dependency. This is how tests reproduce race conditions
// between threads.
#define TEST_SYNC_POINT(x) SYNC_POINT(x)
#define TEST_IDX_SYNC_POINT(x, index) IDX_SYNC_POINT(x, index)
#define TEST_SYNC_POINT_CALLBACK(x, ...) SYNC_POINT_CALLBACK(x, __VA_ARGS__)
#define TEST_SYNC_POINT_SINGLETON() SYNC_POINT_SINGLETON()

/**
 * Return-injection points for tests.
 *
 * The tested thread exposes context through the extra arguments. The testing
 * thread's callback decides whether the hooked function returns early, and
 * with which value. For example:
 *
 *   tested thread:
 *     ...
 *     TEST_SYNC_POINT_RETURN_WITH_VALUE("point_ret", int(0), ctx0);
 *     ...
 *
 *   testing thread:
 *     sync_point->set_call_back("point_ret", [](auto&& args) {
 *         auto ctx0 = try_any_cast<bool>(args[0]);
 *         auto pair = try_any_cast<std::pair<int, bool>*>(args.back());
 *         pair->first = ...;
 *         pair->second = ctx0;
 *     });
 *
 * See sync_point_test.cpp for more details.
 */
// NOTE(review): this pragma is not wrapped in push/pop, so -Waddress stays
// disabled for the rest of every BE_TEST translation unit that includes this
// header.
#pragma GCC diagnostic ignored "-Waddress"
#define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) \
    SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__)
#define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...) SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__)
#else
#define TEST_SYNC_POINT(x)
#define TEST_IDX_SYNC_POINT(x, index)
#define TEST_SYNC_POINT_CALLBACK(x, ...)
#define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)
#define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...)
// seldom called
#define TEST_SYNC_POINT_SINGLETON()
#endif // BE_TEST
+
// Injection points are the non-test counterpart of TEST_SYNC_POINT*. They are
// compiled in only when ENABLE_INJECTION_POINT is defined, and even then they
// act only while doris::config::enable_injection_point is true.
// TODO: define injection points for the production environment; the `if`
// condition could become a live-reconfigurable application setting.
//
// NOTE(review): the enabled forms expand to a bare `if` statement, so
// `if (c) TEST_INJECTION_POINT(x); else ...` does not compile. Brace such call
// sites.
#ifdef ENABLE_INJECTION_POINT
namespace doris::config {
extern bool enable_injection_point;
}
#define TEST_INJECTION_POINT(x) \
    if (doris::config::enable_injection_point) { SYNC_POINT(x); }
#define TEST_IDX_INJECTION_POINT(x, index) \
    if (doris::config::enable_injection_point) { IDX_SYNC_POINT(x, index); }
#define TEST_INJECTION_POINT_CALLBACK(x, ...) \
    if (doris::config::enable_injection_point) { SYNC_POINT_CALLBACK(x, __VA_ARGS__); }
#define TEST_INJECTION_POINT_SINGLETON() \
    if (doris::config::enable_injection_point) { SYNC_POINT_SINGLETON(); }
#define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)                 \
    if (doris::config::enable_injection_point) {                                        \
        SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__);                  \
    }
#define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...) \
    if (doris::config::enable_injection_point) { SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__); }
#else
#define TEST_INJECTION_POINT(x)
#define TEST_IDX_INJECTION_POINT(x, index)
#define TEST_INJECTION_POINT_CALLBACK(x, ...)
#define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...)
#define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...)
#define TEST_INJECTION_POINT_SINGLETON()
#endif // ENABLE_INJECTION_POINT
+
+// clang-format on
+// vim: et tw=80 ts=2 sw=2 cc=80:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]