This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a13d8a94375 [Refactor](lock) Remove SpinLock compeletely (#49872) a13d8a94375 is described below commit a13d8a943754040c104d34d0bda1aa7b68b0d336 Author: zclllyybb <zhaochan...@selectdb.com> AuthorDate: Thu Apr 10 20:18:15 2025 +0800 [Refactor](lock) Remove SpinLock compeletely (#49872) --- be/src/cloud/cloud_tablets_channel.cpp | 4 +- be/src/common/object_pool.h | 23 ++++---- be/src/olap/delta_writer.h | 1 - be/src/olap/delta_writer_v2.h | 1 - be/src/olap/memtable_writer.cpp | 14 ++--- be/src/olap/memtable_writer.h | 3 +- be/src/olap/rowset/beta_rowset_writer.h | 8 +-- be/src/olap/rowset/beta_rowset_writer_v2.h | 19 ++----- be/src/olap/rowset/segment_creator.h | 7 --- be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 1 - be/src/olap/storage_engine.cpp | 6 +-- be/src/pipeline/exec/data_queue.h | 6 +-- be/src/runtime/load_channel.cpp | 2 +- be/src/runtime/load_channel.h | 5 +- be/src/runtime/load_stream.h | 2 +- be/src/runtime/load_stream_writer.h | 10 +--- be/src/runtime/tablets_channel.cpp | 8 +-- be/src/runtime/tablets_channel.h | 3 +- be/src/util/lru_multi_cache.h | 35 +++++------- be/src/util/lru_multi_cache.inline.h | 26 ++++----- be/src/util/spinlock.h | 62 ---------------------- be/src/util/trace.h | 5 -- be/src/util/uuid_generator.h | 8 +-- be/src/vec/sink/writer/vtablet_writer.cpp | 12 ++--- be/src/vec/sink/writer/vtablet_writer.h | 7 ++- be/test/util/threadpool_test.cpp | 15 +++--- .../exec/format/parquet/parquet_thrift_test.cpp | 18 ++----- 27 files changed, 89 insertions(+), 222 deletions(-) diff --git a/be/src/cloud/cloud_tablets_channel.cpp b/be/src/cloud/cloud_tablets_channel.cpp index 63e47b69d06..7dddad90b95 100644 --- a/be/src/cloud/cloud_tablets_channel.cpp +++ b/be/src/cloud/cloud_tablets_channel.cpp @@ -17,6 +17,8 @@ #include "cloud/cloud_tablets_channel.h" +#include <mutex> + #include "cloud/cloud_delta_writer.h" #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_storage_engine.h" @@ -62,7 +64,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques { // add_batch may concurrency with inc_open but not under _lock. // so need to protect it with _tablet_writers_lock. - std::lock_guard<SpinLock> l(_tablet_writers_lock); + std::lock_guard<std::mutex> l(_tablet_writers_lock); for (auto& [tablet_id, _] : tablet_to_rowidxs) { auto tablet_writer_it = _tablet_writers.find(tablet_id); if (tablet_writer_it == _tablet_writers.end()) { diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h index 3c000371a9c..ded8626599f 100644 --- a/be/src/common/object_pool.h +++ b/be/src/common/object_pool.h @@ -18,10 +18,9 @@ #pragma once #include <mutex> +#include <ranges> #include <vector> -#include "util/spinlock.h" - namespace doris { // An ObjectPool maintains a list of C++ objects which are deallocated @@ -30,31 +29,32 @@ namespace doris { class ObjectPool { public: ObjectPool() = default; - + ObjectPool(const ObjectPool&) = delete; + void operator=(const ObjectPool&) = delete; ~ObjectPool() { clear(); } template <class T> T* add(T* t) { // TODO: Consider using a lock-free structure. - std::lock_guard<SpinLock> l(_lock); + std::lock_guard<std::mutex> l(_lock); _objects.emplace_back(Element {t, [](void* obj) { delete reinterpret_cast<T*>(obj); }}); return t; } template <class T> T* add_array(T* t) { - std::lock_guard<SpinLock> l(_lock); + std::lock_guard<std::mutex> l(_lock); _objects.emplace_back(Element {t, [](void* obj) { delete[] reinterpret_cast<T*>(obj); }}); return t; } void clear() { - std::lock_guard<SpinLock> l(_lock); + std::lock_guard<std::mutex> l(_lock); // reverse delete object to make sure the obj can // safe access the member object construt early by // object pool - for (auto obj = _objects.rbegin(); obj != _objects.rend(); obj++) { - obj->delete_fn(obj->obj); + for (auto& _object : std::ranges::reverse_view(_objects)) { + _object.delete_fn(_object.obj); } _objects.clear(); } @@ -65,14 +65,11 @@ public: } uint64_t size() { - std::lock_guard<SpinLock> l(_lock); + std::lock_guard<std::mutex> l(_lock); return _objects.size(); } private: - ObjectPool(const ObjectPool&) = delete; - void operator=(const ObjectPool&) = delete; - /// A generic deletion function pointer. Deletes its first argument. using DeleteFn = void (*)(void*); @@ -83,7 +80,7 @@ private: }; std::vector<Element> _objects; - SpinLock _lock; + std::mutex _lock; }; } // namespace doris diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 550960f3f1e..f840e4f8aa1 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -36,7 +36,6 @@ #include "olap/tablet.h" #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" -#include "util/spinlock.h" #include "util/uid_util.h" namespace doris { diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index be711b30c31..550c4b72f87 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -39,7 +39,6 @@ #include "olap/tablet.h" #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" -#include "util/spinlock.h" #include "util/uid_util.h" namespace doris { diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index d76ed8a9525..e59792596c9 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -142,12 +142,12 @@ Status MemTableWriter::_flush_memtable_async() { DCHECK(_flush_token != nullptr); std::shared_ptr<MemTable> memtable; { - std::lock_guard<SpinLock> l(_mem_table_ptr_lock); + std::lock_guard<std::mutex> l(_mem_table_ptr_lock); memtable = _mem_table; _mem_table = nullptr; } { - std::lock_guard<SpinLock> l(_mem_table_ptr_lock); + std::lock_guard<std::mutex> l(_mem_table_ptr_lock); memtable->update_mem_type(MemType::WRITE_FINISHED); _freezed_mem_tables.push_back(memtable); } @@ -200,7 +200,7 @@ Status MemTableWriter::wait_flush() { void MemTableWriter::_reset_mem_table() { { - std::lock_guard<SpinLock> l(_mem_table_ptr_lock); + std::lock_guard<std::mutex> l(_mem_table_ptr_lock); _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc, _unique_key_mow, _partial_update_info.get())); } @@ -226,7 +226,7 @@ Status MemTableWriter::close() { auto s = _flush_memtable_async(); { - std::lock_guard<SpinLock> l(_mem_table_ptr_lock); + std::lock_guard<std::mutex> l(_mem_table_ptr_lock); _mem_table.reset(); } _is_closed = true; @@ -325,7 +325,7 @@ Status MemTableWriter::cancel_with_status(const Status& st) { return Status::OK(); } { - std::lock_guard<SpinLock> l(_mem_table_ptr_lock); + std::lock_guard<std::mutex> l(_mem_table_ptr_lock); _mem_table.reset(); } if (_flush_token != nullptr) { @@ -353,7 +353,7 @@ int64_t MemTableWriter::mem_consumption(MemType mem) { } int64_t mem_usage = 0; { - std::lock_guard<SpinLock> l(_mem_table_ptr_lock); + std::lock_guard<std::mutex> l(_mem_table_ptr_lock); for (const auto& mem_table : _freezed_mem_tables) { auto mem_table_sptr = mem_table.lock(); if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == mem) { @@ -365,7 +365,7 @@ int64_t MemTableWriter::mem_consumption(MemType mem) { } int64_t MemTableWriter::active_memtable_mem_consumption() { - std::lock_guard<SpinLock> l(_mem_table_ptr_lock); + std::lock_guard<std::mutex> l(_mem_table_ptr_lock); return _mem_table != nullptr ? _mem_table->memory_usage() : 0; } diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index cdf15401196..aa1fd4025ed 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -33,7 +33,6 @@ #include "olap/partial_update_info.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" -#include "util/spinlock.h" #include "vec/common/custom_allocator.h" namespace doris { @@ -134,7 +133,7 @@ private: // Save the not active memtable that is in flush queue or under flushing. std::vector<std::weak_ptr<MemTable>> _freezed_mem_tables; // The lock to protect _memtable and _freezed_mem_tables structure to avoid concurrency modification or read - SpinLock _mem_table_ptr_lock; + std::mutex _mem_table_ptr_lock; std::shared_ptr<ResourceContext> _resource_ctx; std::mutex _lock; diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index c4561b317b7..fd67d6ad019 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -21,16 +21,13 @@ #include <gen_cpp/olap_common.pb.h> #include <gen_cpp/olap_file.pb.h> -#include <algorithm> #include <atomic> #include <condition_variable> #include <map> #include <memory> #include <mutex> -#include <optional> #include <roaring/roaring.hh> #include <string> -#include <unordered_set> #include <vector> #include "common/status.h" @@ -44,7 +41,6 @@ #include "olap/rowset/segment_creator.h" #include "segment_v2/inverted_index_file_writer.h" #include "segment_v2/segment.h" -#include "util/spinlock.h" namespace doris { namespace vectorized { @@ -80,7 +76,7 @@ public: } private: - mutable SpinLock _lock; + mutable std::mutex _lock; std::unordered_map<int /* seg_id */, io::FileWriterPtr> _file_writers; bool _closed {false}; }; @@ -109,7 +105,7 @@ public: int64_t get_total_index_size() const { return _total_size; } private: - mutable SpinLock _lock; + mutable std::mutex _lock; std::unordered_map<int /* seg_id */, InvertedIndexFileWriterPtr> _inverted_index_file_writers; int64_t _total_size = 0; }; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index aa91aa0f403..a5d367b00b9 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -20,33 +20,20 @@ #include <arpa/inet.h> #include <fmt/format.h> #include <gen_cpp/olap_file.pb.h> -#include <stddef.h> -#include <stdint.h> -#include <algorithm> -#include <atomic> -#include <condition_variable> -#include <map> +#include <cstdint> #include <memory> #include <mutex> -#include <optional> #include <roaring/roaring.hh> -#include <string> -#include <unordered_set> #include <vector> -#include "brpc/controller.h" -#include "brpc/stream.h" #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" #include "olap/olap_common.h" #include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/rowset.h" -#include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_creator.h" -#include "segment_v2/segment.h" -#include "util/spinlock.h" namespace doris { namespace vectorized { @@ -120,7 +107,7 @@ public: RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; } Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) const override { - std::lock_guard<SpinLock> l(_lock); + std::lock_guard<std::mutex> l(_lock); *segment_num_rows = _segment_num_rows; return Status::OK(); } @@ -145,7 +132,7 @@ public: } private: - mutable SpinLock _lock; // protect following vectors. + mutable std::mutex _lock; // protect following vectors. // record rows number of every segment already written, using for rowid // conversion when compaction in unique key with MoW model std::vector<uint32_t> _segment_num_rows; diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index f8afd579892..437765cf704 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -20,18 +20,11 @@ #include <gen_cpp/internal_service.pb.h> #include <gen_cpp/olap_file.pb.h> -#include <string> -#include <typeinfo> -#include <unordered_map> -#include <vector> - #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" -#include "olap/olap_common.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_v2/inverted_index_file_writer.h" #include "olap/tablet_fwd.h" -#include "util/spinlock.h" #include "vec/core/block.h" namespace doris { diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index f493f21ac97..91ef37b33f9 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -37,7 +37,6 @@ #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_writer_context.h" #include "util/slice.h" -#include "util/spinlock.h" #include "vec/core/block.h" namespace doris { diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 1290ca29090..79da354409e 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -56,7 +56,6 @@ #include "olap/memtable_flush_executor.h" #include "olap/olap_common.h" #include "olap/olap_define.h" -#include "olap/olap_meta.h" #include "olap/rowset/rowset_fwd.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_meta_manager.h" @@ -72,7 +71,6 @@ #include "util/doris_metrics.h" #include "util/mem_info.h" #include "util/metrics.h" -#include "util/spinlock.h" #include "util/stopwatch.hpp" #include "util/thread.h" #include "util/threadpool.h" @@ -300,7 +298,7 @@ Status StorageEngine::_open() { Status StorageEngine::_init_store_map() { std::vector<std::thread> threads; - SpinLock error_msg_lock; + std::mutex error_msg_lock; std::string error_msg; for (auto& path : _options.store_paths) { auto store = std::make_unique<DataDir>(*this, path.path, path.capacity_bytes, @@ -309,7 +307,7 @@ Status StorageEngine::_init_store_map() { auto st = store->init(); if (!st.ok()) { { - std::lock_guard<SpinLock> l(error_msg_lock); + std::lock_guard<std::mutex> l(error_msg_lock); error_msg.append(st.to_string() + ";"); } LOG(WARNING) << "Store load failed, status=" << st.to_string() diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index 51bf975dd87..3929ee453ec 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -16,16 +16,14 @@ // under the License. #pragma once -#include <stdint.h> - #include <atomic> +#include <cstdint> #include <deque> #include <memory> #include <mutex> #include <vector> #include "common/status.h" -#include "util/spinlock.h" #include "vec/core/block.h" namespace doris::pipeline { @@ -114,7 +112,7 @@ private: // data queue is multi sink one source std::shared_ptr<Dependency> _source_dependency = nullptr; std::vector<Dependency*> _sink_dependencies; - SpinLock _source_lock; + std::mutex _source_lock; }; #include "common/compile_check_end.h" diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index d745132e589..2e18b002184 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -270,7 +270,7 @@ void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) { ThriftSerializer ser(false, 4096); uint8_t* buf = nullptr; uint32_t len = 0; - std::lock_guard<SpinLock> l(_profile_serialize_lock); + std::lock_guard<std::mutex> l(_profile_serialize_lock); _profile->to_thrift(&tprofile); auto st = ser.serialize(&tprofile, &len, &buf); if (st.ok()) { diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 31164222a95..a9413b96e34 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -28,9 +28,8 @@ #include <utility> #include "common/status.h" -#include "runtime/thread_context.h" +#include "runtime/workload_management/resource_context.h" #include "util/runtime_profile.h" -#include "util/spinlock.h" #include "util/uid_util.h" namespace doris { @@ -89,7 +88,7 @@ private: UniqueId _load_id; int64_t _txn_id = 0; - SpinLock _profile_serialize_lock; + std::mutex _profile_serialize_lock; std::unique_ptr<RuntimeProfile> _profile; RuntimeProfile* _self_profile = nullptr; RuntimeProfile::Counter* _add_batch_number_counter = nullptr; diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index b7fce16b6f1..c3719111b89 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -20,7 +20,6 @@ #include <bthread/mutex.h> #include <gen_cpp/olap_common.pb.h> -#include <condition_variable> #include <memory> #include <mutex> #include <unordered_map> @@ -31,6 +30,7 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" #include "runtime/load_stream_writer.h" +#include "runtime/workload_management/resource_context.h" #include "util/runtime_profile.h" namespace doris { diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h index 200c3b566d9..c85d0c48d42 100644 --- a/be/src/runtime/load_stream_writer.h +++ b/be/src/runtime/load_stream_writer.h @@ -19,24 +19,16 @@ #include <gen_cpp/internal_service.pb.h> -#include <atomic> #include <memory> #include <mutex> -#include <shared_mutex> -#include <unordered_set> #include <vector> -#include "brpc/stream.h" #include "butil/iobuf.h" #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" #include "olap/delta_writer_context.h" -#include "olap/memtable.h" -#include "olap/olap_common.h" -#include "olap/rowset/rowset_fwd.h" #include "olap/tablet_fwd.h" -#include "util/spinlock.h" -#include "util/uid_util.h" +#include "runtime/workload_management/resource_context.h" namespace doris { diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 6b0cf9e326f..53dc5db5d67 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -240,7 +240,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para auto delta_writer = create_delta_writer(wrequest); { // here we modify _tablet_writers. so need lock. - std::lock_guard<SpinLock> l(_tablet_writers_lock); + std::lock_guard<std::mutex> l(_tablet_writers_lock); _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer)); } @@ -448,7 +448,7 @@ void BaseTabletsChannel::refresh_profile() { int64_t max_tablet_write_mem_usage = 0; int64_t max_tablet_flush_mem_usage = 0; { - std::lock_guard<SpinLock> l(_tablet_writers_lock); + std::lock_guard<std::mutex> l(_tablet_writers_lock); for (auto&& [tablet_id, writer] : _tablet_writers) { int64_t write_mem = writer->mem_consumption(MemType::WRITE_FINISHED); write_mem_usage += write_mem; @@ -524,7 +524,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req auto delta_writer = create_delta_writer(wrequest); { - std::lock_guard<SpinLock> l(_tablet_writers_lock); + std::lock_guard<std::mutex> l(_tablet_writers_lock); _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer)); } } @@ -588,7 +588,7 @@ Status BaseTabletsChannel::_write_block_data( // so need to protect it with _tablet_writers_lock. decltype(_tablet_writers.find(tablet_id)) tablet_writer_it; { - std::lock_guard<SpinLock> l(_tablet_writers_lock); + std::lock_guard<std::mutex> l(_tablet_writers_lock); tablet_writer_it = _tablet_writers.find(tablet_id); if (tablet_writer_it == _tablet_writers.end()) { return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 55fe96df750..a8e2cc96f01 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -32,7 +32,6 @@ #include "common/status.h" #include "util/bitmap.h" #include "util/runtime_profile.h" -#include "util/spinlock.h" #include "util/uid_util.h" #include "vec/common/custom_allocator.h" @@ -174,7 +173,7 @@ protected: // tablet_id -> TabletChannel. it will only be changed in open() or inc_open() std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>> _tablet_writers; // protect _tablet_writers - SpinLock _tablet_writers_lock; + std::mutex _tablet_writers_lock; // broken tablet ids. // If a tablet write fails, it's id will be added to this set. // So that following batch will not handle this tablet anymore. diff --git a/be/src/util/lru_multi_cache.h b/be/src/util/lru_multi_cache.h index 8c810a06ee2..509625c8bdb 100644 --- a/be/src/util/lru_multi_cache.h +++ b/be/src/util/lru_multi_cache.h @@ -22,16 +22,10 @@ #pragma once #include <boost/intrusive/list.hpp> -#include <functional> #include <list> -#include <memory> #include <mutex> -#include <tuple> #include <unordered_map> -#include "gutil/macros.h" -#include "util/spinlock.h" - namespace doris { /// LruMultiCache is a threadsafe Least Recently Used Cache built on std::unordered_map @@ -99,7 +93,8 @@ public: LruMultiCache(LruMultiCache&&) = delete; LruMultiCache& operator=(LruMultiCache&&) = delete; - DISALLOW_COPY_AND_ASSIGN(LruMultiCache); + LruMultiCache(const LruMultiCache&) = delete; + const LruMultiCache& operator=(const LruMultiCache&) = delete; /// Returns the number of stored objects in O(1) time size_t size(); @@ -130,13 +125,12 @@ public: private: /// Doubly linked list and auto_unlink is used for O(1) remove from LRU list, in case of /// get and evict. - typedef boost::intrusive::list_member_hook< - boost::intrusive::link_mode<boost::intrusive::auto_unlink>> - link_type; + using link_type = boost::intrusive::list_member_hook< + boost::intrusive::link_mode<boost::intrusive::auto_unlink>>; /// Internal type storing everything needed for O(1) operations struct ValueType_internal { - typedef std::list<ValueType_internal> Container_internal; + using Container_internal = std::list<ValueType_internal>; /// Variadic template is used to support emplace template <typename... Args> @@ -171,19 +165,17 @@ private: }; /// Owning list typedef - typedef std::list<ValueType_internal> Container; + using Container = std::list<ValueType_internal>; /// Hash table typedef - typedef std::unordered_map<KeyType, Container> HashTableType; + using HashTableType = std::unordered_map<KeyType, Container>; - typedef boost::intrusive::member_hook<ValueType_internal, link_type, - &ValueType_internal::member_hook> - MemberHookOption; + using MemberHookOption = boost::intrusive::member_hook<ValueType_internal, link_type, + &ValueType_internal::member_hook>; /// No constant time size to support self unlink, cache size is tracked by the class - typedef boost::intrusive::list<ValueType_internal, MemberHookOption, - boost::intrusive::constant_time_size<false>> - LruListType; + using LruListType = boost::intrusive::list<ValueType_internal, MemberHookOption, + boost::intrusive::constant_time_size<false>>; void release(ValueType_internal* p_value_internal); void destroy(ValueType_internal* p_value_internal); @@ -203,7 +195,7 @@ private: /// Protects access to cache. No need for read/write cache as there is no costly /// pure read operation - SpinLock _lock; + std::mutex _lock; public: /// RAII Accessor to give unqiue access for a cached object @@ -216,7 +208,8 @@ public: Accessor(Accessor&&); Accessor& operator=(Accessor&&); - DISALLOW_COPY_AND_ASSIGN(Accessor); + Accessor(const Accessor&) = delete; + const Accessor& operator=(const Accessor&) = delete; /// Automatic release in destructor ~Accessor(); diff --git a/be/src/util/lru_multi_cache.inline.h b/be/src/util/lru_multi_cache.inline.h index 5fe85399aac..b201d50dd8e 100644 --- a/be/src/util/lru_multi_cache.inline.h +++ b/be/src/util/lru_multi_cache.inline.h @@ -23,7 +23,7 @@ #include <glog/logging.h> -#include "util/hash_util.hpp" +#include "util/hash_util.hpp" // IWYU pragma: keep #include "util/lru_multi_cache.h" #include "util/time.h" @@ -110,25 +110,25 @@ LruMultiCache<KeyType, ValueType>::LruMultiCache(size_t capacity) : _capacity(ca template <typename KeyType, typename ValueType> size_t LruMultiCache<KeyType, ValueType>::size() { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); return _size; } template <typename KeyType, typename ValueType> size_t LruMultiCache<KeyType, ValueType>::number_of_keys() { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); return _hash_table.size(); } template <typename KeyType, typename ValueType> void LruMultiCache<KeyType, ValueType>::set_capacity(size_t new_capacity) { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); _capacity = new_capacity; } template <typename KeyType, typename ValueType> auto LruMultiCache<KeyType, ValueType>::get(const KeyType& key) -> Accessor { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); auto hash_table_it = _hash_table.find(key); // No owning list found with this key, the caller will have to create a new object @@ -160,7 +160,7 @@ template <typename KeyType, typename ValueType> template <typename... Args> auto LruMultiCache<KeyType, ValueType>::emplace_and_get(const KeyType& key, Args&&... args) -> Accessor { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); // creates default container if there isn't one Container& container = _hash_table[key]; @@ -186,7 +186,7 @@ auto LruMultiCache<KeyType, ValueType>::emplace_and_get(const KeyType& key, Args template <typename KeyType, typename ValueType> void LruMultiCache<KeyType, ValueType>::release(ValueType_internal* p_value_internal) { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); // This only can be used by the accessor, which already checks for nullptr DCHECK(p_value_internal); @@ -213,7 +213,7 @@ void LruMultiCache<KeyType, ValueType>::release(ValueType_internal* p_value_inte template <typename KeyType, typename ValueType> void LruMultiCache<KeyType, ValueType>::destroy(ValueType_internal* p_value_internal) { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); // This only can be used by the accessor, which already checks for nullptr DCHECK(p_value_internal); @@ -236,19 +236,19 @@ void LruMultiCache<KeyType, ValueType>::destroy(ValueType_internal* p_value_inte template <typename KeyType, typename ValueType> size_t LruMultiCache<KeyType, ValueType>::number_of_available_objects() { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); return _lru_list.size(); } template <typename KeyType, typename ValueType> void LruMultiCache<KeyType, ValueType>::rehash() { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); _hash_table.rehash(_hash_table.bucket_count() + 1); } template <typename KeyType, typename ValueType> void LruMultiCache<KeyType, ValueType>::_evict_one(ValueType_internal& value_internal) { - // SpinLock is locked by the caller evicting function + // std::mutex is locked by the caller evicting function // _lock.DCheckLocked(); // Has to be available to evict @@ -272,7 +272,7 @@ void LruMultiCache<KeyType, ValueType>::_evict_one(ValueType_internal& value_int template <typename KeyType, typename ValueType> void LruMultiCache<KeyType, ValueType>::_evict_one_if_needed() { - // SpinLock is locked by the caller public function + // std::mutex is locked by the caller public function // _lock.DCheckLocked(); if (!_lru_list.empty() && _size > _capacity) { @@ -282,7 +282,7 @@ void LruMultiCache<KeyType, ValueType>::_evict_one_if_needed() { template <typename KeyType, typename ValueType> void LruMultiCache<KeyType, ValueType>::evict_older_than(uint64_t oldest_allowed_timestamp) { - std::lock_guard<SpinLock> g(_lock); + std::lock_guard<std::mutex> g(_lock); // Stop eviction if // - there are no more available (i.e. evictable) objects diff --git a/be/src/util/spinlock.h b/be/src/util/spinlock.h deleted file mode 100644 index c0712875fe6..00000000000 --- a/be/src/util/spinlock.h +++ /dev/null @@ -1,62 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/spinlock.h -// and modified by Doris - -#pragma once - -#include <sched.h> /* For sched_yield() */ - -#include <atomic> - -namespace doris { - -// Lightweight spinlock. -class SpinLock { -public: - SpinLock() : _locked(false) { - // do nothing - } - - // Acquires the lock, spins until the lock becomes available - void lock() { - for (int spin_count = 0; !try_lock(); ++spin_count) { - if (spin_count < NUM_SPIN_CYCLES) { -#if (defined(__i386) || defined(__x86_64__)) - asm volatile("pause\n" : : : "memory"); -#elif defined(__aarch64__) - asm volatile("yield\n" ::: "memory"); -#endif - } else { - sched_yield(); - spin_count = 0; - } - } - } - - void unlock() { _locked.clear(std::memory_order_release); } - - // Tries to acquire the lock - bool try_lock() { return !_locked.test_and_set(std::memory_order_acquire); } - -private: - static const int NUM_SPIN_CYCLES = 70; - std::atomic_flag _locked; -}; - -} // end namespace doris diff --git a/be/src/util/trace.h b/be/src/util/trace.h index 7a876fd3c3c..af062ed318c 100644 --- a/be/src/util/trace.h +++ b/be/src/util/trace.h @@ -18,12 +18,7 @@ #include <butil/macros.h> -#include "gutil/ref_counted.h" -#include "gutil/strings/substitute.h" -#include "gutil/threading/thread_collision_warner.h" #include "util/scoped_cleanup.h" -#include "util/spinlock.h" -#include "util/time.h" // If this scope times out, make a simple trace. // It will log the cost time only. diff --git a/be/src/util/uuid_generator.h b/be/src/util/uuid_generator.h index 0a78ca9b8cc..990a62d30e1 100644 --- a/be/src/util/uuid_generator.h +++ b/be/src/util/uuid_generator.h @@ -21,17 +21,13 @@ #include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_io.hpp> #include <mutex> -#include <ostream> -#include <string> - -#include "util/spinlock.h" namespace doris { class UUIDGenerator { public: boost::uuids::uuid next_uuid() { - std::lock_guard<SpinLock> lock(_uuid_gen_lock); + std::lock_guard<std::mutex> lock(_uuid_gen_lock); return _boost_uuid_generator(); } @@ -42,7 +38,7 @@ public: private: boost::uuids::basic_random_generator<boost::mt19937> _boost_uuid_generator; - SpinLock _uuid_gen_lock; + std::mutex _uuid_gen_lock; }; } // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 7f66308839c..d1d8c9d4811 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -167,7 +167,7 @@ void IndexChannel::mark_as_failed(const VNodeChannel* node_channel, const std::s } { - std::lock_guard<doris::SpinLock> l(_fail_lock); + std::lock_guard<std::mutex> l(_fail_lock); if (tablet_id == -1) { for (const auto the_tablet_id : it->second) { _failed_channels[the_tablet_id].insert(node_id); @@ -190,7 +190,7 @@ void IndexChannel::mark_as_failed(const VNodeChannel* node_channel, const std::s } Status IndexChannel::check_intolerable_failure() { - std::lock_guard<doris::SpinLock> l(_fail_lock); + std::lock_guard<std::mutex> l(_fail_lock); return _intolerable_failure_status; } @@ -198,7 +198,7 @@ void IndexChannel::set_error_tablet_in_state(RuntimeState* state) { std::vector<TErrorTabletInfo> error_tablet_infos; { - std::lock_guard<doris::SpinLock> l(_fail_lock); + std::lock_guard<std::mutex> l(_fail_lock); for (const auto& it : _failed_channels_msgs) { TErrorTabletInfo error_info; error_info.__set_tabletId(it.first); @@ -522,7 +522,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload) auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { if (_cancelled) { - std::lock_guard<doris::SpinLock> l(_cancel_msg_lock); + std::lock_guard<std::mutex> l(_cancel_msg_lock); return Status::Error<ErrorCode::INTERNAL_ERROR, false>("add row failed. {}", _cancel_msg); } else { @@ -621,7 +621,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, void VNodeChannel::_cancel_with_msg(const std::string& msg) { LOG(WARNING) << "cancel node channel " << channel_info() << ", error message: " << msg; { - std::lock_guard<doris::SpinLock> l(_cancel_msg_lock); + std::lock_guard<std::mutex> l(_cancel_msg_lock); if (_cancel_msg.empty()) { _cancel_msg = msg; } @@ -946,7 +946,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) { auto st = none_of({_cancelled, !_eos_is_produced}); if (!st.ok()) { if (_cancelled) { - std::lock_guard<doris::SpinLock> l(_cancel_msg_lock); + std::lock_guard<std::mutex> l(_cancel_msg_lock); return Status::Error<ErrorCode::INTERNAL_ERROR, false>("wait close failed. {}", _cancel_msg); } else { diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 22cb633ae65..9f10c01ba8a 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -57,7 +57,6 @@ #include "runtime/thread_context.h" #include "util/brpc_closure.h" #include "util/runtime_profile.h" -#include "util/spinlock.h" #include "util/stopwatch.hpp" #include "vec/columns/column.h" #include "vec/core/block.h" @@ -264,7 +263,7 @@ public: bool is_closed() const { return _is_closed; } bool is_cancelled() const { return _cancelled; } std::string get_cancel_msg() { - std::lock_guard<doris::SpinLock> l(_cancel_msg_lock); + std::lock_guard<std::mutex> l(_cancel_msg_lock); if (!_cancel_msg.empty()) { return _cancel_msg; } @@ -342,7 +341,7 @@ protected: // user cancel or get some errors std::atomic<bool> _cancelled {false}; - doris::SpinLock _cancel_msg_lock; + std::mutex _cancel_msg_lock; std::string _cancel_msg; // send finished means the consumer thread which send the rpc can exit @@ -516,7 +515,7 @@ private: bool _has_inc_node = false; // lock to protect _failed_channels and _failed_channels_msgs - mutable doris::SpinLock _fail_lock; + mutable std::mutex _fail_lock; // key is tablet_id, value is a set of failed node id std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels; // key is tablet_id, value is error message diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp index d331bd0d2ac..5223f2bb6c9 100644 --- a/be/test/util/threadpool_test.cpp +++ b/be/test/util/threadpool_test.cpp @@ -23,18 +23,17 @@ #include <gtest/gtest-param-test.h> #include <gtest/gtest-test-part.h> #include <sched.h> -#include <stdlib.h> -#include <time.h> #include <unistd.h> #include <atomic> #include <cstdint> +#include <cstdlib> +#include <ctime> #include <functional> #include <iostream> #include <iterator> #include <limits> #include <memory> -#include <mutex> #include <string> #include <thread> #include <utility> @@ -43,13 +42,11 @@ #include "common/logging.h" #include "common/status.h" #include "gtest/gtest.h" -#include "gtest/gtest_pred_impl.h" #include "gutil/strings/substitute.h" #include "util/barrier.h" #include "util/countdown_latch.h" #include "util/random.h" #include "util/scoped_cleanup.h" -#include "util/spinlock.h" #include "util/time.h" using std::atomic; @@ -653,11 +650,11 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) { Random rng(seed); // Protects 'tokens' and 'rng'. - SpinLock lock; + std::mutex lock; // Fetch a token from 'tokens' at random. auto GetRandomToken = [&]() -> shared_ptr<ThreadPoolToken> { - std::lock_guard<SpinLock> l(lock); + std::lock_guard<std::mutex> l(lock); int idx = rng.Uniform(kNumTokens); return tokens[idx]; }; @@ -666,7 +663,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) { for (int i = 0; i < kNumTokens; i++) { ThreadPool::ExecutionMode mode; { - std::lock_guard<SpinLock> l(lock); + std::lock_guard<std::mutex> l(lock); mode = rng.Next() % 2 ? ThreadPool::ExecutionMode::SERIAL : ThreadPool::ExecutionMode::CONCURRENT; } @@ -690,7 +687,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) { int num_tokens_cycled = 0; while (latch.count()) { { - std::lock_guard<SpinLock> l(lock); + std::lock_guard<std::mutex> l(lock); int idx = rng.Uniform(kNumTokens); ThreadPool::ExecutionMode mode = rng.Next() % 2 ? ThreadPool::ExecutionMode::SERIAL diff --git a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp index 16a29887c1d..66b13dd7445 100644 --- a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp @@ -22,16 +22,15 @@ #include <glog/logging.h> #include <gtest/gtest-message.h> #include <gtest/gtest-test-part.h> -#include <math.h> -#include <stdint.h> #include <sys/types.h> #include <algorithm> +#include <cmath> +#include <cstdint> #include <memory> #include <new> #include <ostream> #include <string> -#include <unordered_map> #include <utility> #include <vector> @@ -42,14 +41,12 @@ #include "io/fs/buffered_reader.h" #include "io/fs/file_reader.h" #include "io/fs/file_reader_writer_fwd.h" -#include "io/fs/file_system.h" #include "io/fs/local_file_system.h" #include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/types.h" #include "util/slice.h" -#include "util/spinlock.h" #include "util/timezone_utils.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column.h" @@ -58,8 +55,6 @@ #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/data_types/data_type.h" -#include "vec/data_types/data_type_factory.hpp" -#include "vec/exec/format/parquet/parquet_column_convert.h" #include "vec/exec/format/parquet/parquet_common.h" #include "vec/exec/format/parquet/parquet_thrift_util.h" #include "vec/exec/format/parquet/schema_desc.h" @@ -67,12 +62,11 @@ #include "vec/exec/format/parquet/vparquet_file_metadata.h" #include "vec/exec/format/parquet/vparquet_group_reader.h" -namespace doris { -namespace vectorized { +namespace doris::vectorized { class ParquetThriftReaderTest : public testing::Test { public: - ParquetThriftReaderTest() {} + ParquetThriftReaderTest() = default; }; TEST_F(ParquetThriftReaderTest, normal) { @@ -464,6 +458,4 @@ TEST_F(ParquetThriftReaderTest, dict_decoder) { read_parquet_data_and_check("./be/test/exec/test_data/parquet_scanner/dict-decoder.parquet", "./be/test/exec/test_data/parquet_scanner/dict-decoder.txt", 12); } -} // namespace vectorized - -} // namespace doris +} // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org