yiguolei commented on a change in pull request #454: Refactor Tablet and TabletMeta
URL: https://github.com/apache/incubator-doris/pull/454#discussion_r243476579
 
 

 ##########
 File path: be/src/olap/tablet.cpp
 ##########
 @@ -202,917 +63,145 @@ Tablet::~Tablet() {
     }
 
     // ensure that there is nobody using Tablet, like acquiring 
OLAPData(SegmentGroup)
-    obtain_header_wrlock();
-    for (auto& it : _data_sources) {
-        for (SegmentGroup* segment_group : it.second) {
-            SAFE_DELETE(segment_group);
-        }
-    }
-    _data_sources.clear();
-
-    // clear the transactions in memory
-    for (auto& it : _pending_data_sources) {
-        // false means can't remove the transaction from tablet_meta, also 
prevent the loading of tablet
-        for (SegmentGroup* segment_group : it.second) {
-            StorageEngine::get_instance()->delete_transaction(
-                    segment_group->partition_id(), 
segment_group->transaction_id(),
-                    _tablet_id, _schema_hash, false);
-            SAFE_DELETE(segment_group);
-        }
-    }
-    _pending_data_sources.clear();
-    release_header_lock();
-
-    SAFE_DELETE(_tablet_meta);
-
-    // 移动数据目录
-    if (_is_dropped) {
-        LOG(INFO) << "drop tablet:" << full_name() << ", tablet path:" << 
_tablet_path;
-        path table_path(_tablet_path);
-        std::string tablet_meta_path = _tablet_path + "/" + 
std::to_string(_tablet_id) + ".hdr";
-        OLAPStatus s = TabletMetaManager::dump_header(_store, _tablet_id, 
_schema_hash, tablet_meta_path);
-        LOG(INFO) << "dump tablet_meta to path:" << tablet_meta_path << ", 
status:" << s;
-        LOG(INFO) << "start to remove tablet tablet_meta:" << full_name();
-        s = TabletMetaManager::remove(_store, _tablet_id, _schema_hash);
-        LOG(INFO) << "finish remove tablet tablet_meta:" << full_name() << ", 
res:" << s;
-        if (move_to_trash(table_path, table_path) != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to delete tablet. [table_path=" << 
_tablet_path << "]";
-        }
-        LOG(INFO) << "finish drop tablet:" << full_name();
+    WriteLock wrlock(_meta_lock);
+    for (auto& it : _version_rowset_map) {
+        SAFE_DELETE(it.second);
     }
+    _version_rowset_map.clear();
 }
 
-OLAPStatus Tablet::load() {
+OLAPStatus Tablet::init_once() {
     OLAPStatus res = OLAP_SUCCESS;
-    MutexLock l(&_load_lock);
-
-    string one_schema_root = _tablet_path;
-    set<string> files;
-    set<string> index_files;
-    set<string> data_files;
-
-    if (_is_loaded) {
-        goto EXIT;
+    ReadLock rdlock(&_meta_lock);
+    for (int ser = 0; ser < _tablet_meta.rowset_size(); ++ser) {
+        const RowsetMeta* rs_meta = _tablet_meta.get_rs_meta(ser);
+        Version version = rs_meta->version();
+        Rowset* rowset = new Rowset(rs_meta);
+        _version_rowset_map[version] = rowset;
+        rowset->init();
     }
-
-    res = dir_walk(one_schema_root, NULL, &files);
-    // Disk Failure will triggered delete file in disk.
-    // IOError will drop object. File only deleted upon restart.
-    // TODO. Tablet should has a state to report to FE, delete tablet
-    //         request will get from FE.      
-    if (res == OLAP_ERR_DISK_FAILURE) {
-        LOG(WARNING) << "fail to walk schema root dir." 
-                     << "res=" << res << ", root=" << one_schema_root;
-        goto EXIT;
-    } else if (res != OLAP_SUCCESS) {
-        StorageEngine::get_instance()->drop_tablet(tablet_id(), schema_hash(), 
true);
-        return res;
-    }
-    res = load_indices();
-
-    if (res != OLAP_SUCCESS) {
-        LOG(FATAL) << "fail to load indices. [res=" << res << " tablet='" << 
_full_name << "']";
-        goto EXIT;
-    }
-
-    // delete unused files
-    obtain_header_rdlock();
-    list_index_files(&index_files);
-    list_data_files(&data_files);
-    if (remove_unused_files(one_schema_root,
-                            files,
-                            "",
-                            index_files,
-                            data_files) != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to remove unused files. [root='" << 
one_schema_root << "']";
-    }
-    release_header_lock();
-
-    _is_loaded = true;
-
-EXIT:
-    if (res != OLAP_SUCCESS) {
-        StorageEngine::get_instance()->drop_tablet(tablet_id(), schema_hash());
-    }
-
-    return res;
+    return OLAP_SUCCESS;
 }
 
 bool Tablet::can_do_compaction() {
     // 如果table正在做schema change,则通过选路判断数据是否转换完成
     // 如果选路成功,则转换完成,可以进行BE
     // 如果选路失败,则转换未完成,不能进行BE
-    this->obtain_header_rdlock();
-    const PDelta* lastest_version = this->lastest_version();
-    if (lastest_version == NULL) {
-        this->release_header_lock();
+    ReadLock rdlock(_meta_lock);
+    const Rowset* lastest_rowset = lastest_version();
+    if (lastest_rowset == NULL) {
+        return false;
+    }
+
+    Version test_version = Version(0, lastest_version->end_version());
+    vector<Version> path_versions;
+    if (OLAP_SUCCESS != _rs_graph->capture_consistent_versions(test_version, 
&path_versions)) {
+        LOG(WARNING) << "tablet has missed version. tablet=" << 
table->full_name();
         return false;
     }
 
-    if (this->is_schema_changing()) {
+    if (table->is_schema_changing()) {
         Version test_version = Version(0, lastest_version->end_version());
         vector<Version> path_versions;
-        if (OLAP_SUCCESS != this->select_versions_to_span(test_version, 
&path_versions)) {
-            this->release_header_lock();
+        if (OLAP_SUCCESS != 
_rs_graph->capture_consistent_versions(test_version, &path_versions)) {
             return false;
         }
     }
-    this->release_header_lock();
 
     return true;
 }
 
-OLAPStatus Tablet::load_indices() {
-    OLAPStatus res = OLAP_SUCCESS;
-    ReadLock rdlock(&_header_lock);
-    TabletMeta* tablet_meta = _tablet_meta;
-    VLOG(3) << "begin to load indices. tablet=" << full_name() << ", "
-        << "version_size=" << tablet_meta->file_delta_size();
-
-    for (int delta_id = 0; delta_id < tablet_meta->delta_size(); ++delta_id) {
-        const PDelta& delta = tablet_meta->delta(delta_id);
-        Version version;
-        version.first = delta.start_version();
-        version.second = delta.end_version();
-        for (int j = 0; j < delta.segment_group_size(); ++j) {
-            const PSegmentGroup& psegment_group = delta.segment_group(j);
-            SegmentGroup* segment_group = new SegmentGroup(this, version, 
delta.version_hash(),
-                                        false, 
psegment_group.segment_group_id(), psegment_group.num_segments());
-            if (segment_group == nullptr) {
-                LOG(WARNING) << "fail to create olap segment_group. 
[version='" << version.first
-                    << "-" << version.second << "' tablet='" << full_name() << 
"']";
-                return OLAP_ERR_MALLOC_ERROR;
-            }
-
-            if (psegment_group.has_empty()) {
-                segment_group->set_empty(psegment_group.empty());
-            }
-            // 在校验和加载索引前把segment_group放到data-source,以防止加载索引失败造成内存泄露
-            _data_sources[version].push_back(segment_group);
-
-            // 判断segment_group是否正常, 在所有版本的都检查完成之后才加载所有版本的segment_group
-            if (segment_group->validate() != OLAP_SUCCESS) {
-                OLAP_LOG_WARNING("fail to validate segment_group. 
[version='%d-%d' version_hash=%ld]",
-                                 version.first,
-                                 version.second,
-                                 tablet_meta->delta(delta_id).version_hash());
-                // 现在只要一个segment_group没有被正确加载,整个table加载失败
-                return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
-            }
-
-            if (psegment_group.column_pruning_size() != 0) {
-                size_t column_pruning_size = 
psegment_group.column_pruning_size();
-                if (_num_key_fields != column_pruning_size) {
-                    LOG(ERROR) << "column pruning size is error."
-                        << "column_pruning_size=" << column_pruning_size << ", 
"
-                        << "num_key_fields=" << _num_key_fields;
-                    return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
-                }
-                std::vector<std::pair<std::string, std::string> > \
-                    column_statistic_strings(_num_key_fields);
-                std::vector<bool> null_vec(_num_key_fields);
-                for (size_t j = 0; j < _num_key_fields; ++j) {
-                    ColumnPruning column_pruning = 
psegment_group.column_pruning(j);
-                    column_statistic_strings[j].first = column_pruning.min();
-                    column_statistic_strings[j].second = column_pruning.max();
-                    if (column_pruning.has_null_flag()) {
-                        null_vec[j] = column_pruning.null_flag();
-                    } else {
-                        null_vec[j] = false;
-                    }
-                }
-                
RETURN_NOT_OK(segment_group->add_column_statistics(column_statistic_strings, 
null_vec));
-            }
-        }
+NewStatus Tablet::capture_consistent_versions(
+                        const Version& version, vector<Version>* 
span_versions) const {
+    NewStatus status = _rs_graph->capture_consistent_versions(version, 
span_versions);
+    if (!status.ok()) {
+        LOG(WARNING) << "fail to generate shortest version path. tablet=" << 
full_name()
+                     << ", version='" << version.first << "-" << 
version.second;
     }
-
-    for (version_olap_index_map_t::const_iterator it = _data_sources.begin();
-            it != _data_sources.end(); ++it) {
-        Version version = it->first;
-        for (SegmentGroup* segment_group : it->second) {
-            if ((res = segment_group->load()) != OLAP_SUCCESS) {
-                LOG(WARNING) << "fail to load segment_group. version=" << 
version.first << "-" << version.second << ", "
-                             << "version_hash=" << 
segment_group->version_hash();
-                // 现在只要一个segment_group没有被正确加载,整个table加载失败
-                return res;
-            }
-
-            VLOG(3) << "load SegmentGroup success. tablet=" << full_name() << 
", "
-                    << "version=" << version.first << "-" << version.second << 
", "
-                    << "version_hash=" << segment_group->version_hash() << ", "
-                    << "num_segments=" << segment_group->num_segments();
-        }
-    }
-
-    return OLAP_SUCCESS;
+    return status;
 }
 
-OLAPStatus Tablet::save_tablet_meta() {
-    OLAPStatus res = TabletMetaManager::save(_store, _tablet_id, _schema_hash, 
_tablet_meta);
-    if (res != OLAP_SUCCESS) {
-       LOG(WARNING) << "fail to save tablet_meta. [res=" << res << " root=" << 
_storage_root_path << "]";
-    }
-
-    return res;
-}
-
-OLAPStatus Tablet::select_versions_to_span( const Version& version,
-                                           vector<Version>* span_versions) 
const {
-    OLAPStatus res = _tablet_meta->select_versions_to_span(version, 
span_versions);
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to generate shortest version path. [version='" 
<< version.first
-                     << "-" << version.second << "' tablet='" << full_name() 
<< "']";
-    }
-    return res;
-}
-
-void Tablet::acquire_data_sources(const Version& version, vector<ColumnData*>* 
sources) const {
-    vector<Version> span_versions;
+NewStatus Tablet::capture_consistent_rowsets(const Version& spec_version,
+                                             vector<Rowset*>* rs_readers) {
+    vector<Version> version_path;
+    _rs_graph->capture_consistent_versions(spec_version, version_graph);
 
-    if (_tablet_meta->select_versions_to_span(version, &span_versions) != 
OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to generate shortest version path. [version='" 
<< version.first
-                     << "-" << version.second << "' tablet='" << full_name() 
<< "']";
-        return;
-    }
-
-    acquire_data_sources_by_versions(span_versions, sources);
-    return;
+    acquire_rs_reader_by_version(version_graph, rs_readers);
+    return Status::OK();
 }
 
-void Tablet::acquire_data_sources_by_versions(const vector<Version>& 
version_list,
-                                                 vector<ColumnData*>* sources) 
const {
-    if (sources == NULL) {
-        LOG(WARNING) << "output parameter for data sources is null. tablet=" 
<< full_name();
-        return;
-    }
-
-    // first clear the output vector, please do not put any OLAPData
-    // into this vector, it may be cause memory leak.
-    sources->clear();
-
-    for (vector<Version>::const_iterator it1 = version_list.begin();
-            it1 != version_list.end(); ++it1) {
-        version_olap_index_map_t::const_iterator it2 = 
_data_sources.find(*it1);
-        if (it2 == _data_sources.end()) {
-            LOG(WARNING) << "fail to find SegmentGroup for version. 
[version='" << it1->first
-                         << "-" << it1->second << "' tablet='" << full_name() 
<< "']";
-            release_data_sources(sources);
+void Tablet::acquire_rs_reader_by_version(const vector<Version>& version_vec,
+                                          vector<RowsetReader*>* rs_readers) 
const {
+    DCHECK(rs_readers != NULL && rs_readers.empty());
+    for (auto version : version_vec) {
+        auto it2 = _rs_version_map.find(*it1);
+        if (it2 == _rs_version_map.end()) {
+            LOG(WARNING) << "fail to find Rowset for version. tablet=" << 
full_name()
+                         << ", version='" << version.first << "-" << 
version.second;
+            release_rs_readers(rs_readers);
             return;
         }
 
-        for (SegmentGroup* segment_group : it2->second) {
-            ColumnData* olap_data = ColumnData::create(segment_group);
-            if (olap_data == NULL) {
-                LOG(WARNING) << "fail to malloc Data. [version='" << it1->first
-                    << "-" << it1->second << "' tablet='" << full_name() << 
"']";
-                release_data_sources(sources);
-                return;
-            }
-
-            sources->push_back(olap_data);
-
-            if (olap_data->init() != OLAP_SUCCESS) {
-                LOG(WARNING) << "fail to initial olap data. [version='" << 
it1->first
-                    << "-" << it1->second << "' tablet='" << full_name() << 
"']";
-                release_data_sources(sources);
-                return;
-            }
+        RowsetReader* rs_reader = RowsetReader::create(*it2);
+        Status status = rs_reader->init();
+        if (!status.ok()) {
+            LOG(WARNING) << "fail to init rowset_reader. tablet=" << 
full_name()
+                         << ", version=" << version.first << "-" << 
version.second;
+            release_rs_readers(rs_readers);
         }
+        rs_reader.push_back(rs_reader):
     }
 }
 
-OLAPStatus Tablet::release_data_sources(vector<ColumnData*>* data_sources) 
const {
-    if (data_sources == NULL) {
-        LOG(WARNING) << "parameter data_sources is null. [tablet='" << 
full_name() << "']";
-        return OLAP_ERR_INPUT_PARAMETER_ERROR;
-    }
-
-    for (auto data : *data_sources) {
+NewStatus Tablet::release_rs_readers(vector<RowsetReader*>* rs_readers) const {
+    DCHECK(rs_readers != nullptr) << "rs_readers is null. tablet=" << 
full_name();
+    for (auto data : *rs_readers) {
         delete data;
     }
 
-    // clear data_sources vector
-    data_sources->clear();
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus Tablet::register_data_source(const std::vector<SegmentGroup*>& 
index_vec) {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    if (index_vec.empty()) {
-        LOG(WARNING) << "parameter segment_group is null."
-                     << "tablet=" << full_name();
-        return OLAP_ERR_INPUT_PARAMETER_ERROR;
-    }
-
-    for (SegmentGroup* segment_group : index_vec) {
-        Version version = segment_group->version();
-        const std::vector<KeyRange>* column_statistics = nullptr;
-        if (segment_group->has_column_statistics()) {
-            column_statistics = &segment_group->get_column_statistics();
-        }
-        res = _tablet_meta->add_version(version, 
segment_group->version_hash(), segment_group->segment_group_id(),
-                                   segment_group->num_segments(), 
segment_group->index_size(), segment_group->data_size(),
-                                   segment_group->num_rows(), 
segment_group->empty(), column_statistics);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to add version to olap tablet_meta. tablet=" 
<< full_name() << ", "
-                         << "version=" << version.first << "-" << 
version.second;
-            return res;
-        }
-
-        // put the new segment_group into _data_sources.
-        // 由于对tablet_meta的操作可能失败,因此对_data_sources要放在这里
-        _data_sources[version].push_back(segment_group);
-        VLOG(3) << "succeed to register data source. tablet=" << full_name() 
<< ", "
-                << "version=" << version.first << "-" << version.second << ", "
-                << "version_hash=" << segment_group->version_hash() << ", "
-                << "segment_group_id=" << segment_group->segment_group_id() << 
", "
-                << "num_segments=" << segment_group->num_segments();
-    }
-
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus Tablet::unregister_data_source(const Version& version, 
std::vector<SegmentGroup*>* segment_group_vec) {
-    OLAPStatus res = OLAP_SUCCESS;
-    version_olap_index_map_t::iterator it = _data_sources.find(version);
-    if (it == _data_sources.end()) {
-        LOG(WARNING) << "olap segment_group for version does not exists. 
[version='" << version.first
-                     << "-" << version.second << "' tablet='" << full_name() 
<< "']";
-        return OLAP_ERR_VERSION_NOT_EXIST;
-    }
-
-    // delete a reference to the data source in the tablet_meta file
-    if ((res = _tablet_meta->delete_version(version)) != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to delete version from olap tablet_meta. 
[version='" << version.first
-                     << "-" << version.second << "' tablet='" << full_name() 
<< "']";
-        return res;
-    }
-
-    *segment_group_vec = it->second;
-    _data_sources.erase(it);
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus Tablet::add_pending_version(int64_t partition_id, int64_t 
transaction_id,
-                                        const std::vector<string>* 
delete_conditions) {
-   WriteLock wrlock(&_header_lock);
-   OLAPStatus res = _tablet_meta->add_pending_version(partition_id, 
transaction_id, delete_conditions);
-   if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to add pending delta to tablet_meta."
-                     << "tablet=" << full_name() << ", "
-                     << "transaction_id=" << transaction_id;
-        return res;
-   }
-   res = save_tablet_meta();
-   if (res != OLAP_SUCCESS) {
-       _tablet_meta->delete_pending_delta(transaction_id);
-       LOG(FATAL) << "fail to save tablet_meta when add pending segment_group. 
[tablet=" << full_name()
-           << " transaction_id=" << transaction_id << "]";
-       return res;
-   }
-   return OLAP_SUCCESS;
-}
-
-OLAPStatus Tablet::add_pending_segment_group(SegmentGroup* segment_group) {
-    if (segment_group == nullptr) {
-        LOG(WARNING) << "parameter segment_group is null. [tablet=" << 
full_name() << "]";
-        return OLAP_ERR_INPUT_PARAMETER_ERROR;
-    }
-
-    int64_t transaction_id = segment_group->transaction_id();
-    obtain_header_wrlock();
-    OLAPStatus res = OLAP_SUCCESS;
-
-    // add to tablet_meta
-    const std::vector<KeyRange>* column_statistics = nullptr;
-    if (segment_group->has_column_statistics()) {
-        column_statistics = &(segment_group->get_column_statistics());
-    }
-    res = _tablet_meta->add_pending_segment_group(transaction_id, 
segment_group->num_segments(),
-                                      segment_group->segment_group_id(), 
segment_group->load_id(),
-                                      segment_group->empty(), 
column_statistics);
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to add pending segment_group to tablet_meta. 
[tablet=" << full_name()
-                     << " transaction_id=" << transaction_id << "]";
-        release_header_lock();
-        return res;
-    }
-
-    // save tablet_meta
-    res = save_tablet_meta();
-    if (res != OLAP_SUCCESS) {
-        _tablet_meta->delete_pending_delta(transaction_id);
-        LOG(FATAL) << "fail to save tablet_meta when add pending 
segment_group. [tablet=" << full_name()
-                   << " transaction_id=" << transaction_id << "]";
-        release_header_lock();
-        return res;
-    }
-
-    // add to data sources
-    _pending_data_sources[transaction_id].push_back(segment_group);
-    release_header_lock();
-    VLOG(3) << "add pending data to tablet successfully."
-            << "tablet=" << full_name() << ", transaction_id=" << 
transaction_id;
-
-    return res;
-}
-
-int32_t Tablet::current_pending_segment_group_id(int64_t transaction_id) {
-    ReadLock rdlock(&_header_lock);
-    int32_t segment_group_id = -1;
-    if (_pending_data_sources.find(transaction_id) != 
_pending_data_sources.end()) {
-        for (SegmentGroup* segment_group : 
_pending_data_sources[transaction_id]) {
-            if (segment_group->segment_group_id() > segment_group_id) {
-                segment_group_id = segment_group->segment_group_id();
-            }
-        }
-    }
-    return segment_group_id;
-}
-
-OLAPStatus Tablet::add_pending_data(SegmentGroup* segment_group, const 
std::vector<TCondition>* delete_conditions) {
-    if (segment_group == nullptr) {
-        LOG(WARNING) << "parameter segment_group is null. tablet=" << 
full_name(); 
-        return OLAP_ERR_INPUT_PARAMETER_ERROR;
-    }
-
-    obtain_header_wrlock();
-    int64_t transaction_id = segment_group->transaction_id();
-    if (_pending_data_sources.find(transaction_id) != 
_pending_data_sources.end()) {
-        LOG(WARNING) << "find pending data existed when add to tablet. 
[tablet=" << full_name()
-                     << " transaction_id=" << transaction_id << "]";
-        release_header_lock();
-        return OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST;
-    }
-    OLAPStatus res = OLAP_SUCCESS;
-
-    // if push for delete, construct sub conditions
-    vector<string> condition_strs;
-    if (delete_conditions != nullptr) {
-        DeleteConditionHandler del_cond_handler;
-        for (const TCondition& condition : *delete_conditions) {
-            
condition_strs.push_back(del_cond_handler.construct_sub_conditions(condition));
-        }
-    }
-
-    if (!condition_strs.empty()) {
-        res = _tablet_meta->add_pending_version(segment_group->partition_id(), 
transaction_id, &condition_strs);
-    } else {
-        res = _tablet_meta->add_pending_version(segment_group->partition_id(), 
transaction_id, nullptr);
-    }
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to add pending delta to tablet_meta."
-                     << "tablet=" << full_name() << ", "
-                     << "transaction_id=" << transaction_id;
-        release_header_lock();
-        return res;
-    }
-
-    // add to tablet_meta
-    const std::vector<KeyRange>* column_statistics = nullptr;
-    if (segment_group->has_column_statistics()) {
-        column_statistics = &(segment_group->get_column_statistics());
-    }
-    res = _tablet_meta->add_pending_segment_group(transaction_id, 
segment_group->num_segments(),
-                                      segment_group->segment_group_id(), 
segment_group->load_id(),
-                                      segment_group->empty(), 
column_statistics);
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to add pending segment_group to tablet_meta. 
[tablet=" << full_name()
-                     << " transaction_id=" << transaction_id << "]";
-        release_header_lock();
-        return res;
-    }
-
-    // save tablet_meta
-    res = save_tablet_meta();
-    if (res != OLAP_SUCCESS) {
-        _tablet_meta->delete_pending_delta(transaction_id);
-        LOG(FATAL) << "fail to save tablet_meta when add pending 
segment_group. [tablet=" << full_name()
-                   << " transaction_id=" << transaction_id << "]";
-        release_header_lock();
-        return res;
-    }
-
-    // add to data sources
-    _pending_data_sources[transaction_id].push_back(segment_group);
-    release_header_lock();
-    VLOG(3) << "add pending data to tablet successfully."
-            << "tablet=" << full_name() << ", transaction_id=" << 
transaction_id;
-    return res;
-
+    rs_readers->clear();
+    return Status::OK();
 }
 
-bool Tablet::has_pending_data(int64_t transaction_id) {
-    ReadLock rdlock(&_header_lock);
-    return _pending_data_sources.find(transaction_id) != 
_pending_data_sources.end();
+OLAPStatus Tablet::add_inc_rowset(const Rowset& rowset) {
+    return _table_meta->add_inc_rowset(rowset->get_rs_meta());
 }
 
-void Tablet::delete_pending_data(int64_t transaction_id) {
-    obtain_header_wrlock();
-
-    auto it = _pending_data_sources.find(transaction_id);
-    if (it == _pending_data_sources.end()) {
-        release_header_lock();
-        return;
-    }
-
-    // delete from data sources
-    for (SegmentGroup* segment_group : it->second) {
-        segment_group->release();
-        StorageEngine::get_instance()->add_unused_index(segment_group);
-    }
-    _pending_data_sources.erase(it);
-
-    // delete from tablet_meta
-    _tablet_meta->delete_pending_delta(transaction_id);
-
-    // save tablet_meta
-    if (save_tablet_meta() != OLAP_SUCCESS) {
-        LOG(FATAL) << "failed to save tablet_meta when delete pending data. 
[tablet=" << full_name()
-                   << " transaction_id=" << transaction_id << "]";
-    }
-
-    release_header_lock();
-    LOG(INFO) << "delete pending data from tablet. [tablet=" << full_name()
-              << " transaction_id=" << transaction_id << "]";
-
-}
-
-void Tablet::get_expire_pending_data(vector<int64_t>* transaction_ids) {
+NewStatus Tablet::delete_expired_inc_rowset() {
     time_t now = time(NULL);
-    ReadLock rdlock(&_header_lock);
-
-    for (auto& it : _tablet_meta->pending_delta()) {
+    vector<Version> expired_versions;
+    WriteLock wrlock(&_meta_lock);
+    for (auto& it : _tablet_meta->all_inc_rowsets()) {
         double diff = difftime(now, it.creation_time());
-        if (diff >= config::pending_data_expire_time_sec) {
-            transaction_ids->push_back(it.transaction_id());
-            VLOG(3) << "find expire pending data. tablet=" << full_name() << 
", "
-                    << "transaction_id=" << it.transaction_id() << " 
exist_sec=" << diff;
+        if (diff >= config::inc_rowset_expire_sec) {
+            expire_versions.push_back(it->version());
         }
     }
-}
-
-void Tablet::load_pending_data() {
-    LOG(INFO) << "begin to load pending_data. tablet=" << full_name() << ", "
-              << "pending_delta size=" << _tablet_meta->pending_delta_size();
-    MutexLock load_lock(&_load_lock);
-
-    // if a olap segment_group loads failed, delete it from tablet_meta
-    std::set<int64_t> error_pending_data;
-
-    for (const PPendingDelta& pending_delta : _tablet_meta->pending_delta()) {
-        for (const PPendingSegmentGroup& pending_segment_group : 
pending_delta.pending_segment_group()) {
-            SegmentGroup* segment_group = new SegmentGroup(this, false, 
-                    pending_segment_group.pending_segment_group_id(),
-                    pending_segment_group.num_segments(), true,
-                    pending_delta.partition_id(), 
pending_delta.transaction_id());
-            DCHECK(segment_group != nullptr);
-            segment_group->set_load_id(pending_segment_group.load_id());
-            if (pending_segment_group.has_empty()) {
-                segment_group->set_empty(pending_segment_group.empty());
-            }
-            
_pending_data_sources[segment_group->transaction_id()].push_back(segment_group);
-
-            if (segment_group->validate() != OLAP_SUCCESS) {
-                LOG(WARNING) << "fail to validate segment_group when load 
pending data."
-                             << "tablet=" << full_name() << ", "
-                             << "transaction_id=" << 
segment_group->transaction_id();
-                error_pending_data.insert(segment_group->transaction_id());
-                break;
-            }
-
-            if (pending_segment_group.column_pruning_size() != 0) {
-                if (_num_key_fields != 
pending_segment_group.column_pruning_size()) {
-                    LOG(WARNING) << "column pruning size is error when load 
pending data."
-                        << "column_pruning_size=" << 
pending_segment_group.column_pruning_size() << ", "
-                        << "num_key_fields=" << _num_key_fields;
-                    error_pending_data.insert(segment_group->transaction_id());
-                    break;
-                }
-                std::vector<std::pair<std::string, std::string>> 
column_statistics_string(_num_key_fields);
-                std::vector<bool> null_vec(_num_key_fields);
-                for (size_t j = 0; j < _num_key_fields; ++j) {
-                    ColumnPruning column_pruning = 
pending_segment_group.column_pruning(j);
-                    column_statistics_string[j].first = column_pruning.min();
-                    column_statistics_string[j].second = column_pruning.max();
-                    if (column_pruning.has_null_flag()) {
-                        null_vec[j] = column_pruning.null_flag();
-                    } else {
-                        null_vec[j] = false;
-                    }
-                }
-
-                if 
(segment_group->add_column_statistics(column_statistics_string, null_vec) != 
OLAP_SUCCESS) {
-                    LOG(WARNING) << "fail to set column statistics when load 
pending data";
-                    error_pending_data.insert(pending_delta.transaction_id());
-                    break;
-                }
-            }
-
-            if (segment_group->load() != OLAP_SUCCESS) {
-                LOG(WARNING) << "fail to load segment_group when load pending 
data."
-                    << "tablet=" << full_name() << ", transaction_id=" << 
pending_delta.transaction_id();
-                error_pending_data.insert(pending_delta.transaction_id());
-                break;
-            }
-
-            OLAPStatus add_status = 
StorageEngine::get_instance()->add_transaction(
-                    pending_delta.partition_id(), 
pending_delta.transaction_id(),
-                    _tablet_id, _schema_hash, pending_segment_group.load_id());
-
-            if (add_status != OLAP_SUCCESS) {
-                LOG(WARNING) << "find transaction exists in engine when load 
pending data. [tablet=" << full_name()
-                    << " transaction_id=" << pending_delta.transaction_id() << 
"]";
-                error_pending_data.insert(pending_delta.transaction_id());
-                break;
-            }
-        }
-
-        if (error_pending_data.find(pending_delta.transaction_id()) != 
error_pending_data.end()) {
-            continue;
-        }
-
-        VLOG(3) << "load pending data successfully. tablet=" << full_name() << 
", "
-                << "partition_id=" << pending_delta.partition_id() << ", "
-                << "transaction_id=" << pending_delta.transaction_id();
-    }
-
-    LOG(INFO) << "finish to load pending data. tablet=" << full_name() << ", "
-              << "error_data_size=" << error_pending_data.size();
-
-    for (int64_t error_data : error_pending_data) {
-        delete_pending_data(error_data);
-    }
-}
-
-// 1. need to replace local data if same version existed
-// 2. move pending data to version data
-// 3. move pending data to incremental data, it won't be merged, so we can do 
incremental clone
-OLAPStatus Tablet::publish_version(int64_t transaction_id, Version version,
-                                      VersionHash version_hash) {
-    WriteLock wrlock(&_header_lock);
-    if (_pending_data_sources.find(transaction_id) == 
_pending_data_sources.end()) {
-        LOG(WARNING) << "pending data not exists in tablet, not finished or 
deleted."
-                     << "tablet=" << full_name() << ", "
-                     << "transaction_id=" << transaction_id;
-        return OLAP_ERR_TRANSACTION_NOT_EXIST;
-    }
-    RETURN_NOT_OK(_handle_existed_version(transaction_id, version, 
version_hash));
-    std::vector<SegmentGroup*> index_vec;
-    vector<string> linked_files;
-    OLAPStatus res = OLAP_SUCCESS;
-    for (SegmentGroup* segment_group : _pending_data_sources[transaction_id]) {
-        int32_t segment_group_id = segment_group->segment_group_id();
-        for (int32_t seg_id = 0; seg_id < segment_group->num_segments(); 
++seg_id) {
-            std::string pending_index_path = 
segment_group->construct_index_file_path(segment_group_id, seg_id);
-            std::string index_path = construct_index_file_path(version, 
version_hash, segment_group_id, seg_id);
-            res = _create_hard_link(pending_index_path, index_path, 
&linked_files);
-            if (res != OLAP_SUCCESS) { remove_files(linked_files); return res; 
}
-
-            std::string pending_data_path = 
segment_group->construct_data_file_path(segment_group_id, seg_id);
-            std::string data_path = construct_data_file_path(version, 
version_hash, segment_group_id, seg_id);
-            res = _create_hard_link(pending_data_path, data_path, 
&linked_files);
-            if (res != OLAP_SUCCESS) { remove_files(linked_files); return res; 
}
-        }
-
-        segment_group->publish_version(version, version_hash);
-        index_vec.push_back(segment_group);
-    }
-
-    res = register_data_source(index_vec);
-    if (res != OLAP_SUCCESS) { remove_files(linked_files); return res; }
-
-    const PPendingDelta* pending_delta = 
_tablet_meta->get_pending_delta(transaction_id);
-    if (pending_delta->has_delete_condition()) {
-        const DeleteConditionMessage& delete_condition = 
pending_delta->delete_condition();
-        _tablet_meta->add_delete_condition(delete_condition, version.first);
-    }
-
-    // add incremental version, if failed, ignore it
-    res = _add_incremental_data(index_vec, transaction_id, version, 
version_hash);
-    VLOG(3) << "finish to add incremental version. res=" << res << ", "
-            << "tablet=" << full_name() << ", "
-            << "transaction_id=" << transaction_id << ", "
-            << "version=" << version.first << "-" << version.second;
-
-    // save tablet_meta
-    res = save_tablet_meta();
-    if (res != OLAP_SUCCESS) {
-        LOG(FATAL) << "fail to save tablet_meta when publish version. res=" << 
res << ", "
-                   << "tablet=" << full_name() << ", "
-                   << "transaction_id=" << transaction_id;
-        std::vector<SegmentGroup*> delete_index_vec;
-        // if failed, clear new data
-        unregister_data_source(version, &delete_index_vec);
-        _delete_incremental_data(version, version_hash);
-        remove_files(linked_files);
-        return res;
-    }
-
-    _tablet_meta->delete_pending_delta(transaction_id);
-    res = save_tablet_meta();
-    if (res != OLAP_SUCCESS) {
-        remove_files(linked_files);
-        LOG(FATAL) << "fail to save tablet_meta when publish version. res=" << 
res << ", "
-                   << "tablet=" << full_name() << ", "
-                   << "transaction_id=" << transaction_id;
-        return res;
-    }
-    for (SegmentGroup* segment_group : _pending_data_sources[transaction_id]) {
-        segment_group->delete_all_files();
-        segment_group->set_pending_finished();
-    }
-    _pending_data_sources.erase(transaction_id);
-
-    return res;
-}
-
-// 1. if version is same and version_hash different, delete local data, save 
tablet_meta
-// 2. if version_hash is same or version is merged, publish success, delete 
transaction, save tablet_meta
-OLAPStatus Tablet::_handle_existed_version(int64_t transaction_id, const 
Version& version,
-                                              const VersionHash& version_hash) 
{
-    const PDelta* existed_delta = nullptr;
-    for (int i = 0; i < file_delta_size(); ++i) {
-        const PDelta* delta = _tablet_meta->get_delta(i);
-        if (version.first >= delta->start_version()
-            && version.second <= delta->end_version()) {
-            existed_delta = delta;
-        }
-
-    }
-
-    if (existed_delta == nullptr) {
-        return OLAP_SUCCESS;
-    }
-
-    OLAPStatus res = OLAP_SUCCESS;
-    // if version is same and version_hash different, delete local data
-    if (existed_delta->start_version() == version.first
-        && existed_delta->end_version() == version.second
-        && existed_delta->version_hash() != version_hash) {
-        LOG(INFO) << "version_hash is different when publish version, delete 
local data. [tablet=" << full_name()
-                  << " transaction_id=" << transaction_id << "]";
-        // remove delete condition if current type is PUSH_FOR_DELETE,
-        // this occurs when user cancel delete_data soon after submit it
-        bool push_for_delete = false;
-        res = is_push_for_delete(transaction_id, &push_for_delete);
-        if (res != OLAP_SUCCESS) {
-            return res;
-        } else if (!push_for_delete) {
-            DeleteConditionHandler del_cond_handler;
-            TabletSharedPtr tablet_ptr =
-                StorageEngine::get_instance()->get_tablet(_tablet_id, 
_schema_hash);
-            if (tablet_ptr.get() != nullptr) {
-                del_cond_handler.delete_cond(tablet_ptr, version.first, false);
-            }
-        }
-        // delete local data
-        //SegmentGroup *existed_index = NULL;
-        std::vector<SegmentGroup*> existed_index_vec;
-        _delete_incremental_data(version, version_hash);
-        res = unregister_data_source(version, &existed_index_vec);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to unregister data when publish version. 
[tablet=" << full_name()
-                         << " version=" << version.first << "-" << 
version.second << " res=" << res << "]";
-            return res;
-        }
-        // save tablet_meta
-        res = save_tablet_meta();
-        if (res != OLAP_SUCCESS) {
-            LOG(FATAL) << "fail to save tablet_meta when unregister data. 
[tablet=" << full_name()
-                       << " transaction_id=" << transaction_id << "]";
-        }
-        // use StorageEngine to delete this segment_group
-        if (!existed_index_vec.empty()) {
-            StorageEngine *unused_index = StorageEngine::get_instance();
-            for (SegmentGroup* segment_group : existed_index_vec) {
-                unused_index->add_unused_index(segment_group);
-            }
-        }
-    // if version_hash is same or version is merged, publish success
-    } else {
-        LOG(INFO) << "version_hash is same when publish version, publish 
success. [tablet=" << full_name()
-                  << " transaction_id=" << transaction_id << "]";
-        res = OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
-    }
-    return res;
-}
-
-OLAPStatus Tablet::_add_incremental_data(std::vector<SegmentGroup*>& 
index_vec, int64_t transaction_id,
-                                            const Version& version, const 
VersionHash& version_hash) {
-    if (index_vec.empty()) {
-        LOG(WARNING) << "no parameter when add incremental data. tablet=" << 
full_name();
-        return OLAP_ERR_INPUT_PARAMETER_ERROR;
+    for (auto& it : expired_versions) {
+        delete_inc_rowset(it);
 
 Review comment:
   Do not use version here; use rowset instead.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to