This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3235b636cc [refactor](remove unused code) remove thread pool manager (#16179) 3235b636cc is described below commit 3235b636cc7caff5e3ad126c36a4f12748cd4426 Author: yiguolei <676222...@qq.com> AuthorDate: Sun Jan 29 13:03:08 2023 +0800 [refactor](remove unused code) remove thread pool manager (#16179) * remove thread resource manager * remove string buffer --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/olap/memtable.h | 4 - be/src/runtime/CMakeLists.txt | 1 - be/src/runtime/exec_env.h | 3 - be/src/runtime/exec_env_init.cpp | 3 - be/src/runtime/runtime_state.h | 6 - be/src/runtime/string_buffer.hpp | 108 --------- be/src/runtime/thread_resource_mgr.cpp | 107 --------- be/src/runtime/thread_resource_mgr.h | 247 --------------------- be/src/util/CMakeLists.txt | 1 - be/src/util/tuple_row_zorder_compare.cpp | 28 --- be/src/util/tuple_row_zorder_compare.h | 34 --- be/test/CMakeLists.txt | 2 - be/test/http/stream_load_test.cpp | 4 - be/test/runtime/external_scan_context_mgr_test.cpp | 4 - be/test/runtime/string_buffer_test.cpp | 74 ------ be/test/runtime/test_env.cc | 2 - be/test/runtime/thread_resource_mgr_test.cpp | 66 ------ be/test/vec/exec/vtablet_sink_test.cpp | 3 - 18 files changed, 697 deletions(-) diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 80bcb26c0c..0cd504ca3f 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -24,7 +24,6 @@ #include "olap/skiplist.h" #include "olap/tablet.h" #include "runtime/memory/mem_tracker.h" -#include "util/tuple_row_zorder_compare.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" @@ -117,9 +116,6 @@ private: Schema* _schema; const TabletSchema* _tablet_schema; - // TODO: change to unique_ptr of comparator - std::shared_ptr<RowComparator> _row_comparator; - std::shared_ptr<RowInBlockComparator> _vec_row_comparator; // `_insert_manual_mem_tracker` manually records the memory value of memtable insert() diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 7dc25bda7a..ada78fc9d0 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -42,7 +42,6 @@ set(RUNTIME_FILES runtime_predicate.cpp jsonb_value.cpp thread_context.cpp - thread_resource_mgr.cpp threadlocal.cc decimalv2_value.cpp large_int_value.cpp diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index f90cb98ba6..c6123d7fbe 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -58,7 +58,6 @@ class ResultBufferMgr; class ResultQueueMgr; class TMasterInfo; class LoadChannelMgr; -class ThreadResourceMgr; class TmpFileMgr; class WebPageHandler; class StreamLoadExecutor; @@ -124,7 +123,6 @@ public: std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; } MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); } - ThreadResourceMgr* thread_mgr() { return _thread_mgr; } ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); } void set_serial_download_cache_thread_token() { @@ -204,7 +202,6 @@ private: ClientCache<BackendServiceClient>* _backend_client_cache = nullptr; ClientCache<FrontendServiceClient>* _frontend_client_cache = nullptr; ClientCache<TPaloBrokerServiceClient>* _broker_client_cache = nullptr; - ThreadResourceMgr* _thread_mgr = nullptr; // The default tracker consumed by mem hook. If the thread does not attach other trackers, // by default all consumption will be passed to the process tracker through the orphan tracker. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 0764082e08..ef88739bf4 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -46,7 +46,6 @@ #include "runtime/stream_load/load_stream_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" -#include "runtime/thread_resource_mgr.h" #include "runtime/tmp_file_mgr.h" #include "util/bfd_parser.h" #include "util/brpc_client_cache.h" @@ -95,7 +94,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host); _frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host); _broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host); - _thread_mgr = new ThreadResourceMgr(); ThreadPoolBuilder("SendBatchThreadPool") .set_min_threads(config::send_batch_thread_pool_thread_num) @@ -313,7 +311,6 @@ void ExecEnv::_destroy() { SAFE_DELETE(_fragment_mgr); SAFE_DELETE(_pipeline_task_scheduler); SAFE_DELETE(_cgroups_mgr); - SAFE_DELETE(_thread_mgr); SAFE_DELETE(_broker_client_cache); SAFE_DELETE(_frontend_client_cache); SAFE_DELETE(_backend_client_cache); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 83dae14d50..30608333ae 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -28,7 +28,6 @@ #include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions #include "gen_cpp/Types_types.h" // for TUniqueId #include "runtime/query_fragments_ctx.h" -#include "runtime/thread_resource_mgr.h" #include "util/runtime_profile.h" #include "util/telemetry/telemetry.h" @@ -105,7 +104,6 @@ public: const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } ExecEnv* exec_env() { return _exec_env; } std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return _query_mem_tracker; } - ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; } void set_fragment_root_id(PlanNodeId id) { DCHECK(_root_node_id == -1) << "Should not set this twice."; @@ -468,10 +466,6 @@ private: TQueryOptions _query_options; ExecEnv* _exec_env = nullptr; - // Thread resource management object for this fragment's execution. The runtime - // state is responsible for returning this pool to the thread mgr. - ThreadResourceMgr::ResourcePool* _resource_pool; - // if true, execution should stop with a CANCELLED status std::atomic<bool> _is_cancelled; diff --git a/be/src/runtime/string_buffer.hpp b/be/src/runtime/string_buffer.hpp deleted file mode 100644 index e54473d4e9..0000000000 --- a/be/src/runtime/string_buffer.hpp +++ /dev/null @@ -1,108 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "runtime/mem_pool.h" -#include "vec/common/string_ref.h" - -namespace doris { - -// Dynamic-sizable string (similar to std::string) but without as many -// copies and allocations. -// StringBuffer wraps a StringRef object with a pool and memory buffer length. -// It supports a subset of the std::string functionality but will only allocate -// bigger string buffers as necessary. std::string tries to be immutable and will -// reallocate very often. std::string should be avoided in all hot paths. -class StringBuffer { -public: - // C'tor for StringBuffer. Memory backing the string will be allocated from - // the pool as necessary. Can optionally be initialized from a StringRef. - StringBuffer(MemPool* pool, StringRef* str) : _pool(pool), _buffer_size(0) { - if (str != NULL) { - _string_value = *str; - _buffer_size = str->size; - } - } - - StringBuffer(MemPool* pool) : _pool(pool), _buffer_size(0) {} - - virtual ~StringBuffer() {} - - // append 'str' to the current string, allocating a new buffer as necessary. - void append(const char* str, int len) { - int new_len = len + _string_value.size; - - if (new_len > _buffer_size) { - grow_buffer(new_len); - } - - memcpy(const_cast<char*>(_string_value.data) + _string_value.size, str, len); - _string_value.size = new_len; - } - - // TODO: switch everything to uint8_t? - void append(const uint8_t* str, int len) { append(reinterpret_cast<const char*>(str), len); } - - // Assigns contents to StringBuffer - void assign(const char* str, int len) { - clear(); - append(str, len); - } - - // clear the underlying StringRef. The allocated buffer can be reused. - void clear() { _string_value.size = 0; } - - // Clears the underlying buffer and StringRef - void reset() { - _string_value.size = 0; - _buffer_size = 0; - } - - // Returns whether the current string is empty - bool empty() const { return _string_value.size == 0; } - - // Returns the length of the current string - int size() const { return _string_value.size; } - - // Returns the underlying StringRef - const StringRef& str() const { return _string_value; } - - // Returns the buffer size - int buffer_size() const { return _buffer_size; } - -private: - // Grows the buffer backing the string to be at least new_size, copying - // over the previous string data into the new buffer. - // TODO: some kind of doubling strategy? - void grow_buffer(int new_len) { - char* new_buffer = reinterpret_cast<char*>(_pool->allocate(new_len)); - - if (_string_value.size > 0) { - memcpy(new_buffer, _string_value.data, _string_value.size); - } - - _string_value.data = new_buffer; - _buffer_size = new_len; - } - - MemPool* _pool; - StringRef _string_value; - int _buffer_size; -}; - -} // namespace doris diff --git a/be/src/runtime/thread_resource_mgr.cpp b/be/src/runtime/thread_resource_mgr.cpp deleted file mode 100644 index fcf9e47068..0000000000 --- a/be/src/runtime/thread_resource_mgr.cpp +++ /dev/null @@ -1,107 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/thread_resource_mgr.h" - -#include <vector> - -#include "common/config.h" -#include "common/logging.h" -#include "util/cpu_info.h" - -namespace doris { - -ThreadResourceMgr::ThreadResourceMgr(int threads_quota) { - DCHECK_GE(threads_quota, 0); - - if (threads_quota == 0) { - _system_threads_quota = CpuInfo::num_cores() * config::num_threads_per_core; - } else { - _system_threads_quota = threads_quota; - } - - _per_pool_quota = 0; -} - -ThreadResourceMgr::ThreadResourceMgr() { - _system_threads_quota = CpuInfo::num_cores() * config::num_threads_per_core; - _per_pool_quota = 0; -} - -ThreadResourceMgr::~ThreadResourceMgr() { - for (auto pool : _free_pool_objs) { - delete pool; - } - for (auto pool : _pools) { - delete pool; - } -} - -ThreadResourceMgr::ResourcePool::ResourcePool(ThreadResourceMgr* parent) : _parent(parent) {} - -void ThreadResourceMgr::ResourcePool::reset() { - _num_threads = 0; - _num_reserved_optional_threads = 0; - _max_quota = INT_MAX; -} - -void ThreadResourceMgr::ResourcePool::reserve_optional_tokens(int num) { - DCHECK_GE(num, 0); - _num_reserved_optional_threads = num; -} - -ThreadResourceMgr::ResourcePool* ThreadResourceMgr::register_pool() { - std::unique_lock<std::mutex> l(_lock); - ResourcePool* pool = nullptr; - - if (_free_pool_objs.empty()) { - pool = new ResourcePool(this); - } else { - pool = _free_pool_objs.front(); - _free_pool_objs.pop_front(); - } - - DCHECK(pool != nullptr); - DCHECK(_pools.find(pool) == _pools.end()); - _pools.insert(pool); - pool->reset(); - - // Added a new pool, update the quotas for each pool. - update_pool_quotas(); - return pool; -} - -void ThreadResourceMgr::unregister_pool(ResourcePool* pool) { - DCHECK(pool != nullptr); - std::unique_lock<std::mutex> l(_lock); - // this may be double unregistered after pr #3326 by LaiYingChun, so check if the pool is already unregisted - if (_pools.find(pool) != _pools.end()) { - _pools.erase(pool); - _free_pool_objs.push_back(pool); - update_pool_quotas(); - } -} - -void ThreadResourceMgr::update_pool_quotas() { - if (_pools.empty()) { - return; - } - - _per_pool_quota = ceil(static_cast<double>(_system_threads_quota) / _pools.size()); -} - -} // namespace doris diff --git a/be/src/runtime/thread_resource_mgr.h b/be/src/runtime/thread_resource_mgr.h deleted file mode 100644 index b48a49ecaf..0000000000 --- a/be/src/runtime/thread_resource_mgr.h +++ /dev/null @@ -1,247 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <stdlib.h> - -#include <functional> -#include <list> -#include <mutex> -#include <thread> - -#include "common/status.h" - -namespace doris { - -// Singleton object to manage CPU (aka thread) resources for the process. -// Conceptually, there is a fixed pool of threads that are shared between -// query fragments. If there is only one fragment running, it can use the -// entire pool, spinning up the maximum number of threads to saturate the -// hardware. If there are multiple fragments, the CPU pool must be shared -// between them. Currently, the total system pool is split evenly between -// all consumers. Each consumer gets ceil(total_system_threads / num_consumers). -// -// Each fragment must register with the ThreadResourceMgr to request threads -// (in the form of tokens). The fragment has required threads (it can't run -// with fewer threads) and optional threads. If the fragment is running on its -// own, it will be able to spin up more optional threads. When the system -// is under load, the ThreadResourceMgr will stop giving out tokens for optional -// threads. -// Pools should not use this for threads that are almost always idle (e.g. -// periodic reporting threads). -// Pools will temporarily go over the quota regularly and this is very -// much by design. For example, if a pool is running on its own with -// 4 required threads and 28 optional and another pool is added to the -// system, the first pool's quota is then cut by half (16 total) and will -// over time drop the optional threads. -// This class is thread safe. -// TODO: this is an initial simple version to improve the behavior with -// concurrency. This will need to be expanded post GA. These include: -// - More places where threads are optional (e.g. hash table build side, -// data stream threads, etc). -// - Admission control -// - Integration with other nodes/statestore -// - Priorities for different pools -// If both the mgr and pool locks need to be taken, the mgr lock must -// be taken first. -class ThreadResourceMgr { -public: - class ResourcePool; - - // This function will be called whenever the pool has more threads it can run on. - // This can happen on ReleaseThreadToken or if the quota for this pool increases. - // This is a good place, for example, to wake up anything blocked on available threads. - // This callback must not block. - // Note that this is not called once for each available thread or even guaranteed that - // when it is called, a thread is available (the quota could have changed again in - // between). It is simply that something might have happened (similar to condition - // variable semantics). - // TODO: this is manageable now since it just needs to call into the io - // mgr. What's the best model for something more general. - typedef std::function<void(ResourcePool*)> thread_available_cb; - - // Pool abstraction for a single resource pool. - // TODO: this is not quite sufficient going forward. We need a hierarchy of pools, - // one for the entire query, and a sub pool for each component that needs threads, - // all of which share a quota. Currently, the way state is tracked here, it would - // be impossible to have two components both want optional threads (e.g. two things - // that have 1+ thread usage). - class ResourcePool { - public: - virtual ~ResourcePool() {}; - // Acquire a thread for the pool. This will always succeed; the - // pool will go over the quota. - // Pools should use this API to reserve threads they need in order - // to make progress. - void acquire_thread_token(); - - // Try to acquire a thread for this pool. If the pool is at - // the quota, this will return false and the pool should not run. - // Pools should use this API for resources they can use but don't - // need (e.g. scanner threads). - bool try_acquire_thread_token(); - - // Set a reserved optional number of threads for this pool. This can be - // used to implement that a component needs n+ number of threads. The - // first 'num' threads are guaranteed to be acquirable (via try_acquire_thread_token) - // but anything beyond can fail. - // This can also be done with: - // if (pool->num_optional_threads() < num) acquire_thread_token(); - // else try_acquire_thread_token(); - // and similar tracking on the Release side but this is common enough to - // abstract it away. - void reserve_optional_tokens(int num); - - // Release a thread for the pool. This must be called once for - // each call to acquire_thread_token and each successful call to try_acquire_thread_token - // If the thread token is from acquire_thread_token, required must be true; false - // if from try_acquire_thread_token. - // Must not be called from from thread_available_cb. - void release_thread_token(bool required); - - // Returns the number of threads that are from acquire_thread_token. - int num_required_threads() const { return _num_threads & 0xFFFFFFFF; } - - // Returns the number of thread resources returned by successful calls - // to try_acquire_thread_token. - int num_optional_threads() const { return _num_threads >> 32; } - - // Returns the total number of thread resources for this pool - // (i.e. num_optional_threads + num_required_threads). - int64_t num_threads() const { return num_required_threads() + num_optional_threads(); } - - // Returns the number of optional threads that can still be used. - int num_available_threads() const { - int value = std::max(quota() - static_cast<int>(num_threads()), - _num_reserved_optional_threads - num_optional_threads()); - return std::max(0, value); - } - - // Returns the quota for this pool. Note this changes dynamically - // based on system load. - int quota() const { return std::min(_max_quota, _parent->_per_pool_quota); } - - // Sets the max thread quota for this pool. This is only used for testing since - // the dynamic values should be used normally. The actual quota is the min of this - // value and the dynamic quota. - void set_max_quota(int quota) { _max_quota = quota; } - - private: - friend class ThreadResourceMgr; - - ResourcePool(ThreadResourceMgr* parent); - - // Resets internal state. - void reset(); - - ThreadResourceMgr* _parent; - - int _max_quota; - int _num_reserved_optional_threads; - - // A single 64 bit value to store both the number of optional and - // required threads. This is combined to allow using compare and - // swap operations. The number of required threads is the lower - // 32 bits and the number of optional threads is the upper 32 bits. - int64_t _num_threads; - }; - - // Create a thread mgr object. If threads_quota is non-zero, it will be - // the number of threads for the system, otherwise it will be determined - // based on the hardware. - ThreadResourceMgr(int threads_quota); - ThreadResourceMgr(); - ~ThreadResourceMgr(); - - int system_threads_quota() const { return _system_threads_quota; } - - // Register a new pool with the thread mgr. Registering a pool - // will update the quotas for all existing pools. - ResourcePool* register_pool(); - - // Unregisters the pool. 'pool' is no longer valid after this. - // This updates the quotas for the remaining pools. - void unregister_pool(ResourcePool* pool); - -private: - // 'Optimal' number of threads for the entire process. - int _system_threads_quota; - - // Lock for the entire object. Protects all fields below. - std::mutex _lock; - - // Pools currently being managed - typedef std::set<ResourcePool*> Pools; - Pools _pools; - - // Each pool currently gets the same share. This is the ceil of the - // system quota divided by the number of pools. - int _per_pool_quota; - - // Recycled list of pool objects - std::list<ResourcePool*> _free_pool_objs; - - void update_pool_quotas(); -}; - -inline void ThreadResourceMgr::ResourcePool::acquire_thread_token() { - __sync_fetch_and_add(&_num_threads, 1); -} - -inline bool ThreadResourceMgr::ResourcePool::try_acquire_thread_token() { - while (true) { - int64_t previous_num_threads = _num_threads; - int64_t new_optional_threads = (previous_num_threads >> 32) + 1; - int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF; - - if (new_optional_threads > _num_reserved_optional_threads && - new_optional_threads + new_required_threads > quota()) { - return false; - } - - int64_t new_value = new_optional_threads << 32 | new_required_threads; - - // Atomically swap the new value if no one updated _num_threads. We do not - // not care about the ABA problem here. - if (__sync_bool_compare_and_swap(&_num_threads, previous_num_threads, new_value)) { - return true; - } - } -} - -inline void ThreadResourceMgr::ResourcePool::release_thread_token(bool required) { - if (required) { - DCHECK_GT(num_required_threads(), 0); - __sync_fetch_and_add(&_num_threads, -1); - } else { - DCHECK_GT(num_optional_threads(), 0); - - while (true) { - int64_t previous_num_threads = _num_threads; - int64_t new_optional_threads = (previous_num_threads >> 32) - 1; - int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF; - int64_t new_value = new_optional_threads << 32 | new_required_threads; - - if (__sync_bool_compare_and_swap(&_num_threads, previous_num_threads, new_value)) { - break; - } - } - } -} - -} // namespace doris diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 85ffc54135..33ca9a5e49 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -104,7 +104,6 @@ set(UTIL_FILES hdfs_util.cpp time_lut.cpp cityhash102/city.cc - tuple_row_zorder_compare.cpp telemetry/telemetry.cpp telemetry/brpc_carrier.cpp telemetry/open_telemetry_scop_wrapper.hpp diff --git a/be/src/util/tuple_row_zorder_compare.cpp b/be/src/util/tuple_row_zorder_compare.cpp deleted file mode 100644 index 25bb270eeb..0000000000 --- a/be/src/util/tuple_row_zorder_compare.cpp +++ /dev/null @@ -1,28 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "util/tuple_row_zorder_compare.h" - -namespace doris { - -RowComparator::RowComparator(Schema* schema) {} - -int RowComparator::operator()(const char* left, const char* right) const { - return -1; -} - -} // namespace doris diff --git a/be/src/util/tuple_row_zorder_compare.h b/be/src/util/tuple_row_zorder_compare.h deleted file mode 100644 index 39775a3527..0000000000 --- a/be/src/util/tuple_row_zorder_compare.h +++ /dev/null @@ -1,34 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "olap/row_cursor.h" -#include "olap/schema.h" -#include "runtime/descriptors.h" -#include "runtime/raw_value.h" -#include "runtime/tuple.h" - -namespace doris { -class RowComparator { -public: - RowComparator() = default; - RowComparator(Schema* schema); - virtual ~RowComparator() = default; - virtual int operator()(const char* left, const char* right) const; -}; -} // namespace doris diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index e1134269b0..f11962297e 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -139,10 +139,8 @@ set(RUNTIME_TEST_FILES # runtime/datetime_value_test.cpp # runtime/dpp_sink_test.cpp # runtime/tmp_file_mgr_test.cpp - # runtime/thread_resource_mgr_test.cpp # runtime/export_task_mgr_test.cpp runtime/mem_pool_test.cpp - runtime/string_buffer_test.cpp runtime/decimalv2_value_test.cpp runtime/large_int_value_test.cpp runtime/string_value_test.cpp diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index 587bbfbd20..1609eedf80 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -29,7 +29,6 @@ #include "runtime/exec_env.h" #include "runtime/stream_load/load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" -#include "runtime/thread_resource_mgr.h" #include "util/brpc_client_cache.h" #include "util/cpu_info.h" @@ -71,7 +70,6 @@ public: k_response_str = ""; config::streaming_load_max_mb = 1; - _env._thread_mgr = new ThreadResourceMgr(); _env._master_info = new TMasterInfo(); _env._load_stream_mgr = new LoadStreamMgr(); _env._internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); @@ -89,8 +87,6 @@ public: _env._load_stream_mgr = nullptr; delete _env._master_info; _env._master_info = nullptr; - delete _env._thread_mgr; - _env._thread_mgr = nullptr; delete _env._stream_load_executor; _env._stream_load_executor = nullptr; diff --git a/be/test/runtime/external_scan_context_mgr_test.cpp b/be/test/runtime/external_scan_context_mgr_test.cpp index a9d1660f55..29a045234b 100644 --- a/be/test/runtime/external_scan_context_mgr_test.cpp +++ b/be/test/runtime/external_scan_context_mgr_test.cpp @@ -25,7 +25,6 @@ #include "common/status.h" #include "runtime/fragment_mgr.h" #include "runtime/result_queue_mgr.h" -#include "runtime/thread_resource_mgr.h" namespace doris { @@ -33,15 +32,12 @@ class ExternalScanContextMgrTest : public testing::Test { public: ExternalScanContextMgrTest() { FragmentMgr* fragment_mgr = new FragmentMgr(&_exec_env); - ThreadResourceMgr* thread_mgr = new ThreadResourceMgr(); ResultQueueMgr* result_queue_mgr = new ResultQueueMgr(); _exec_env._fragment_mgr = fragment_mgr; - _exec_env._thread_mgr = thread_mgr; _exec_env._result_queue_mgr = result_queue_mgr; } virtual ~ExternalScanContextMgrTest() { delete _exec_env._fragment_mgr; - delete _exec_env._thread_mgr; delete _exec_env._result_queue_mgr; } diff --git a/be/test/runtime/string_buffer_test.cpp b/be/test/runtime/string_buffer_test.cpp deleted file mode 100644 index 2e8d7a1b33..0000000000 --- a/be/test/runtime/string_buffer_test.cpp +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/string_buffer.hpp" - -#include <gtest/gtest.h> - -#include <string> - -#include "runtime/mem_pool.h" - -namespace doris { - -void validate_string(const std::string& std_str, const StringBuffer& str) { - EXPECT_EQ(std_str.empty(), str.empty()); - EXPECT_EQ((int)std_str.size(), str.size()); - - if (std_str.size() > 0) { - EXPECT_EQ(strncmp(std_str.c_str(), str.str().data, std_str.size()), 0); - } -} - -TEST(StringBufferTest, Basic) { - MemPool pool; - StringBuffer str(&pool); - std::string std_str; - - // Empty string - validate_string(std_str, str); - - // Clear empty string - std_str.clear(); - str.clear(); - validate_string(std_str, str); - - // Append to empty - std_str.append("Hello"); - str.append("Hello", strlen("Hello")); - validate_string(std_str, str); - - // Append some more - std_str.append("World"); - str.append("World", strlen("World")); - validate_string(std_str, str); - - // Assign - std_str.assign("foo"); - str.assign("foo", strlen("foo")); - validate_string(std_str, str); - - // Clear - std_str.clear(); - str.clear(); - validate_string(std_str, str); - - // Underlying buffer size should be the length of the max string during the test. - EXPECT_EQ(str.buffer_size(), strlen("HelloWorld")); -} - -} // namespace doris diff --git a/be/test/runtime/test_env.cc b/be/test/runtime/test_env.cc index f33b6b5238..4551dfb330 100644 --- a/be/test/runtime/test_env.cc +++ b/be/test/runtime/test_env.cc @@ -32,7 +32,6 @@ namespace doris { TestEnv::TestEnv() { // Some code will use ExecEnv::GetInstance(), so init the global ExecEnv singleton _exec_env = ExecEnv::GetInstance(); - _exec_env->_thread_mgr = new ThreadResourceMgr(2); _exec_env->_result_queue_mgr = new ResultQueueMgr(); // TODO may need rpc support, etc. } @@ -49,7 +48,6 @@ void TestEnv::init_tmp_file_mgr(const std::vector<std::string>& tmp_dirs, bool o TestEnv::~TestEnv() { SAFE_DELETE(_exec_env->_result_queue_mgr); - SAFE_DELETE(_exec_env->_thread_mgr); if (_engine == StorageEngine::_s_instance) { // the engine instance is created by this test env diff --git a/be/test/runtime/thread_resource_mgr_test.cpp b/be/test/runtime/thread_resource_mgr_test.cpp deleted file mode 100644 index 21951a8105..0000000000 --- a/be/test/runtime/thread_resource_mgr_test.cpp +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/thread_resource_mgr.h" - -#include <gtest/gtest.h> - -#include <functional> -#include <string> - -#include "util/cpu_info.h" - -namespace doris { - -TEST(ThreadResourceMgr, BasicTest) { - ThreadResourceMgr mgr(5); - - ThreadResourceMgr::ResourcePool* c1 = mgr.register_pool(); - c1->acquire_thread_token(); - c1->acquire_thread_token(); - c1->acquire_thread_token(); - EXPECT_EQ(c1->num_threads(), 3); - EXPECT_EQ(c1->num_required_threads(), 3); - EXPECT_EQ(c1->num_optional_threads(), 0); - c1->release_thread_token(true); - EXPECT_EQ(c1->num_threads(), 2); - EXPECT_EQ(c1->num_required_threads(), 2); - EXPECT_EQ(c1->num_optional_threads(), 0); - EXPECT_TRUE(c1->try_acquire_thread_token()); - EXPECT_TRUE(c1->try_acquire_thread_token()); - EXPECT_TRUE(c1->try_acquire_thread_token()); - EXPECT_FALSE(c1->try_acquire_thread_token()); - EXPECT_EQ(c1->num_threads(), 5); - EXPECT_EQ(c1->num_required_threads(), 2); - EXPECT_EQ(c1->num_optional_threads(), 3); - c1->release_thread_token(true); - c1->release_thread_token(false); - - // Register a new consumer, quota is cut in half - ThreadResourceMgr::ResourcePool* c2 = mgr.register_pool(); - EXPECT_FALSE(c1->try_acquire_thread_token()); - EXPECT_EQ(c1->num_threads(), 3); - c1->acquire_thread_token(); - EXPECT_EQ(c1->num_threads(), 4); - EXPECT_EQ(c1->num_required_threads(), 2); - EXPECT_EQ(c1->num_optional_threads(), 2); - - mgr.unregister_pool(c1); - mgr.unregister_pool(c2); -} - -} // namespace doris diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index 52706f57fa..05fca396dc 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -31,7 +31,6 @@ #include "runtime/result_queue_mgr.h" #include "runtime/runtime_state.h" #include "runtime/stream_load/load_stream_mgr.h" -#include "runtime/thread_resource_mgr.h" #include "runtime/types.h" #include "service/brpc.h" #include "util/brpc_client_cache.h" @@ -344,7 +343,6 @@ public: void SetUp() override { k_add_batch_status = Status::OK(); _env = ExecEnv::GetInstance(); - _env->_thread_mgr = new ThreadResourceMgr(); _env->_master_info = new TMasterInfo(); _env->_load_stream_mgr = new LoadStreamMgr(); _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); @@ -363,7 +361,6 @@ public: SAFE_DELETE(_env->_function_client_cache); SAFE_DELETE(_env->_load_stream_mgr); SAFE_DELETE(_env->_master_info); - SAFE_DELETE(_env->_thread_mgr); if (_server) { _server->Stop(100); _server->Join(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org