chaoyli closed pull request #416: Move txn related task to txn manager
URL: https://github.com/apache/incubator-doris/pull/416
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index eb4a42ba..a61b7f19 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -80,6 +80,7 @@ add_library(Olap STATIC
     tablet.cpp
     tablet_meta.cpp
     tablet_meta_manager.cpp
+    txn_manager.cpp
     types.cpp 
     utils.cpp
     wrapper_field.cpp
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 49846b78..de2ad46c 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -117,7 +117,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
         _tablet_stat_cache_update_time_ms(0),
         _snapshot_base_id(0),
         _is_report_disk_state_already(false),
-        _is_report_tablet_already(false) {
+        _is_report_tablet_already(false),
+        _txn_mgr() {
     if (_s_instance == nullptr) {
         _s_instance = this;
     }
@@ -747,7 +748,6 @@ OLAPStatus StorageEngine::clear() {
     SAFE_DELETE(_index_stream_lru_cache);
 
     _tablet_map.clear();
-    _transaction_tablet_map.clear();
     _global_tablet_id = 0;
 
     return OLAP_SUCCESS;
@@ -918,95 +918,36 @@ OLAPStatus StorageEngine::add_tablet(TTabletId tablet_id, 
SchemaHash schema_hash
 OLAPStatus StorageEngine::add_transaction(
     TPartitionId partition_id, TTransactionId transaction_id,
     TTabletId tablet_id, SchemaHash schema_hash, const PUniqueId& load_id) {
-
-    pair<int64_t, int64_t> key(partition_id, transaction_id);
-    TabletInfo tablet_info(tablet_id, schema_hash);
-    WriteLock wrlock(&_transaction_tablet_map_lock);
-    auto it = _transaction_tablet_map.find(key);
-    if (it != _transaction_tablet_map.end()) {
-        auto load_info = it->second.find(tablet_info);
-        if (load_info != it->second.end()) {
-            for (PUniqueId& pid : load_info->second) {
-                if (pid.hi() == load_id.hi() && pid.lo() == load_id.lo()) {
-                    LOG(WARNING) << "find transaction exists when add to 
engine."
-                        << "partition_id: " << key.first
-                        << ", transaction_id: " << key.second
-                        << ", tablet: " << tablet_info.to_string();
-                    return OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST;
-                }
-            }
-        }
-    }
-
-    _transaction_tablet_map[key][tablet_info].push_back(load_id);
-    VLOG(3) << "add transaction to engine successfully."
-            << "partition_id: " << key.first
-            << ", transaction_id: " << key.second
-            << ", tablet: " << tablet_info.to_string();
-    return OLAP_SUCCESS;
+    
+    OLAPStatus status = _txn_mgr.add_txn(partition_id, transaction_id, 
+        tablet_id, schema_hash, load_id);
+    return status;
 }
 
 void StorageEngine::delete_transaction(
     TPartitionId partition_id, TTransactionId transaction_id,
     TTabletId tablet_id, SchemaHash schema_hash, bool delete_from_tablet) {
 
-    pair<int64_t, int64_t> key(partition_id, transaction_id);
-    TabletInfo tablet_info(tablet_id, schema_hash);
-    WriteLock wrlock(&_transaction_tablet_map_lock);
-
-    auto it = _transaction_tablet_map.find(key);
-    if (it != _transaction_tablet_map.end()) {
-        VLOG(3) << "delete transaction to engine successfully."
-                << ",partition_id: " << key.first
-                << ", transaction_id: " << key.second
-                << ", tablet: " << tablet_info.to_string();
-        it->second.erase(tablet_info);
-        if (it->second.empty()) {
-            _transaction_tablet_map.erase(it);
-        }
-
-        // delete transaction from tablet
-        if (delete_from_tablet) {
-            TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id, 
tablet_info.schema_hash);
-            if (tablet.get() != nullptr) {
-                tablet->delete_pending_data(transaction_id);
-            }
+    // call txn manager to delete txn from memory
+    OLAPStatus res = _txn_mgr.delete_txn(partition_id, transaction_id, 
+        tablet_id, schema_hash);
+    // delete transaction from tablet
+    if (res == OLAP_SUCCESS && delete_from_tablet) {
+        TabletSharedPtr tablet = get_tablet(tablet_id, schema_hash);
+        if (tablet.get() != nullptr) {
+            tablet->delete_pending_data(transaction_id);
         }
     }
 }
 
 void StorageEngine::get_transactions_by_tablet(TabletSharedPtr tablet, 
int64_t* partition_id,
                                             set<int64_t>* transaction_ids) {
-    if (tablet.get() == nullptr || partition_id == nullptr || transaction_ids 
== nullptr) {
-        OLAP_LOG_WARNING("parameter is null when get transactions by tablet");
-        return;
-    }
-
-    TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash());
-    ReadLock rdlock(&_transaction_tablet_map_lock);
-    for (auto& it : _transaction_tablet_map) {
-        if (it.second.find(tablet_info) != it.second.end()) {
-            *partition_id = it.first.first;
-            transaction_ids->insert(it.first.second);
-            VLOG(3) << "find transaction on tablet."
-                    << "partition_id: " << it.first.first
-                    << ", transaction_id: " << it.first.second
-                    << ", tablet: " << tablet_info.to_string();
-        }
-    }
+    _txn_mgr.get_tablet_related_txns(tablet, partition_id, transaction_ids);
 }
 
 bool StorageEngine::has_transaction(TPartitionId partition_id, TTransactionId 
transaction_id,
                                  TTabletId tablet_id, SchemaHash schema_hash) {
-    pair<int64_t, int64_t> key(partition_id, transaction_id);
-    TabletInfo tablet_info(tablet_id, schema_hash);
-
-    _transaction_tablet_map_lock.rdlock();
-    auto it = _transaction_tablet_map.find(key);
-    bool found = it != _transaction_tablet_map.end()
-                 && it->second.find(tablet_info) != it->second.end();
-    _transaction_tablet_map_lock.unlock();
-
+    bool found = _txn_mgr.has_txn(partition_id, transaction_id, tablet_id, 
schema_hash);
     return found;
 }
 
@@ -1023,25 +964,14 @@ OLAPStatus StorageEngine::publish_version(const 
TPublishVersionRequest& publish_
          : publish_version_req.partition_version_infos) {
 
         int64_t partition_id = partitionVersionInfo.partition_id;
-        pair<int64_t, int64_t> key(partition_id, transaction_id);
-
-        _transaction_tablet_map_lock.rdlock();
-        auto it = _transaction_tablet_map.find(key);
-        if (it == _transaction_tablet_map.end()) {
-            OLAP_LOG_WARNING("no tablet to publish version. [partition_id=%ld 
transaction_id=%ld]",
-                             partition_id, transaction_id);
-            _transaction_tablet_map_lock.unlock();
-            continue;
-        }
-        std::map<TabletInfo, std::vector<PUniqueId>> load_info_map = 
it->second;
-        _transaction_tablet_map_lock.unlock();
+        vector<TabletInfo> tablet_infos;
+        _txn_mgr.get_txn_related_tablets(transaction_id, partition_id, 
&tablet_infos);
 
         Version version(partitionVersionInfo.version, 
partitionVersionInfo.version);
         VersionHash version_hash = partitionVersionInfo.version_hash;
 
         // each tablet
-        for (auto& load_info : load_info_map) {
-            const TabletInfo& tablet_info = load_info.first;
+        for (auto& tablet_info : tablet_infos) {
             VLOG(3) << "begin to publish version on tablet. "
                     << "tablet_id=" << tablet_info.tablet_id
                     << ", schema_hash=" << tablet_info.schema_hash
@@ -1075,18 +1005,8 @@ OLAPStatus StorageEngine::publish_version(const 
TPublishVersionRequest& publish_
             } else if (publish_status == OLAP_SUCCESS) {
                 LOG(INFO) << "publish version successfully on tablet. tablet=" 
<< tablet->full_name()
                           << ", transaction_id=" << transaction_id << ", 
version=" << version.first;
-                _transaction_tablet_map_lock.wrlock();
-                auto it2 = _transaction_tablet_map.find(key);
-                if (it2 != _transaction_tablet_map.end()) {
-                    VLOG(3) << "delete transaction from engine. tablet=" << 
tablet->full_name()
-                        << "transaction_id: " << transaction_id;
-                    it2->second.erase(tablet_info);
-                    if (it2->second.empty()) {
-                        _transaction_tablet_map.erase(it2);
-                    }
-                }
-                _transaction_tablet_map_lock.unlock();
-
+                _txn_mgr.delete_txn(partition_id, transaction_id, 
+                    tablet_info.tablet_id, tablet_info.schema_hash);
             } else {
                 OLAP_LOG_WARNING("fail to publish version on tablet. "
                                  "[tablet=%s transaction_id=%ld version=%d 
res=%d]",
@@ -1108,31 +1028,16 @@ void StorageEngine::clear_transaction_task(const 
TTransactionId transaction_id,
                                         const vector<TPartitionId> 
partition_ids) {
     LOG(INFO) << "begin to clear transaction task. transaction_id=" <<  
transaction_id;
 
-    // each partition
     for (const TPartitionId& partition_id : partition_ids) {
-
-        // get tablets in this transaction
-        pair<int64_t, int64_t> key(partition_id, transaction_id);
-
-        _transaction_tablet_map_lock.rdlock();
-        auto it = _transaction_tablet_map.find(key);
-        if (it == _transaction_tablet_map.end()) {
-            OLAP_LOG_WARNING("no tablet to clear transaction. 
[partition_id=%ld transaction_id=%ld]",
-                             partition_id, transaction_id);
-            _transaction_tablet_map_lock.unlock();
-            continue;
-        }
-        std::map<TabletInfo, std::vector<PUniqueId>> load_info_map = 
it->second;
-        _transaction_tablet_map_lock.unlock();
+        vector<TabletInfo> tablet_infos;
+        _txn_mgr.get_txn_related_tablets(transaction_id, partition_id, 
&tablet_infos);
 
         // each tablet
-        for (auto& load_info : load_info_map) {
-            const TabletInfo& tablet_info = load_info.first;
+        for (auto& tablet_info : tablet_infos) {
             delete_transaction(partition_id, transaction_id,
-                               tablet_info.tablet_id, tablet_info.schema_hash);
+                                tablet_info.tablet_id, 
tablet_info.schema_hash);
         }
     }
-
     LOG(INFO) << "finish to clear transaction task. transaction_id=" << 
transaction_id;
 }
 
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 4b4feb7f..38217ed7 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -43,6 +43,7 @@
 #include "olap/tablet.h"
 #include "olap/olap_meta.h"
 #include "olap/options.h"
+#include "olap/txn_manager.h"
 
 namespace doris {
 
@@ -540,9 +541,6 @@ class StorageEngine {
 
     RWMutex _tablet_map_lock;
     tablet_map_t _tablet_map;
-    RWMutex _transaction_tablet_map_lock;
-    using TxnKey = std::pair<int64_t, int64_t>; // partition_id, 
transaction_id;
-    std::map<TxnKey, std::map<TabletInfo, std::vector<PUniqueId>>> 
_transaction_tablet_map;
     size_t _global_tablet_id;
     Cache* _file_descriptor_lru_cache;
     Cache* _index_stream_lru_cache;
@@ -612,6 +610,7 @@ class StorageEngine {
     std::condition_variable _report_cv;
     std::atomic_bool _is_report_disk_state_already;
     std::atomic_bool _is_report_tablet_already;
+    TxnManager _txn_mgr;
 };
 
 }  // namespace doris
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
new file mode 100644
index 00000000..336703bf
--- /dev/null
+++ b/be/src/olap/txn_manager.cpp
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/storage_engine.h"
+
+#include <signal.h>
+
+#include <algorithm>
+#include <cstdio>
+#include <new>
+#include <queue>
+#include <set>
+#include <random>
+
+#include <boost/algorithm/string/classification.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/split.hpp>
+#include <boost/filesystem.hpp>
+#include <rapidjson/document.h>
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include "agent/file_downloader.h"
+#include "olap/base_compaction.h"
+#include "olap/cumulative_compaction.h"
+#include "olap/lru_cache.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_meta_manager.h"
+#include "olap/push_handler.h"
+#include "olap/reader.h"
+#include "olap/schema_change.h"
+#include "olap/store.h"
+#include "olap/utils.h"
+#include "olap/data_writer.h"
+#include "util/time.h"
+#include "util/doris_metrics.h"
+#include "util/pretty_printer.h"
+
+using apache::thrift::ThriftDebugString;
+using boost::filesystem::canonical;
+using boost::filesystem::directory_iterator;
+using boost::filesystem::path;
+using boost::filesystem::recursive_directory_iterator;
+using std::back_inserter;
+using std::copy;
+using std::inserter;
+using std::list;
+using std::map;
+using std::nothrow;
+using std::pair;
+using std::priority_queue;
+using std::set;
+using std::set_difference;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+namespace doris {
+
+OLAPStatus TxnManager::add_txn(
+    TPartitionId partition_id, TTransactionId transaction_id,
+    TTabletId tablet_id, SchemaHash schema_hash, const PUniqueId& load_id) {
+
+    pair<int64_t, int64_t> key(partition_id, transaction_id);
+    TabletInfo tablet_info(tablet_id, schema_hash);
+    WriteLock wrlock(&_transaction_tablet_map_lock);
+    auto it = _transaction_tablet_map.find(key);
+    if (it != _transaction_tablet_map.end()) {
+        auto load_info = it->second.find(tablet_info);
+        if (load_info != it->second.end()) {
+            for (PUniqueId& pid : load_info->second) {
+                if (pid.hi() == load_id.hi() && pid.lo() == load_id.lo()) {
+                    LOG(WARNING) << "find transaction exists when add to 
engine."
+                        << "partition_id: " << key.first
+                        << ", transaction_id: " << key.second
+                        << ", tablet: " << tablet_info.to_string();
+                    return OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST;
+                }
+            }
+        }
+    }
+
+    _transaction_tablet_map[key][tablet_info].push_back(load_id);
+    VLOG(3) << "add transaction to engine successfully."
+            << "partition_id: " << key.first
+            << ", transaction_id: " << key.second
+            << ", tablet: " << tablet_info.to_string();
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus TxnManager::delete_txn(
+    TPartitionId partition_id, TTransactionId transaction_id,
+    TTabletId tablet_id, SchemaHash schema_hash) {
+
+    pair<int64_t, int64_t> key(partition_id, transaction_id);
+    TabletInfo tablet_info(tablet_id, schema_hash);
+    WriteLock wrlock(&_transaction_tablet_map_lock);
+
+    auto it = _transaction_tablet_map.find(key);
+    if (it != _transaction_tablet_map.end()) {
+        VLOG(3) << "delete transaction to engine successfully."
+                << ",partition_id: " << key.first
+                << ", transaction_id: " << key.second
+                << ", tablet: " << tablet_info.to_string();
+        it->second.erase(tablet_info);
+        if (it->second.empty()) {
+            _transaction_tablet_map.erase(it);
+        }
+
+        // delete transaction from tablet
+        // delete from tablet is useless and it should be called at 
storageengine
+        /*
+        if (delete_from_tablet) {
+            TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id, 
tablet_info.schema_hash);
+            if (tablet.get() != nullptr) {
+                tablet->delete_pending_data(transaction_id);
+            }
+        }
+        */
+       return OLAP_SUCCESS;
+    } else {
+        return OLAP_ERR_TRANSACTION_NOT_EXIST;
+    }
+}
+
+void TxnManager::get_tablet_related_txns(TabletSharedPtr tablet, int64_t* 
partition_id,
+                                            std::set<int64_t>* 
transaction_ids) {
+    if (tablet.get() == nullptr || partition_id == nullptr || transaction_ids 
== nullptr) {
+        OLAP_LOG_WARNING("parameter is null when get transactions by tablet");
+        return;
+    }
+
+    TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash());
+    ReadLock rdlock(&_transaction_tablet_map_lock);
+    for (auto& it : _transaction_tablet_map) {
+        if (it.second.find(tablet_info) != it.second.end()) {
+            *partition_id = it.first.first;
+            transaction_ids->insert(it.first.second);
+            VLOG(3) << "find transaction on tablet."
+                    << "partition_id: " << it.first.first
+                    << ", transaction_id: " << it.first.second
+                    << ", tablet: " << tablet_info.to_string();
+        }
+    }
+}
+
+void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
+                                        TPartitionId partition_id,
+                                        std::vector<TabletInfo>* tablet_infos) 
{
+    // get tablets in this transaction
+    pair<int64_t, int64_t> key(partition_id, transaction_id);
+
+    _transaction_tablet_map_lock.rdlock();
+    auto it = _transaction_tablet_map.find(key);
+    if (it == _transaction_tablet_map.end()) {
+        OLAP_LOG_WARNING("could not find tablet for [partition_id=%ld 
transaction_id=%ld]",
+                            partition_id, transaction_id);
+        _transaction_tablet_map_lock.unlock();
+        return;
+    }
+    std::map<TabletInfo, std::vector<PUniqueId>> load_info_map = it->second;
+    _transaction_tablet_map_lock.unlock();
+
+    // each tablet
+    for (auto& load_info : load_info_map) {
+        const TabletInfo& tablet_info = load_info.first;
+        tablet_infos->push_back(tablet_info);
+    }
+
+}
+                                
+
+bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId 
transaction_id,
+                                 TTabletId tablet_id, SchemaHash schema_hash) {
+    pair<int64_t, int64_t> key(partition_id, transaction_id);
+    TabletInfo tablet_info(tablet_id, schema_hash);
+
+    _transaction_tablet_map_lock.rdlock();
+    auto it = _transaction_tablet_map.find(key);
+    bool found = it != _transaction_tablet_map.end()
+                 && it->second.find(tablet_info) != it->second.end();
+    _transaction_tablet_map_lock.unlock();
+
+    return found;
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
new file mode 100644
index 00000000..d65d4d3a
--- /dev/null
+++ b/be/src/olap/txn_manager.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_TXN_MANAGER_H
+#define DORIS_BE_SRC_OLAP_TXN_MANAGER_H
+
+#include <ctime>
+#include <list>
+#include <map>
+#include <mutex>
+#include <condition_variable>
+#include <set>
+#include <string>
+#include <vector>
+#include <thread>
+
+#include <rapidjson/document.h>
+#include <pthread.h>
+
+#include "agent/status.h"
+#include "common/status.h"
+#include "gen_cpp/AgentService_types.h"
+#include "gen_cpp/BackendService_types.h"
+#include "gen_cpp/MasterService_types.h"
+#include "olap/atomic.h"
+#include "olap/lru_cache.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/tablet.h"
+#include "olap/olap_meta.h"
+#include "olap/options.h"
+
+namespace doris {
+
+// txn manager is used to manage mapping between tablet and txns
+class TxnManager {
+public:
+    TxnManager() {}
+    ~TxnManager() {
+        _transaction_tablet_map.clear();
+    }
+    // add a txn to manager
+    // partition id is useful in publish version stage because version is 
associated with partition
+    OLAPStatus add_txn(TPartitionId partition_id, TTransactionId 
transaction_id,
+                               TTabletId tablet_id, SchemaHash schema_hash,
+                               const PUniqueId& load_id);
+
+    OLAPStatus delete_txn(TPartitionId partition_id, TTransactionId 
transaction_id,
+                            TTabletId tablet_id, SchemaHash schema_hash);
+
+    void get_tablet_related_txns(TabletSharedPtr tablet, int64_t* partition_id,
+                                    std::set<int64_t>* transaction_ids);
+
+    void get_txn_related_tablets(const TTransactionId transaction_id,
+                                TPartitionId partition_ids,
+                                std::vector<TabletInfo>* tablet_infos);
+
+    bool has_txn(TPartitionId partition_id, TTransactionId transaction_id,
+                         TTabletId tablet_id, SchemaHash schema_hash);
+
+private:
+    RWMutex _transaction_tablet_map_lock;
+    using TxnKey = std::pair<int64_t, int64_t>; // partition_id, 
transaction_id;
+    std::map<TxnKey, std::map<TabletInfo, std::vector<PUniqueId>>> 
_transaction_tablet_map;
+};  // TxnManager
+
+}
+#endif // DORIS_BE_SRC_OLAP_TXN_MANAGER_H
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to