SWJTU-ZhangLei commented on code in PR #38243:
URL: https://github.com/apache/doris/pull/38243#discussion_r1732712524


##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -974,353 +1361,476 @@ void commit_txn_immediately(
             LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
             return;
         }
-        if (!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) [[unlikely]] {
+        if (!(*tablet_ids)[tablet_id].ParseFromString(tablet_idx_values[i].value())) [[unlikely]] {
             code = MetaServiceCode::PROTOBUF_PARSE_ERR;
             ss << "malformed tablet index value tablet_id=" << tablet_id << " txn_id=" << txn_id;
             msg = ss.str();
             LOG(WARNING) << msg;
             return;
         }
-        table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
+        if (!(*tablet_ids)[tablet_id].has_db_id()) {
+            *need_repair_tablet_idx = true;
+        }
+        (*table_id_tablet_ids)[(*tablet_ids)[tablet_id].table_id()].push_back(tablet_id);
         VLOG_DEBUG << "tablet_id:" << tablet_id
-                   << " value:" << tablet_ids[tablet_id].ShortDebugString();
+                   << " value:" << (*tablet_ids)[tablet_id].ShortDebugString();
     }
 
     tablet_idx_keys.clear();
     tablet_idx_values.clear();
+}
 
-    // {table/partition} -> version
-    std::unordered_map<std::string, uint64_t> new_versions;
-    std::vector<std::string> version_keys;
+void repair_tablet_index(
+        std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string& msg,
+        const std::string& instance_id, int64_t db_id, int64_t txn_id,
+        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta) {
+    std::stringstream ss;
+    std::vector<std::string> tablet_idx_keys;
     for (auto& [_, i] : tmp_rowsets_meta) {
-        int64_t tablet_id = i.tablet_id();
-        int64_t table_id = tablet_ids[tablet_id].table_id();
-        int64_t partition_id = i.partition_id();
-        std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id});
-        if (new_versions.count(ver_key) == 0) {
-            new_versions.insert({ver_key, 0});
-            version_keys.push_back(std::move(ver_key));
-        }
-    }
-    std::vector<std::optional<std::string>> version_values;
-    err = txn->batch_get(&version_values, version_keys);
-    if (err != TxnErrorCode::TXN_OK) {
-        code = cast_as<ErrCategory::READ>(err);
-        ss << "failed to get partition versions, err=" << err;
-        msg = ss.str();
-        LOG(WARNING) << msg << " txn_id=" << txn_id;
-        return;
+        tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()}));
     }
-    size_t total_versions = version_keys.size();
-    for (size_t i = 0; i < total_versions; i++) {
-        int64_t version;
-        if (version_values[i].has_value()) {
-            VersionPB version_pb;
-            if (!version_pb.ParseFromString(version_values[i].value())) {
+
+#define MAX_TABLET_INDEX_NUM 2048
+    for (size_t i = 0; i < tablet_idx_keys.size(); i += MAX_TABLET_INDEX_NUM) {
+        size_t end = (i + MAX_TABLET_INDEX_NUM) > tablet_idx_keys.size() ? tablet_idx_keys.size()
+                                                                         : i + MAX_TABLET_INDEX_NUM;
+        const std::vector<std::string> sub_tablet_idx_keys(tablet_idx_keys.begin() + i,
+                                                           tablet_idx_keys.begin() + end);
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(err);
+            ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        std::vector<std::optional<std::string>> tablet_idx_values;
+        // batch get snapshot is false
+        err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
+                             Transaction::BatchGetOptions(false));
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get tablet table index ids, err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg << " txn_id=" << txn_id;
+            return;
+        }
+
+        for (size_t j = 0; j < sub_tablet_idx_keys.size(); j++) {
+            if (!tablet_idx_values[j].has_value()) [[unlikely]] {
+                // The value must existed
+                code = MetaServiceCode::KV_TXN_GET_ERR;
+                ss << "failed to get tablet table index ids, err=not found"
+                   << " key=" << hex(tablet_idx_keys[j]);
+                msg = ss.str();
+                LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
+                return;
+            }
+            TabletIndexPB tablet_idx_pb;
+            if (!tablet_idx_pb.ParseFromString(tablet_idx_values[j].value())) [[unlikely]] {
                 code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "failed to parse version pb txn_id=" << txn_id
-                   << " key=" << hex(version_keys[i]);
+                ss << "malformed tablet index value key=" << hex(tablet_idx_keys[j])
+                   << " txn_id=" << txn_id;
                 msg = ss.str();
+                LOG(WARNING) << msg;
                 return;
             }
-            version = version_pb.version();
-        } else {
-            version = 1;
+
+            if (!tablet_idx_pb.has_db_id()) {
+                tablet_idx_pb.set_db_id(db_id);
+                std::string idx_val;
+                if (!tablet_idx_pb.SerializeToString(&idx_val)) {
+                    code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+                    ss << "failed to serialize tablet index value key=" << hex(tablet_idx_keys[j])
+                       << " txn_id=" << txn_id;
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+                txn->put(sub_tablet_idx_keys[j], idx_val);
+            }
+        }
+
+        err = txn->commit();
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::COMMIT>(err);
+            ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+    }
+}
+
+void commit_txn_eventually(
+        const CommitTxnRequest* request, CommitTxnResponse* response,
+        std::shared_ptr<TxnKv>& txn_kv, std::shared_ptr<TxnLazyCommitter>& txn_lazy_committer,
+        MetaServiceCode& code, std::string& msg, const std::string& instance_id, int64_t db_id,
+        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta) {
+    std::stringstream ss;
+    TxnErrorCode err = TxnErrorCode::TXN_OK;
+    int64_t txn_id = request->txn_id();
+    bool need_advance_last_txn = false;
+    int64_t last_pending_txn_id = 0;
+
+    do {
+        std::unique_ptr<Transaction> txn;
+        err = txn_kv->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(err);
+            ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        // tablet_id -> {table/index/partition}_id
+        std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+        // table_id -> tablets_ids
+        std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
+        bool need_repair_tablet_idx = false;
+        get_tablet_indexes(instance_id, txn_id, tmp_rowsets_meta, txn, code, msg, &tablet_ids,
+                           &table_id_tablet_ids, &need_repair_tablet_idx);
+        if (code != MetaServiceCode::OK) {
+            LOG(WARNING) << "get_tablet_indexes failed, txn_id=" << txn_id << " code=" << code;
+            return;
+        }
+
+        if (need_repair_tablet_idx) {
+            txn.reset();
+            repair_tablet_index(txn_kv, code, msg, instance_id, db_id, txn_id, tmp_rowsets_meta);
+            if (code != MetaServiceCode::OK) {
+                LOG(WARNING) << "repair_tablet_index failed, txn_id=" << txn_id << " code=" << code;
+                return;
+            }
+            continue;
+        }
+
+        std::unordered_map<std::string, uint64_t> new_versions;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to