chaoyli closed pull request #416: Move txn related task to txn manager
URL: https://github.com/apache/incubator-doris/pull/416
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index eb4a42ba..a61b7f19 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -80,6 +80,7 @@ add_library(Olap STATIC tablet.cpp tablet_meta.cpp tablet_meta_manager.cpp + txn_manager.cpp types.cpp utils.cpp wrapper_field.cpp diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 49846b78..de2ad46c 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -117,7 +117,8 @@ StorageEngine::StorageEngine(const EngineOptions& options) _tablet_stat_cache_update_time_ms(0), _snapshot_base_id(0), _is_report_disk_state_already(false), - _is_report_tablet_already(false) { + _is_report_tablet_already(false), + _txn_mgr() { if (_s_instance == nullptr) { _s_instance = this; } @@ -747,7 +748,6 @@ OLAPStatus StorageEngine::clear() { SAFE_DELETE(_index_stream_lru_cache); _tablet_map.clear(); - _transaction_tablet_map.clear(); _global_tablet_id = 0; return OLAP_SUCCESS; @@ -918,95 +918,36 @@ OLAPStatus StorageEngine::add_tablet(TTabletId tablet_id, SchemaHash schema_hash OLAPStatus StorageEngine::add_transaction( TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, SchemaHash schema_hash, const PUniqueId& load_id) { - - pair<int64_t, int64_t> key(partition_id, transaction_id); - TabletInfo tablet_info(tablet_id, schema_hash); - WriteLock wrlock(&_transaction_tablet_map_lock); - auto it = _transaction_tablet_map.find(key); - if (it != _transaction_tablet_map.end()) { - auto load_info = it->second.find(tablet_info); - if (load_info != it->second.end()) { - for (PUniqueId& pid : load_info->second) { - if (pid.hi() == load_id.hi() && pid.lo() == 
load_id.lo()) { - LOG(WARNING) << "find transaction exists when add to engine." - << "partition_id: " << key.first - << ", transaction_id: " << key.second - << ", tablet: " << tablet_info.to_string(); - return OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST; - } - } - } - } - - _transaction_tablet_map[key][tablet_info].push_back(load_id); - VLOG(3) << "add transaction to engine successfully." - << "partition_id: " << key.first - << ", transaction_id: " << key.second - << ", tablet: " << tablet_info.to_string(); - return OLAP_SUCCESS; + + OLAPStatus status = _txn_mgr.add_txn(partition_id, transaction_id, + tablet_id, schema_hash, load_id); + return status; } void StorageEngine::delete_transaction( TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, SchemaHash schema_hash, bool delete_from_tablet) { - pair<int64_t, int64_t> key(partition_id, transaction_id); - TabletInfo tablet_info(tablet_id, schema_hash); - WriteLock wrlock(&_transaction_tablet_map_lock); - - auto it = _transaction_tablet_map.find(key); - if (it != _transaction_tablet_map.end()) { - VLOG(3) << "delete transaction to engine successfully." 
- << ",partition_id: " << key.first - << ", transaction_id: " << key.second - << ", tablet: " << tablet_info.to_string(); - it->second.erase(tablet_info); - if (it->second.empty()) { - _transaction_tablet_map.erase(it); - } - - // delete transaction from tablet - if (delete_from_tablet) { - TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id, tablet_info.schema_hash); - if (tablet.get() != nullptr) { - tablet->delete_pending_data(transaction_id); - } + // call txn manager to delete txn from memory + OLAPStatus res = _txn_mgr.delete_txn(partition_id, transaction_id, + tablet_id, schema_hash); + // delete transaction from tablet + if (res == OLAP_SUCCESS && delete_from_tablet) { + TabletSharedPtr tablet = get_tablet(tablet_id, schema_hash); + if (tablet.get() != nullptr) { + tablet->delete_pending_data(transaction_id); } } } void StorageEngine::get_transactions_by_tablet(TabletSharedPtr tablet, int64_t* partition_id, set<int64_t>* transaction_ids) { - if (tablet.get() == nullptr || partition_id == nullptr || transaction_ids == nullptr) { - OLAP_LOG_WARNING("parameter is null when get transactions by tablet"); - return; - } - - TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash()); - ReadLock rdlock(&_transaction_tablet_map_lock); - for (auto& it : _transaction_tablet_map) { - if (it.second.find(tablet_info) != it.second.end()) { - *partition_id = it.first.first; - transaction_ids->insert(it.first.second); - VLOG(3) << "find transaction on tablet." 
- << "partition_id: " << it.first.first - << ", transaction_id: " << it.first.second - << ", tablet: " << tablet_info.to_string(); - } - } + _txn_mgr.get_tablet_related_txns(tablet, partition_id, transaction_ids); } bool StorageEngine::has_transaction(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, SchemaHash schema_hash) { - pair<int64_t, int64_t> key(partition_id, transaction_id); - TabletInfo tablet_info(tablet_id, schema_hash); - - _transaction_tablet_map_lock.rdlock(); - auto it = _transaction_tablet_map.find(key); - bool found = it != _transaction_tablet_map.end() - && it->second.find(tablet_info) != it->second.end(); - _transaction_tablet_map_lock.unlock(); - + bool found = _txn_mgr.has_txn(partition_id, transaction_id, tablet_id, schema_hash); return found; } @@ -1023,25 +964,14 @@ OLAPStatus StorageEngine::publish_version(const TPublishVersionRequest& publish_ : publish_version_req.partition_version_infos) { int64_t partition_id = partitionVersionInfo.partition_id; - pair<int64_t, int64_t> key(partition_id, transaction_id); - - _transaction_tablet_map_lock.rdlock(); - auto it = _transaction_tablet_map.find(key); - if (it == _transaction_tablet_map.end()) { - OLAP_LOG_WARNING("no tablet to publish version. [partition_id=%ld transaction_id=%ld]", - partition_id, transaction_id); - _transaction_tablet_map_lock.unlock(); - continue; - } - std::map<TabletInfo, std::vector<PUniqueId>> load_info_map = it->second; - _transaction_tablet_map_lock.unlock(); + vector<TabletInfo> tablet_infos; + _txn_mgr.get_txn_related_tablets(transaction_id, partition_id, &tablet_infos); Version version(partitionVersionInfo.version, partitionVersionInfo.version); VersionHash version_hash = partitionVersionInfo.version_hash; // each tablet - for (auto& load_info : load_info_map) { - const TabletInfo& tablet_info = load_info.first; + for (auto& tablet_info : tablet_infos) { VLOG(3) << "begin to publish version on tablet. 
" << "tablet_id=" << tablet_info.tablet_id << ", schema_hash=" << tablet_info.schema_hash @@ -1075,18 +1005,8 @@ OLAPStatus StorageEngine::publish_version(const TPublishVersionRequest& publish_ } else if (publish_status == OLAP_SUCCESS) { LOG(INFO) << "publish version successfully on tablet. tablet=" << tablet->full_name() << ", transaction_id=" << transaction_id << ", version=" << version.first; - _transaction_tablet_map_lock.wrlock(); - auto it2 = _transaction_tablet_map.find(key); - if (it2 != _transaction_tablet_map.end()) { - VLOG(3) << "delete transaction from engine. tablet=" << tablet->full_name() - << "transaction_id: " << transaction_id; - it2->second.erase(tablet_info); - if (it2->second.empty()) { - _transaction_tablet_map.erase(it2); - } - } - _transaction_tablet_map_lock.unlock(); - + _txn_mgr.delete_txn(partition_id, transaction_id, + tablet_info.tablet_id, tablet_info.schema_hash); } else { OLAP_LOG_WARNING("fail to publish version on tablet. " "[tablet=%s transaction_id=%ld version=%d res=%d]", @@ -1108,31 +1028,16 @@ void StorageEngine::clear_transaction_task(const TTransactionId transaction_id, const vector<TPartitionId> partition_ids) { LOG(INFO) << "begin to clear transaction task. transaction_id=" << transaction_id; - // each partition for (const TPartitionId& partition_id : partition_ids) { - - // get tablets in this transaction - pair<int64_t, int64_t> key(partition_id, transaction_id); - - _transaction_tablet_map_lock.rdlock(); - auto it = _transaction_tablet_map.find(key); - if (it == _transaction_tablet_map.end()) { - OLAP_LOG_WARNING("no tablet to clear transaction. 
[partition_id=%ld transaction_id=%ld]", - partition_id, transaction_id); - _transaction_tablet_map_lock.unlock(); - continue; - } - std::map<TabletInfo, std::vector<PUniqueId>> load_info_map = it->second; - _transaction_tablet_map_lock.unlock(); + vector<TabletInfo> tablet_infos; + _txn_mgr.get_txn_related_tablets(transaction_id, partition_id, &tablet_infos); // each tablet - for (auto& load_info : load_info_map) { - const TabletInfo& tablet_info = load_info.first; + for (auto& tablet_info : tablet_infos) { delete_transaction(partition_id, transaction_id, - tablet_info.tablet_id, tablet_info.schema_hash); + tablet_info.tablet_id, tablet_info.schema_hash); } } - LOG(INFO) << "finish to clear transaction task. transaction_id=" << transaction_id; } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 4b4feb7f..38217ed7 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -43,6 +43,7 @@ #include "olap/tablet.h" #include "olap/olap_meta.h" #include "olap/options.h" +#include "olap/txn_manager.h" namespace doris { @@ -540,9 +541,6 @@ class StorageEngine { RWMutex _tablet_map_lock; tablet_map_t _tablet_map; - RWMutex _transaction_tablet_map_lock; - using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; - std::map<TxnKey, std::map<TabletInfo, std::vector<PUniqueId>>> _transaction_tablet_map; size_t _global_tablet_id; Cache* _file_descriptor_lru_cache; Cache* _index_stream_lru_cache; @@ -612,6 +610,7 @@ class StorageEngine { std::condition_variable _report_cv; std::atomic_bool _is_report_disk_state_already; std::atomic_bool _is_report_tablet_already; + TxnManager _txn_mgr; }; } // namespace doris diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp new file mode 100644 index 00000000..336703bf --- /dev/null +++ b/be/src/olap/txn_manager.cpp @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/storage_engine.h" + +#include <signal.h> + +#include <algorithm> +#include <cstdio> +#include <new> +#include <queue> +#include <set> +#include <random> + +#include <boost/algorithm/string/classification.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/algorithm/string/split.hpp> +#include <boost/filesystem.hpp> +#include <rapidjson/document.h> +#include <thrift/protocol/TDebugProtocol.h> + +#include "agent/file_downloader.h" +#include "olap/base_compaction.h" +#include "olap/cumulative_compaction.h" +#include "olap/lru_cache.h" +#include "olap/tablet_meta.h" +#include "olap/tablet_meta_manager.h" +#include "olap/push_handler.h" +#include "olap/reader.h" +#include "olap/schema_change.h" +#include "olap/store.h" +#include "olap/utils.h" +#include "olap/data_writer.h" +#include "util/time.h" +#include "util/doris_metrics.h" +#include "util/pretty_printer.h" + +using apache::thrift::ThriftDebugString; +using boost::filesystem::canonical; +using boost::filesystem::directory_iterator; +using boost::filesystem::path; +using boost::filesystem::recursive_directory_iterator; +using std::back_inserter; +using std::copy; +using std::inserter; +using std::list; +using std::map; +using std::nothrow; +using std::pair; +using std::priority_queue; +using 
std::set; +using std::set_difference; +using std::string; +using std::stringstream; +using std::vector; + +namespace doris { + +OLAPStatus TxnManager::add_txn( + TPartitionId partition_id, TTransactionId transaction_id, + TTabletId tablet_id, SchemaHash schema_hash, const PUniqueId& load_id) { + + pair<int64_t, int64_t> key(partition_id, transaction_id); + TabletInfo tablet_info(tablet_id, schema_hash); + WriteLock wrlock(&_transaction_tablet_map_lock); + auto it = _transaction_tablet_map.find(key); + if (it != _transaction_tablet_map.end()) { + auto load_info = it->second.find(tablet_info); + if (load_info != it->second.end()) { + for (PUniqueId& pid : load_info->second) { + if (pid.hi() == load_id.hi() && pid.lo() == load_id.lo()) { + LOG(WARNING) << "find transaction exists when add to engine." + << "partition_id: " << key.first + << ", transaction_id: " << key.second + << ", tablet: " << tablet_info.to_string(); + return OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST; + } + } + } + } + + _transaction_tablet_map[key][tablet_info].push_back(load_id); + VLOG(3) << "add transaction to engine successfully." + << "partition_id: " << key.first + << ", transaction_id: " << key.second + << ", tablet: " << tablet_info.to_string(); + return OLAP_SUCCESS; +} + +OLAPStatus TxnManager::delete_txn( + TPartitionId partition_id, TTransactionId transaction_id, + TTabletId tablet_id, SchemaHash schema_hash) { + + pair<int64_t, int64_t> key(partition_id, transaction_id); + TabletInfo tablet_info(tablet_id, schema_hash); + WriteLock wrlock(&_transaction_tablet_map_lock); + + auto it = _transaction_tablet_map.find(key); + if (it != _transaction_tablet_map.end()) { + VLOG(3) << "delete transaction to engine successfully." 
+ << ",partition_id: " << key.first + << ", transaction_id: " << key.second + << ", tablet: " << tablet_info.to_string(); + it->second.erase(tablet_info); + if (it->second.empty()) { + _transaction_tablet_map.erase(it); + } + + // delete transaction from tablet + // delete from tablet is useless and it should be called at storageengine + /* + if (delete_from_tablet) { + TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id, tablet_info.schema_hash); + if (tablet.get() != nullptr) { + tablet->delete_pending_data(transaction_id); + } + } + */ + return OLAP_SUCCESS; + } else { + return OLAP_ERR_TRANSACTION_NOT_EXIST; + } +} + +void TxnManager::get_tablet_related_txns(TabletSharedPtr tablet, int64_t* partition_id, + std::set<int64_t>* transaction_ids) { + if (tablet.get() == nullptr || partition_id == nullptr || transaction_ids == nullptr) { + OLAP_LOG_WARNING("parameter is null when get transactions by tablet"); + return; + } + + TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash()); + ReadLock rdlock(&_transaction_tablet_map_lock); + for (auto& it : _transaction_tablet_map) { + if (it.second.find(tablet_info) != it.second.end()) { + *partition_id = it.first.first; + transaction_ids->insert(it.first.second); + VLOG(3) << "find transaction on tablet." 
+ << "partition_id: " << it.first.first + << ", transaction_id: " << it.first.second + << ", tablet: " << tablet_info.to_string(); + } + } +} + +void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, + TPartitionId partition_id, + std::vector<TabletInfo>* tablet_infos) { + // get tablets in this transaction + pair<int64_t, int64_t> key(partition_id, transaction_id); + + _transaction_tablet_map_lock.rdlock(); + auto it = _transaction_tablet_map.find(key); + if (it == _transaction_tablet_map.end()) { + OLAP_LOG_WARNING("could not find tablet for [partition_id=%ld transaction_id=%ld]", + partition_id, transaction_id); + _transaction_tablet_map_lock.unlock(); + return; + } + std::map<TabletInfo, std::vector<PUniqueId>> load_info_map = it->second; + _transaction_tablet_map_lock.unlock(); + + // each tablet + for (auto& load_info : load_info_map) { + const TabletInfo& tablet_info = load_info.first; + tablet_infos->push_back(tablet_info); + } + +} + + +bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_id, + TTabletId tablet_id, SchemaHash schema_hash) { + pair<int64_t, int64_t> key(partition_id, transaction_id); + TabletInfo tablet_info(tablet_id, schema_hash); + + _transaction_tablet_map_lock.rdlock(); + auto it = _transaction_tablet_map.find(key); + bool found = it != _transaction_tablet_map.end() + && it->second.find(tablet_info) != it->second.end(); + _transaction_tablet_map_lock.unlock(); + + return found; +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h new file mode 100644 index 00000000..d65d4d3a --- /dev/null +++ b/be/src/olap/txn_manager.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_SRC_OLAP_TXN_MANAGER_H +#define DORIS_BE_SRC_OLAP_TXN_MANAGER_H + +#include <ctime> +#include <list> +#include <map> +#include <mutex> +#include <condition_variable> +#include <set> +#include <string> +#include <vector> +#include <thread> + +#include <rapidjson/document.h> +#include <pthread.h> + +#include "agent/status.h" +#include "common/status.h" +#include "gen_cpp/AgentService_types.h" +#include "gen_cpp/BackendService_types.h" +#include "gen_cpp/MasterService_types.h" +#include "olap/atomic.h" +#include "olap/lru_cache.h" +#include "olap/olap_common.h" +#include "olap/olap_define.h" +#include "olap/tablet.h" +#include "olap/olap_meta.h" +#include "olap/options.h" + +namespace doris { + +// txn manager is used to manage mapping between tablet and txns +class TxnManager { +public: + TxnManager() {} + ~TxnManager() { + _transaction_tablet_map.clear(); + } + // add a txn to manager + // partition id is useful in publish version stage because version is associated with partition + OLAPStatus add_txn(TPartitionId partition_id, TTransactionId transaction_id, + TTabletId tablet_id, SchemaHash schema_hash, + const PUniqueId& load_id); + + OLAPStatus delete_txn(TPartitionId partition_id, TTransactionId transaction_id, + TTabletId tablet_id, SchemaHash schema_hash); + + void get_tablet_related_txns(TabletSharedPtr tablet, int64_t* partition_id, + std::set<int64_t>* 
transaction_ids); + + void get_txn_related_tablets(const TTransactionId transaction_id, + TPartitionId partition_ids, + std::vector<TabletInfo>* tablet_infos); + + bool has_txn(TPartitionId partition_id, TTransactionId transaction_id, + TTabletId tablet_id, SchemaHash schema_hash); + +private: + RWMutex _transaction_tablet_map_lock; + using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; + std::map<TxnKey, std::map<TabletInfo, std::vector<PUniqueId>>> _transaction_tablet_map; +}; // TxnManager + +} +#endif // DORIS_BE_SRC_OLAP_TXN_MANAGER_H \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For additional commands, e-mail: dev-h...@doris.apache.org