Lchangliang commented on code in PR #37669:
URL: https://github.com/apache/doris/pull/37669#discussion_r1694428701


##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -2553,6 +2568,100 @@ void 
MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controlle
     response->mutable_txn_info()->CopyFrom(txn_info);
 }
 
+void 
MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* 
controller,
+                                                 const 
AbortTxnWithCoordinatorRequest* request,
+                                                 
AbortTxnWithCoordinatorResponse* response,
+                                                 ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(abort_txn_with_coordinator);
+    if (!request->has_id() || !request->has_ip() || 
!request->has_start_time()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid coordinate id, coordinate ip or coordinate start time.";
+        return;
+    }
+    // TODO: For auth
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+    RPC_RATE_LIMIT(abort_txn_with_coordinator);
+    std::string begin_info_key = txn_info_key({instance_id, 0, 0});
+    std::string end_info_key = txn_info_key({instance_id, INT64_MAX, 
INT64_MAX});
+    LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" 
<< hex(end_info_key);
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        msg = "failed to create txn";
+        code = cast_as<ErrCategory::CREATE>(err);
+        return;
+    }
+    std::unique_ptr<RangeGetIterator> it;
+    int64_t abort_txn_cnt = 0;
+    int64_t total_iteration_cnt = 0;
+    bool need_commit = false;
+    do {
+        err = txn->get(begin_info_key, end_info_key, &it, true);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get txn info. err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        while (it->has_next()) {
+            total_iteration_cnt++;
+            auto [k, v] = it->next();
+            LOG(INFO) << "check txn info txn_info_key=" << hex(k);
+            TxnInfoPB info_pb;
+            if (!info_pb.ParseFromArray(v.data(), v.size())) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "malformed txn running info";
+                msg = ss.str();
+                ss << " key=" << hex(k);
+                LOG(WARNING) << ss.str();
+                return;
+            }
+            const auto& coordinate = info_pb.coordinator();
+            if (info_pb.status() == TxnStatusPB::TXN_STATUS_PREPARED &&
+                coordinate.sourcetype() == TXN_SOURCE_TYPE_BE && 
coordinate.id() == request->id() &&
+                coordinate.ip() == request->ip() &&

Review Comment:
   "coordinate.id() == request->id()"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to