This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 0a5a18ba39b [improve](txn insert) Txn load support cloud mode (#34721)
0a5a18ba39b is described below

commit 0a5a18ba39b89a658257aa94cc8020dce1a0eb01
Author: meiyi <myime...@gmail.com>
AuthorDate: Wed Jun 5 15:49:24 2024 +0800

    [improve](txn insert) Txn load support cloud mode (#34721)

## Proposed changes

### Purpose

The user doc: https://doris.apache.org/zh-CN/docs/dev/data-operate/import/transaction-load-manual

We have supported insert into select (https://github.com/apache/doris/pull/31666), update (https://github.com/apache/doris/pull/33034) and delete (https://github.com/apache/doris/pull/33100) in transaction load. https://github.com/apache/doris/pull/32980 implements writing more than one rowset to one partition within one txn. This PR implements the cloud-mode counterpart of https://github.com/apache/doris/pull/32980.

### Implementation

#### sub_txn_id

See https://github.com/apache/doris/pull/32980.

#### Meta service supports commit txn

This process is generally the same as commit_txn; the difference is that the partition versions are increased by 1 for each sub txn that writes to them.

One example. Suppose the table, partition, tablet and version info is:

```
--------------------------------------------
| table | partition | tablet  | version |
--------------------------------------------
| t1    | t1_p1     | t1_p1.1 | 1       |
| t1    | t1_p1     | t1_p1.2 | 1       |
| t1    | t1_p2     | t1_p2.1 | 2       |
| t2    | t2_p3     | t2_p3.1 | 3       |
| t2    | t2_p4     | t2_p4.1 | 4       |
--------------------------------------------
```

Now we commit a txn with 3 sub txns and the tablets are:
* sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
* sub_txn2: t2_p3.1
* sub_txn3: t1_p1.1, t1_p1.2

When the txn is committed, the partition versions change as follows:
* sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
* sub_txn2: t2_p3(3 -> 4)
* sub_txn3: t1_p1(2 -> 3)

After commit, the partition versions are:
* t1: t1_p1(3), t1_p2(3)
* t2: t2_p3(4), t2_p4(4)

#### Meta service supports generating sub_txn_id by `begin_sub_txn`
---
 cloud/src/common/bvars.cpp | 2 +
 cloud/src/common/bvars.h | 2 +
 cloud/src/meta-service/meta_service.h | 24 +
 cloud/src/meta-service/meta_service_txn.cpp | 965 ++++++++++++++++++++-
 cloud/src/recycler/recycler.cpp | 9 +
 cloud/test/meta_service_test.cpp | 320 +++++++
 cloud/test/recycler_test.cpp | 145 ++++
 .../apache/doris/cloud/rpc/MetaServiceClient.java | 18 +
 .../apache/doris/cloud/rpc/MetaServiceProxy.java | 20 +
 .../transaction/CloudGlobalTransactionMgr.java | 144 ++-
 .../apache/doris/transaction/TransactionEntry.java | 83 +-
 .../apache/doris/transaction/TransactionState.java | 6 +-
 gensrc/proto/cloud.proto | 48 +
 regression-test/data/insert_p0/txn_insert.out | 131 +--
 .../data/insert_p0/txn_insert_inject_case.out | 9 +
 regression-test/suites/insert_p0/txn_insert.groovy | 468 +++++-----
 .../insert_p0/txn_insert_concurrent_insert.groovy | 26 +-
 .../suites/insert_p0/txn_insert_inject_case.groovy | 69 +-
 .../insert_p0/txn_insert_with_schema_change.groovy | 88 +-
 19 files changed, 2174 insertions(+), 403 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 1aa436bb603..43acb47e365 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -27,6 +27,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_commit_txn("ms", "commit_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_abort_txn("ms", "abort_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn("ms", "get_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms", "get_current_max_txn_id");
+BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms",
"begin_sub_txn"); +BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn"); BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", "check_txn_conflict"); BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label("ms", "clean_txn_label"); BvarLatencyRecorderWithTag g_bvar_ms_get_version("ms", "get_version"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index b55e1051cd9..e5b50262104 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -126,6 +126,8 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id; extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict; +extern BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn; +extern BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label; extern BvarLatencyRecorderWithTag g_bvar_ms_get_version; extern BvarLatencyRecorderWithTag g_bvar_ms_batch_get_version; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 4dc4113f341..6ba3d5b45eb 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -61,6 +61,10 @@ public: void commit_txn(::google::protobuf::RpcController* controller, const CommitTxnRequest* request, CommitTxnResponse* response, ::google::protobuf::Closure* done) override; + void commit_txn_with_sub_txn(::google::protobuf::RpcController* controller, + const CommitTxnRequest* request, CommitTxnResponse* response, + ::google::protobuf::Closure* done); + void abort_txn(::google::protobuf::RpcController* controller, const AbortTxnRequest* request, AbortTxnResponse* response, ::google::protobuf::Closure* done) override; @@ -76,6 +80,14 @@ public: GetCurrentMaxTxnResponse* response, ::google::protobuf::Closure* done) override; + void begin_sub_txn(::google::protobuf::RpcController* controller, + const BeginSubTxnRequest* request, BeginSubTxnResponse* response, + ::google::protobuf::Closure* done) override; + + void abort_sub_txn(::google::protobuf::RpcController* controller, + const AbortSubTxnRequest* request, AbortSubTxnResponse* response, + ::google::protobuf::Closure* done) override; + void check_txn_conflict(::google::protobuf::RpcController* controller, const CheckTxnConflictRequest* request, CheckTxnConflictResponse* response, @@ -321,6 +333,18 @@ public: call_impl(&cloud::MetaService::get_current_max_txn_id, controller, request, response, done); } + void begin_sub_txn(::google::protobuf::RpcController* controller, + const BeginSubTxnRequest* request, BeginSubTxnResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::begin_sub_txn, controller, request, response, done); + } + + void abort_sub_txn(::google::protobuf::RpcController* controller, + const AbortSubTxnRequest* request, AbortSubTxnResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::abort_sub_txn, controller, request, response, done); + } + void check_txn_conflict(::google::protobuf::RpcController* controller, const CheckTxnConflictRequest* request, CheckTxnConflictResponse* response, diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 7866fccaa39..5596cbbf7eb 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -662,6 +662,10 @@ void 
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, const CommitTxnRequest* request, CommitTxnResponse* response, ::google::protobuf::Closure* done) { + if (request->has_is_txn_load() && request->is_txn_load()) { + commit_txn_with_sub_txn(controller, request, response, done); + return; + } RPC_PREPROCESS(commit_txn); if (!request->has_txn_id()) { code = MetaServiceCode::INVALID_ARGUMENT; @@ -1024,15 +1028,585 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i]) << " txn_id=" << txn_id; - for (auto tablet_id : table_id_tablet_ids[table_id]) { - std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id}); - txn->remove(pending_key); - LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" << hex(pending_key) - << " txn_id=" << txn_id; - } + for (auto tablet_id : table_id_tablet_ids[table_id]) { + std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id}); + txn->remove(pending_key); + LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" << hex(pending_key) + << " txn_id=" << txn_id; + } + } + lock_keys.clear(); + lock_values.clear(); + + // Save rowset meta + for (auto& i : rowsets) { + size_t rowset_size = i.first.size() + i.second.size(); + txn->put(i.first, i.second); + LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << txn_id + << " rowset_size=" << rowset_size; + } + + // Save versions + for (auto& i : new_versions) { + std::string ver_val; + VersionPB version_pb; + version_pb.set_version(i.second); + if (!version_pb.SerializeToString(&ver_val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize version_pb when saving, txn_id=" << txn_id; + msg = ss.str(); + return; + } + + txn->put(i.first, ver_val); + LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " version:" << i.second + << " txn_id=" << txn_id; + + std::string_view ver_key = i.first; + ver_key.remove_prefix(1); // Remove key space + // PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id} + std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; + int ret = decode_key(&ver_key, &out); + if (ret != 0) [[unlikely]] { + // decode version key error means this is something wrong, + // we can not continue this txn + LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(ver_key); + code = MetaServiceCode::UNDEFINED_ERR; + msg = "decode version key error"; + return; + } + + int64_t table_id = std::get<int64_t>(std::get<0>(out[4])); + int64_t partition_id = std::get<int64_t>(std::get<0>(out[5])); + VLOG_DEBUG << " table_id=" << table_id << " partition_id=" << partition_id; + + response->add_table_ids(table_id); + response->add_partition_ids(partition_id); + response->add_versions(i.second); + } + + // Save table versions + for (auto& i : table_id_tablet_ids) { + std::string ver_key = table_version_key({instance_id, db_id, i.first}); + txn->atomic_add(ver_key, 1); + LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << " txn_id=" << txn_id; + } + + LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString(); + + // Update txn_info + txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE); + + auto now_time = system_clock::now(); + uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count(); + if 
((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) { + code = MetaServiceCode::UNDEFINED_ERR; + msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id); + LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time() + << " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time; + return; + } + txn_info.set_commit_time(commit_time); + txn_info.set_finish_time(commit_time); + if (request->has_commit_attachment()) { + txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment()); + } + LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString(); + info_val.clear(); + if (!txn_info.SerializeToString(&info_val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize txn_info when saving, txn_id=" << txn_id; + msg = ss.str(); + return; + } + txn->put(info_key, info_val); + LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id; + + // Update stats of affected tablet + std::deque<std::string> kv_pool; + std::function<void(const StatsTabletKeyInfo&, const TabletStats&)> update_tablet_stats; + if (config::split_tablet_stats) { + update_tablet_stats = [&](const StatsTabletKeyInfo& info, const TabletStats& stats) { + if (stats.num_segs > 0) { + auto& data_size_key = kv_pool.emplace_back(); + stats_tablet_data_size_key(info, &data_size_key); + txn->atomic_add(data_size_key, stats.data_size); + auto& num_rows_key = kv_pool.emplace_back(); + stats_tablet_num_rows_key(info, &num_rows_key); + txn->atomic_add(num_rows_key, stats.num_rows); + auto& num_segs_key = kv_pool.emplace_back(); + stats_tablet_num_segs_key(info, &num_segs_key); + txn->atomic_add(num_segs_key, stats.num_segs); + } + auto& num_rowsets_key = kv_pool.emplace_back(); + stats_tablet_num_rowsets_key(info, &num_rowsets_key); + txn->atomic_add(num_rowsets_key, stats.num_rowsets); + }; + } else { + update_tablet_stats = [&](const StatsTabletKeyInfo& info, const TabletStats& stats) { + auto& key = kv_pool.emplace_back(); + stats_tablet_key(info, &key); + auto& val = kv_pool.emplace_back(); + TxnErrorCode err = txn->get(key, &val); + if (err != TxnErrorCode::TXN_OK) { + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND + : cast_as<ErrCategory::READ>(err); + msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err, + std::get<4>(info)); + return; + } + TabletStatsPB stats_pb; + if (!stats_pb.ParseFromString(val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format("malformed tablet stats value, key={}", hex(key)); + return; + } + stats_pb.set_data_size(stats_pb.data_size() + stats.data_size); + stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows); + stats_pb.set_num_rowsets(stats_pb.num_rowsets() + stats.num_rowsets); + stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs); + stats_pb.SerializeToString(&val); + txn->put(key, val); + }; + } + for (auto& [tablet_id, stats] : tablet_stats) { + DCHECK(tablet_ids.count(tablet_id)); + auto& tablet_idx = tablet_ids[tablet_id]; + StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(), + tablet_idx.partition_id(), tablet_id}; + update_tablet_stats(info, stats); + if (code != MetaServiceCode::OK) return; + } + // Remove tmp rowset meta + for (auto& [k, _] : tmp_rowsets_meta) { + txn->remove(k); + LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id; + } + + const std::string running_key = txn_running_key({instance_id, db_id, txn_id}); + LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" << txn_id; + txn->remove(running_key); + + std::string recycle_val; + std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id}); + RecycleTxnPB recycle_pb; + recycle_pb.set_creation_time(commit_time); + recycle_pb.set_label(txn_info.label()); + + if (!recycle_pb.SerializeToString(&recycle_val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize recycle_pb, txn_id=" << txn_id; + msg = ss.str(); + return; + } + txn->put(recycle_key, recycle_val); + + if (txn_info.load_job_source_type() == + LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) { + put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id); + } + + LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << " txn_id=" << txn_id; + LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes() + << " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" << txn->num_del_keys() + << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id; + + // Finally we are done... + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::COMMIT>(err); + ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err; + msg = ss.str(); + return; + } + + // calculate table stats from tablets stats + std::map<int64_t /*table_id*/, TableStats> table_stats; + std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(), + request->base_tablet_ids().end()); + calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids); + for (const auto& pair : table_stats) { + TableStatsPB* stats_pb = response->add_table_stats(); + auto table_id = pair.first; + auto stats = pair.second; + get_pb_from_tablestats(stats, stats_pb); + stats_pb->set_table_id(table_id); + VLOG_DEBUG << "Add TableStats to CommitTxnResponse. 
txn_id=" << txn_id + << " table_id=" << table_id + << " updated_row_count=" << stats_pb->updated_row_count(); + } + + response->mutable_txn_info()->CopyFrom(txn_info); +} // end commit_txn + +/** + * This process is generally the same as commit_txn, the difference is that + * the partitions version will plus 1 in multi sub txns. + * + * One example: + * Suppose the table, partition, tablet and version info is: + * -------------------------------------------- + * | table | partition | tablet | version | + * -------------------------------------------- + * | t1 | t1_p1 | t1_p1.1 | 1 | + * | t1 | t1_p1 | t1_p1.2 | 1 | + * | t1 | t1_p2 | t1_p2.1 | 2 | + * | t2 | t2_p3 | t2_p3.1 | 3 | + * | t2 | t2_p4 | t2_p4.1 | 4 | + * -------------------------------------------- + * + * Now we commit a txn with 3 sub txns and the tablets are: + * sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1 + * sub_txn2: t2_p3.1 + * sub_txn3: t1_p1.1, t1_p1.2 + * When commit, the partitions version will be: + * sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3) + * sub_txn2: t2_p3(3 -> 4) + * sub_txn3: t1_p1(2 -> 3) + * After commit, the partitions version will be: + * t1: t1_p1(3), t1_p2(3) + * t2: t2_p3(4), t2_p4(4) + */ +void MetaServiceImpl::commit_txn_with_sub_txn(::google::protobuf::RpcController* controller, + const CommitTxnRequest* request, + CommitTxnResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(commit_txn); + if (!request->has_txn_id()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "invalid argument, missing txn id"; + return; + } + + int64_t txn_id = request->txn_id(); + auto sub_txn_infos = request->sub_txn_infos(); + + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id << " txn_id=" << txn_id; + return; + } + + RPC_RATE_LIMIT(commit_txn) + + // Create a readonly txn for scan tmp rowset + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + ss << "filed to create txn, txn_id=" << txn_id << " err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + // Get db id with txn id + std::string index_val; + const std::string index_key = txn_index_key({instance_id, txn_id}); + err = txn->get(index_key, &index_val); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "failed to get db id, txn_id=" << txn_id << " err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + TxnIndexPB index_pb; + if (!index_pb.ParseFromString(index_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse txn_index_pb, txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + DCHECK(index_pb.has_tablet_index() == true); + DCHECK(index_pb.tablet_index().has_db_id() == true); + int64_t db_id = index_pb.tablet_index().db_id(); + + // Get temporary rowsets involved in the txn + std::map<int64_t, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>> + sub_txn_to_tmp_rowsets_meta; + for (const auto& sub_txn_info : sub_txn_infos) { + auto sub_txn_id = sub_txn_info.sub_txn_id(); + // This is a range scan + MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, sub_txn_id, 0}; + MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, sub_txn_id + 1, 
0}; + std::string rs_tmp_key0; + std::string rs_tmp_key1; + meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0); + meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1); + // Get rowset meta that should be commited + // tmp_rowset_key -> rowset_meta + std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta; + + int num_rowsets = 0; + std::unique_ptr<int, std::function<void(int*)>> defer_log_range( + (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id, &sub_txn_id](int*) { + LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id + << ", sub_txn_id=" << sub_txn_id << " num_rowsets=" << num_rowsets + << " range=[" << hex(rs_tmp_key0) << "," << hex(rs_tmp_key1) << ")"; + }); + + std::unique_ptr<RangeGetIterator> it; + do { + err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true); + if (err == TxnErrorCode::TXN_TOO_OLD) { + err = txn_kv_->create_txn(&txn); + if (err == TxnErrorCode::TXN_OK) { + err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true); + } + } + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "internal error, failed to get tmp rowset while committing, txn_id=" << txn_id + << " err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + while (it->has_next()) { + auto [k, v] = it->next(); + LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " txn_id=" << txn_id; + tmp_rowsets_meta.emplace_back(); + if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(), v.size())) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "malformed rowset meta, unable to initialize, txn_id=" << txn_id + << " key=" << hex(k); + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + // Save keys that will be removed later + tmp_rowsets_meta.back().first = std::string(k.data(), k.size()); + ++num_rowsets; + if (!it->has_next()) rs_tmp_key0 = k; + } + rs_tmp_key0.push_back('\x00'); // Update to next smallest key for iteration + } while (it->more()); + + VLOG_DEBUG << "txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id + << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta.size(); + sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, std::move(tmp_rowsets_meta)); + } + + // Create a read/write txn for guarantee consistency + txn.reset(); + err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + ss << "filed to create txn, txn_id=" << txn_id << " err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + // Get txn info with db_id and txn_id + std::string info_val; // Will be reused when saving updated txn + const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); + err = txn->get(info_key, &info_val); + if (err != TxnErrorCode::TXN_OK) { + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND + : cast_as<ErrCategory::READ>(err); + ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + TxnInfoPB txn_info; + if (!txn_info.ParseFromString(info_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + // TODO: do more check like txn state + DCHECK(txn_info.txn_id() == txn_id); + if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { + code = MetaServiceCode::TXN_ALREADY_ABORTED; + ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { + code = MetaServiceCode::TXN_ALREADY_VISIBLE; + ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id; + msg = ss.str(); + response->mutable_txn_info()->CopyFrom(txn_info); + return; + } + + LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString(); + + // Prepare rowset meta and new_versions + // Read tablet indexes in batch. + std::map<int64_t, int64_t> tablet_id_to_idx; + std::vector<std::string> tablet_idx_keys; + std::vector<int64_t> partition_ids; + auto idx = 0; + for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) { + for (auto& [_, i] : tmp_rowsets_meta) { + auto tablet_id = i.tablet_id(); + if (tablet_id_to_idx.count(tablet_id) == 0) { + tablet_id_to_idx.emplace(tablet_id, idx); + tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()})); + partition_ids.push_back(i.partition_id()); + idx++; + } + } + } + std::vector<std::optional<std::string>> tablet_idx_values; + err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, Transaction::BatchGetOptions(false)); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "failed to get tablet table index ids, err=" << err; + msg = ss.str(); + LOG(WARNING) << msg << " txn_id=" << txn_id; + return; + } + + // tablet_id -> {table/index/partition}_id + std::unordered_map<int64_t, TabletIndexPB> tablet_ids; + // table_id -> tablets_ids + std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids; + for (auto [tablet_id, i] : tablet_id_to_idx) { + if (!tablet_idx_values[i].has_value()) [[unlikely]] { + // The value must existed + code = MetaServiceCode::KV_TXN_GET_ERR; + ss << "failed to get tablet table index ids, err=not found" + << " tablet_id=" << tablet_id << " key=" << hex(tablet_idx_keys[i]); + msg = ss.str(); + LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id; + return; + } + if (!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "malformed tablet index value tablet_id=" << tablet_id << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id); + VLOG_DEBUG << "tablet_id:" << tablet_id + << " value:" << tablet_ids[tablet_id].ShortDebugString(); + } + + tablet_idx_keys.clear(); + tablet_idx_values.clear(); + + // {table/partition} -> version + std::unordered_map<std::string, uint64_t> new_versions; + std::vector<std::string> version_keys; + for (auto& [tablet_id, i] : tablet_id_to_idx) { + int64_t table_id = tablet_ids[tablet_id].table_id(); + int64_t partition_id = 
partition_ids[i]; + std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); + if (new_versions.count(ver_key) == 0) { + new_versions.insert({ver_key, 0}); + LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key) << " txn_id=" << txn_id + << ", db_id=" << db_id << ", table_id=" << table_id + << ", partition_id=" << partition_id; + version_keys.push_back(std::move(ver_key)); + } + } + std::vector<std::optional<std::string>> version_values; + err = txn->batch_get(&version_values, version_keys); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "failed to get partition versions, err=" << err; + msg = ss.str(); + LOG(WARNING) << msg << " txn_id=" << txn_id; + return; + } + size_t total_versions = version_keys.size(); + for (size_t i = 0; i < total_versions; i++) { + int64_t version; + if (version_values[i].has_value()) { + VersionPB version_pb; + if (!version_pb.ParseFromString(version_values[i].value())) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse version pb txn_id=" << txn_id + << " key=" << hex(version_keys[i]); + msg = ss.str(); + return; + } + version = version_pb.version(); + } else { + version = 1; + } + new_versions[version_keys[i]] = version; + LOG(INFO) << "xxx get partition_version_key=" << hex(version_keys[i]) + << " version:" << version << " txn_id=" << txn_id; + } + version_keys.clear(); + version_values.clear(); + + std::vector<std::pair<std::string, std::string>> rowsets; + std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats + for (const auto& sub_txn_info : sub_txn_infos) { + auto sub_txn_id = sub_txn_info.sub_txn_id(); + auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id]; + std::unordered_map<int64_t, int64_t> partition_id_to_version; + for (auto& [_, i] : tmp_rowsets_meta) { + int64_t tablet_id = i.tablet_id(); + int64_t table_id = tablet_ids[tablet_id].table_id(); + int64_t partition_id = i.partition_id(); + std::string ver_key = + partition_version_key({instance_id, db_id, table_id, partition_id}); + if (new_versions.count(ver_key) == 0) [[unlikely]] { + // it is impossible. + code = MetaServiceCode::UNDEFINED_ERR; + ss << "failed to get partition version key, the target version not exists in " + "new_versions." 
+ << " txn_id=" << txn_id << ", db_id=" << db_id << ", table_id=" << table_id + << ", partition_id=" << partition_id; + msg = ss.str(); + LOG(ERROR) << msg; + return; + } + + // Update rowset version + int64_t new_version = new_versions[ver_key]; + if (partition_id_to_version.count(partition_id) == 0) { + new_versions[ver_key] = new_version + 1; + new_version = new_versions[ver_key]; + partition_id_to_version[partition_id] = new_version; + } + i.set_start_version(new_version); + i.set_end_version(new_version); + LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id + << ", sub_txn_id=" << sub_txn_id << ", table_id=" << table_id + << ", partition_id=" << partition_id << ", tablet_id=" << tablet_id + << ", new_version=" << new_version; + + std::string key = meta_rowset_key({instance_id, tablet_id, i.end_version()}); + std::string val; + if (!i.SerializeToString(&val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize rowset_meta, txn_id=" << txn_id; + msg = ss.str(); + return; + } + rowsets.emplace_back(std::move(key), std::move(val)); + + // Accumulate affected rows + auto& stats = tablet_stats[tablet_id]; + stats.data_size += i.data_disk_size(); + stats.num_rows += i.num_rows(); + ++stats.num_rowsets; + stats.num_segs += i.num_segments(); + } // for tmp_rowsets_meta } - lock_keys.clear(); - lock_values.clear(); // Save rowset meta for (auto& i : rowsets) { @@ -1074,7 +1648,8 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, int64_t table_id = std::get<int64_t>(std::get<0>(out[4])); int64_t partition_id = std::get<int64_t>(std::get<0>(out[5])); - VLOG_DEBUG << " table_id=" << table_id << " partition_id=" << partition_id; + VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id + << " partition_id=" << partition_id << " version=" << i.second; response->add_table_ids(table_id); response->add_partition_ids(partition_id); @@ -1174,9 +1749,11 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, if (code != MetaServiceCode::OK) return; } // Remove tmp rowset meta - for (auto& [k, _] : tmp_rowsets_meta) { - txn->remove(k); - LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id; + for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) { + for (auto& [k, _] : tmp_rowsets_meta) { + txn->remove(k); + LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id; + } } const std::string running_key = txn_running_key({instance_id, db_id, txn_id}); @@ -1197,11 +1774,6 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, } txn->put(recycle_key, recycle_val); - if (txn_info.load_job_source_type() == - LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) { - put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id); - } - LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << " txn_id=" << txn_id; LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes() << " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" << txn->num_del_keys() @@ -1233,7 +1805,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, } response->mutable_txn_info()->CopyFrom(txn_info); -} // end commit_txn +} // end commit_txn_with_sub_txn void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, const AbortTxnRequest* request, AbortTxnResponse* response, @@ -1612,6 +2184,327 @@ void 
MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController* response->set_current_max_txn_id(current_max_txn_id); } +/** + * 1. Generate a sub_txn_id + * + * The following steps are done in a txn: + * 2. Put txn_index_key in sub_txn_id + * 3. Delete txn_label_key in sub_txn_id + * 4. Modify the txn state of the txn_id: + * - Add the sub txn id to sub_txn_ids: recycler use it to recycle the txn_index_key + * - Add the table id to table_ids + */ +void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controller, + const BeginSubTxnRequest* request, + BeginSubTxnResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(begin_sub_txn); + int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1; + int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() : -1; + int64_t db_id = request->has_db_id() ? request->db_id() : -1; + auto& table_ids = request->table_ids(); + std::string label = request->has_label() ? request->label() : ""; + if (txn_id < 0 || sub_txn_num < 0 || db_id < 0 || table_ids.empty() || label.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_num=" << sub_txn_num + << " db_id=" << db_id << ", label=" << label << ", table_ids=["; + for (auto table_id : table_ids) { + ss << table_id << ", "; + } + ss << "]"; + msg = ss.str(); + return; + } + + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "cannot find instance_id with cloud_unique_id=" + << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id); + msg = ss.str(); + return; + } + + RPC_RATE_LIMIT(begin_sub_txn) + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id + << " db_id=" << db_id; + msg = ss.str(); + return; + } + + const std::string label_key = txn_label_key({instance_id, db_id, label}); + std::string label_val; + err = txn->get(label_key, &label_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + code = cast_as<ErrCategory::READ>(err); + ss << "txn->get failed(), err=" << err << " label=" << label; + msg = ss.str(); + return; + } + + LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err; + + // err == OK means this is a retry rpc? + if (err == TxnErrorCode::TXN_OK) { + label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN); + } + + // ret > 0, means label not exist previously. + txn->atomic_set_ver_value(label_key, label_val); + LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key); + + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::COMMIT>(err); + ss << "txn->commit failed(), label=" << label << " err=" << err; + msg = ss.str(); + return; + } + + // 2. 
Get sub txn id from version stamp + txn.reset(); + err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + ss << "failed to create txn when get txn id, label=" << label << " err=" << err; + msg = ss.str(); + return; + } + + label_val.clear(); + err = txn->get(label_key, &label_val); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "txn->get() failed, label=" << label << " err=" << err; + msg = ss.str(); + return; + } + + LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err; + + // Generated by TxnKv system + int64_t sub_txn_id = 0; + int ret = + get_txn_id_from_fdb_ts(std::string_view(label_val).substr( + label_val.size() - VERSION_STAMP_LEN, label_val.size()), + &sub_txn_id); + if (ret != 0) { + code = MetaServiceCode::TXN_GEN_ID_ERR; + ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << ret; + msg = ss.str(); + return; + } + + LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " sub_txn_id=" << sub_txn_id + << " txn_id=" << txn_id << " label_val.size()=" << label_val.size(); + + // write txn_index_key + const std::string index_key = txn_index_key({instance_id, sub_txn_id}); + std::string index_val; + TxnIndexPB index_pb; + if (!index_pb.SerializeToString(&index_val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize txn_index_pb " + << "label=" << label << " txn_id=" << txn_id; + msg = ss.str(); + return; + } + + // Get and update txn info with db_id and txn_id + std::string info_val; // Will be reused when saving updated txn + const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); + err = txn->get(info_key, &info_val); + if (err != TxnErrorCode::TXN_OK) { + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND + : cast_as<ErrCategory::READ>(err); + ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + TxnInfoPB txn_info; + if (!txn_info.ParseFromString(info_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + DCHECK(txn_info.txn_id() == txn_id); + if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) { + code = MetaServiceCode::TXN_INVALID_STATUS; + ss << "transaction status is " << txn_info.status() << " : db_id=" << db_id + << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + if (txn_info.sub_txn_ids().size() != sub_txn_num) { + code = MetaServiceCode::UNDEFINED_ERR; + ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", expected sub_txn_num=" << sub_txn_num + << ", txn_info.sub_txn_ids=["; + for (auto id : txn_info.sub_txn_ids()) { + ss << id << ", "; + } + ss << "]"; + msg = ss.str(); + LOG(WARNING) << msg; + } + txn_info.mutable_sub_txn_ids()->Add(sub_txn_id); + txn_info.mutable_table_ids()->Clear(); + for (auto table_id : table_ids) { + txn_info.mutable_table_ids()->Add(table_id); + } + if (!txn_info.SerializeToString(&info_val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize txn_info when saving, txn_id=" << txn_id; + msg = ss.str(); + return; + } + + txn->remove(label_key); + txn->put(info_key, info_val); + txn->put(index_key, index_val); + LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id + << ", remove label_key=" << hex(label_key) << ", put info_key=" << hex(info_key) + << ", put index_key=" << hex(index_key); + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::COMMIT>(err); + ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err; + msg = ss.str(); + return; + } + response->set_sub_txn_id(sub_txn_id); + response->mutable_txn_info()->CopyFrom(txn_info); +} + +/** + * 1. Modify the txn state of the txn_id: + * - Remove the table id from table_ids + */ +void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controller, + const AbortSubTxnRequest* request, + AbortSubTxnResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(abort_sub_txn); + int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1; + int64_t sub_txn_id = request->has_sub_txn_id() ? request->sub_txn_id() : -1; + int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() : -1; + int64_t db_id = request->has_db_id() ? request->db_id() : -1; + auto& table_ids = request->table_ids(); + if (txn_id < 0 || sub_txn_id < 0 || sub_txn_num < 0 || db_id < 0) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id + << ", sub_txn_num=" << sub_txn_num << " db_id=" << db_id << ", table_ids=["; + for (auto table_id : table_ids) { + ss << table_id << ", "; + } + ss << "]"; + msg = ss.str(); + return; + } + + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "cannot find instance_id with cloud_unique_id=" + << (cloud_unique_id.empty() ? 
"(empty)" : cloud_unique_id); + msg = ss.str(); + return; + } + + RPC_RATE_LIMIT(abort_sub_txn) + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id + << " sub_txn_id=" << sub_txn_id << " db_id=" << db_id; + msg = ss.str(); + return; + } + + // Get and update txn info with db_id and txn_id + std::string info_val; // Will be reused when saving updated txn + const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); + err = txn->get(info_key, &info_val); + if (err != TxnErrorCode::TXN_OK) { + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND + : cast_as<ErrCategory::READ>(err); + ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id + << " sub_txn_id=" << sub_txn_id << " err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + TxnInfoPB txn_info; + if (!txn_info.ParseFromString(info_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id + << " sub_txn_id=" << sub_txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + DCHECK(txn_info.txn_id() == txn_id); + if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) { + code = MetaServiceCode::TXN_INVALID_STATUS; + ss << "transaction status is " << txn_info.status() << " : db_id=" << db_id + << " txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + // remove table_id and does not need to remove sub_txn_id + if (txn_info.sub_txn_ids().size() != sub_txn_num) { + code = MetaServiceCode::UNDEFINED_ERR; + ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id + << ", expected sub_txn_num=" << sub_txn_num << ", txn_info.sub_txn_ids=["; + for (auto id : txn_info.sub_txn_ids()) { + ss << id << ", "; + } + ss << "]"; + msg = ss.str(); + LOG(WARNING) << msg; + } + txn_info.mutable_table_ids()->Clear(); + for (auto table_id : table_ids) { + txn_info.mutable_table_ids()->Add(table_id); + } + // TODO should we try to delete txn_label_key if begin_sub_txn failed to delete? 
+ + if (!txn_info.SerializeToString(&info_val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize txn_info when saving, txn_id=" << txn_id + << " sub_txn_id=" << sub_txn_id; + msg = ss.str(); + return; + } + + txn->put(info_key, info_val); + LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id + << ", put info_key=" << hex(info_key); + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::COMMIT>(err); + ss << "failed to commit kv txn, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id + << ", err=" << err; + msg = ss.str(); + return; + } + response->mutable_txn_info()->CopyFrom(txn_info); +} + void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* controller, const CheckTxnConflictRequest* request, CheckTxnConflictResponse* response, @@ -1686,24 +2579,24 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont if (running_pb.timeout_time() < check_time) { skip_timeout_txn_cnt++; - break; - } - - LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k) - << " running_pb=" << running_pb.ShortDebugString(); - std::vector<int64_t> running_table_ids(running_pb.table_ids().begin(), - running_pb.table_ids().end()); - std::sort(running_table_ids.begin(), running_table_ids.end()); - std::vector<int64_t> result(std::min(running_table_ids.size(), src_table_ids.size())); - std::vector<int64_t>::iterator iter = std::set_intersection( - src_table_ids.begin(), src_table_ids.end(), running_table_ids.begin(), - running_table_ids.end(), result.begin()); - result.resize(iter - result.begin()); - if (result.size() > 0) { - response->set_finished(false); - LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt - << " total iteration count: " << total_iteration_cnt; - return; + } else { + LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k) + << " running_pb=" << running_pb.ShortDebugString(); + std::vector<int64_t> running_table_ids(running_pb.table_ids().begin(), + running_pb.table_ids().end()); + std::sort(running_table_ids.begin(), running_table_ids.end()); + std::vector<int64_t> result( + std::min(running_table_ids.size(), src_table_ids.size())); + std::vector<int64_t>::iterator iter = std::set_intersection( + src_table_ids.begin(), src_table_ids.end(), running_table_ids.begin(), + running_table_ids.end(), result.begin()); + result.resize(iter - result.begin()); + if (result.size() > 0) { + response->set_finished(false); + LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt + << " total iteration count: " << total_iteration_cnt; + return; + } } if (!it->has_next()) { diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 05744bc596d..ca4c17b61ff 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1906,6 +1906,15 @@ int InstanceRecycler::recycle_expired_txn_label() { return -1; } txn->remove(info_key); + // Remove sub txn index kvs + std::vector<std::string> sub_txn_index_keys; + for (auto sub_txn_id : txn_info.sub_txn_ids()) { + auto sub_txn_index_key = txn_index_key({instance_id_, sub_txn_id}); + sub_txn_index_keys.push_back(sub_txn_index_key); + } + for (auto& sub_txn_index_key : sub_txn_index_keys) { + txn->remove(sub_txn_index_key); + } // Update txn label std::string label_key, label_val; txn_label_key({instance_id_, db_id, txn_info.label()}, &label_key); diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 
b2b203f66e0..013433239d3 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -1332,6 +1332,326 @@ TEST(MetaServiceTest, CommitTxnExpiredTest) { } } +TEST(MetaServiceTest, CommitTxnWithSubTxnTest) { + auto meta_service = get_meta_service(); + int64_t db_id = 98131; + int64_t txn_id = -1; + int64_t t1 = 10; + int64_t t1_index = 100; + int64_t t1_p1 = 11; + int64_t t1_p1_t1 = 12; + int64_t t1_p1_t2 = 13; + int64_t t1_p2 = 14; + int64_t t1_p2_t1 = 15; + int64_t t2 = 16; + int64_t t2_index = 101; + int64_t t2_p3 = 17; + int64_t t2_p3_t1 = 18; + [[maybe_unused]] int64_t t2_p4 = 19; + [[maybe_unused]] int64_t t2_p4_t1 = 20; + // begin txn + { + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label("test_label"); + txn_info_pb.add_table_ids(t1); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + txn_id = res.txn_id(); + } + + // mock rowset and tablet: for sub_txn1 + int64_t sub_txn_id1 = txn_id; + { + create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1); + auto tmp_rowset = create_rowset(sub_txn_id1, t1_p1_t1, t1_p1); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + { + create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2); + auto tmp_rowset = create_rowset(sub_txn_id1, t1_p1_t2, t1_p1); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + { + create_tablet(meta_service.get(), t1, t1_index, t1_p2, t1_p2_t1); + auto tmp_rowset = create_rowset(sub_txn_id1, t1_p2_t1, t1_p2); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + // begin_sub_txn2 + int64_t sub_txn_id2 = -1; + { + brpc::Controller cntl; + BeginSubTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_txn_id(txn_id); + req.set_sub_txn_num(0); + req.set_db_id(db_id); + req.set_label("test_label_0"); + req.mutable_table_ids()->Add(t1); + req.mutable_table_ids()->Add(t2); + BeginSubTxnResponse res; + meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(res.txn_info().table_ids().size(), 2); + ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1); + ASSERT_TRUE(res.has_sub_txn_id()); + sub_txn_id2 = res.sub_txn_id(); + ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); + } + // mock rowset and tablet: for sub_txn3 + { + create_tablet(meta_service.get(), t2, t2_index, t2_p3, t2_p3_t1); + auto tmp_rowset = create_rowset(sub_txn_id2, t2_p3_t1, t2_p3); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + // begin_sub_txn3 + int64_t sub_txn_id3 = -1; + { + brpc::Controller cntl; + BeginSubTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_txn_id(txn_id); + req.set_sub_txn_num(1); + req.set_db_id(db_id); + req.set_label("test_label_1"); + req.mutable_table_ids()->Add(t1); + req.mutable_table_ids()->Add(t2); + 
req.mutable_table_ids()->Add(t1); + BeginSubTxnResponse res; + meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(res.txn_info().table_ids().size(), 3); + ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2); + ASSERT_TRUE(res.has_sub_txn_id()); + sub_txn_id3 = res.sub_txn_id(); + ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]); + } + // mock rowset and tablet: for sub_txn3 + { + create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1); + auto tmp_rowset = create_rowset(sub_txn_id3, t1_p1_t1, t1_p1); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + { + create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2); + auto tmp_rowset = create_rowset(sub_txn_id3, t1_p1_t2, t1_p1); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + // commit txn + CommitTxnRequest req; + { + brpc::Controller cntl; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(666); + req.set_txn_id(txn_id); + req.set_is_txn_load(true); + + SubTxnInfo sub_txn_info1; + sub_txn_info1.set_sub_txn_id(sub_txn_id1); + sub_txn_info1.set_table_id(t1); + sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t1); + sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t2); + sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p2_t1); + + SubTxnInfo sub_txn_info2; + sub_txn_info2.set_sub_txn_id(sub_txn_id2); + sub_txn_info2.set_table_id(t2); + sub_txn_info2.mutable_base_tablet_ids()->Add(t2_p3_t1); + + SubTxnInfo sub_txn_info3; + sub_txn_info3.set_sub_txn_id(sub_txn_id3); + sub_txn_info3.set_table_id(t1); + sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t1); + sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t2); + + req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1)); + req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2)); + req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info3)); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + // std::cout << res.DebugString() << std::endl; + ASSERT_EQ(res.table_ids().size(), 3); + + ASSERT_EQ(res.table_ids()[0], t2); + ASSERT_EQ(res.partition_ids()[0], t2_p3); + ASSERT_EQ(res.versions()[0], 2); + + ASSERT_EQ(res.table_ids()[1], t1); + ASSERT_EQ(res.partition_ids()[1], t1_p2); + ASSERT_EQ(res.versions()[1], 2); + + ASSERT_EQ(res.table_ids()[2], t1); + ASSERT_EQ(res.partition_ids()[2], t1_p1); + ASSERT_EQ(res.versions()[2], 3); + } + + // doubly commit txn + { + brpc::Controller cntl; + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::TXN_ALREADY_VISIBLE); + auto found = res.status().msg().find( + fmt::format("transaction is already visible: db_id={} txn_id={}", db_id, txn_id)); + ASSERT_TRUE(found != std::string::npos); + } +} + +TEST(MetaServiceTest, BeginAndAbortSubTxnTest) { + auto meta_service = get_meta_service(); + long db_id = 98762; + int64_t txn_id = -1; + // begin txn + { + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + 
txn_info_pb.set_label("test_label"); + txn_info_pb.add_table_ids(1234); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + txn_id = res.txn_id(); + } + // case: begin 2 sub txn + int64_t sub_txn_id1 = -1; + int64_t sub_txn_id2 = -1; + for (int i = 0; i < 2; i++) { + { + brpc::Controller cntl; + BeginSubTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_txn_id(txn_id); + req.set_sub_txn_num(i); + req.set_db_id(db_id); + req.set_label("test_label_" + std::to_string(i)); + req.mutable_table_ids()->Add(1234); + req.mutable_table_ids()->Add(1235); + if (i == 1) { + req.mutable_table_ids()->Add(1235); + } + BeginSubTxnResponse res; + meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(res.txn_info().table_ids().size(), i == 0 ? 2 : 3); + ASSERT_EQ(res.txn_info().sub_txn_ids().size(), i == 0 ? 1 : 2); + ASSERT_TRUE(res.has_sub_txn_id()); + if (i == 0) { + sub_txn_id1 = res.sub_txn_id(); + ASSERT_EQ(sub_txn_id1, res.txn_info().sub_txn_ids()[0]); + } else { + sub_txn_id2 = res.sub_txn_id(); + ASSERT_EQ(sub_txn_id1, res.txn_info().sub_txn_ids()[0]); + ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[1]); + } + } + // get txn state + { + brpc::Controller cntl; + GetTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_txn_id(txn_id); + GetTxnResponse res; + meta_service->get_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(res.txn_info().table_ids().size(), i == 0 ? 
2 : 3); + ASSERT_EQ(res.txn_info().table_ids()[0], 1234); + ASSERT_EQ(res.txn_info().table_ids()[1], 1235); + if (i == 1) { + ASSERT_EQ(res.txn_info().table_ids()[2], 1235); + } + } + } + // case: abort sub txn + { + { + brpc::Controller cntl; + AbortSubTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_txn_id(txn_id); + req.set_sub_txn_id(sub_txn_id2); + req.set_sub_txn_num(2); + req.set_db_id(db_id); + req.mutable_table_ids()->Add(1234); + req.mutable_table_ids()->Add(1235); + AbortSubTxnResponse res; + meta_service->abort_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + // check txn state + ASSERT_EQ(res.txn_info().table_ids().size(), 2); + ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2); + ASSERT_EQ(sub_txn_id1, res.txn_info().sub_txn_ids()[0]); + ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[1]); + } + // get txn state + { + brpc::Controller cntl; + GetTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_txn_id(txn_id); + GetTxnResponse res; + meta_service->get_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(res.txn_info().table_ids().size(), 2); + ASSERT_EQ(res.txn_info().table_ids()[0], 1234); + ASSERT_EQ(res.txn_info().table_ids()[1], 1235); + } + } + // check label key does not exist + for (int i = 0; i < 2; i++) { + std::string key = + txn_label_key({"test_instance", db_id, "test_label_" + std::to_string(i)}); + std::string val; + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); + } + // check txn index key exist + for (auto i : {sub_txn_id1, sub_txn_id2}) { + std::string key = txn_index_key({"test_instance", i}); + std::string val; + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); + } +} + TEST(MetaServiceTest, AbortTxnTest) { auto meta_service = get_meta_service(); diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index f653f4e8e0b..07aa5b2d40b 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -1464,6 +1464,151 @@ TEST(RecyclerTest, recycle_expired_txn_label) { ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2); ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0); } + + label = "recycle_expired_txn_label_with_sub_txn"; + int64_t table2_id = 12131278; + { + // 1. begin_txn + // 2. begin_sub_txn2 + // 3. begin_sub_txn3 + // 4. abort_sub_txn3 + // 5. commit_txn + // 6. recycle_expired_txn_label + // 7. 
check + [[maybe_unused]] int64_t sub_txn_id1 = -1; + int64_t sub_txn_id2 = -1; + int64_t sub_txn_id3 = -1; + { + brpc::Controller cntl; + BeginTxnRequest req; + + req.set_cloud_unique_id(cloud_unique_id); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label(label); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.set_timeout_ms(10000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + txn_id = res.txn_id(); + sub_txn_id1 = txn_id; + ASSERT_GT(txn_id, -1); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + InstanceInfoPB instance; + instance.set_instance_id(mock_instance); + InstanceRecycler recycler(txn_kv, instance); + ASSERT_EQ(recycler.init(), 0); + sleep(1); + recycler.abort_timeout_txn(); + TxnInfoPB txn_info_pb; + get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb); + ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_PREPARED); + + // 2. begin sub_txn2 + { + brpc::Controller cntl; + BeginSubTxnRequest req; + req.set_cloud_unique_id(cloud_unique_id); + req.set_txn_id(txn_id); + req.set_sub_txn_num(0); + req.set_db_id(db_id); + req.set_label("test_sub_label1"); + req.mutable_table_ids()->Add(table_id); + req.mutable_table_ids()->Add(table2_id); + BeginSubTxnResponse res; + meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(res.txn_info().table_ids().size(), 2); + ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1); + ASSERT_TRUE(res.has_sub_txn_id()); + sub_txn_id2 = res.sub_txn_id(); + ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); + } + + // 3. begin sub_txn3 + { + brpc::Controller cntl; + BeginSubTxnRequest req; + req.set_cloud_unique_id(cloud_unique_id); + req.set_txn_id(txn_id); + req.set_sub_txn_num(1); + req.set_db_id(db_id); + req.set_label("test_sub_label2"); + req.mutable_table_ids()->Add(table_id); + req.mutable_table_ids()->Add(table2_id); + req.mutable_table_ids()->Add(table_id); + BeginSubTxnResponse res; + meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(res.txn_info().table_ids().size(), 3); + ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2); + ASSERT_TRUE(res.has_sub_txn_id()); + sub_txn_id3 = res.sub_txn_id(); + ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); + ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]); + } + + // 4. abort sub_txn3 + { + brpc::Controller cntl; + AbortSubTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_txn_id(txn_id); + req.set_sub_txn_num(2); + req.set_sub_txn_id(sub_txn_id3); + req.set_db_id(db_id); + req.mutable_table_ids()->Add(table_id); + req.mutable_table_ids()->Add(table2_id); + AbortSubTxnResponse res; + meta_service->abort_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + // check txn state + ASSERT_EQ(res.txn_info().table_ids().size(), 2); + ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2); + ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); + ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]); + } + + // 4. 
commit_txn + { + brpc::Controller cntl; + CommitTxnRequest req; + req.set_cloud_unique_id(cloud_unique_id); + req.set_db_id(db_id); + req.set_txn_id(txn_id); + req.set_is_txn_load(true); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + // check txn_index_key for sub_txn_id exist + for (auto i : {sub_txn_id2, sub_txn_id3}) { + std::string key = txn_index_key({mock_instance, i}); + std::string val; + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); + } + // 5. recycle + recycler.recycle_expired_txn_label(); + ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2); + ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0); + // check txn_index_key for sub_txn_id are deleted + for (auto i : {sub_txn_id2, sub_txn_id3}) { + std::string key = txn_index_key({mock_instance, i}); + std::string val; + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); + } + } } void create_object_file_pb(std::string prefix, std::vector<ObjectFilePB>* object_files, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index 7463a684680..f7a178deb01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -200,6 +200,24 @@ public class MetaServiceClient { return blockingStub.getCurrentMaxTxnId(request); } + public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request) { + if (!request.hasCloudUniqueId()) { + Cloud.BeginSubTxnRequest.Builder builder = Cloud.BeginSubTxnRequest.newBuilder(); + builder.mergeFrom(request); + return blockingStub.beginSubTxn(builder.setCloudUniqueId(Config.cloud_unique_id).build()); + } + return blockingStub.beginSubTxn(request); + } + + public Cloud.AbortSubTxnResponse abortSubTxn(Cloud.AbortSubTxnRequest request) { + if (!request.hasCloudUniqueId()) { + Cloud.AbortSubTxnRequest.Builder builder = Cloud.AbortSubTxnRequest.newBuilder(); + builder.mergeFrom(request); + return blockingStub.abortSubTxn(builder.setCloudUniqueId(Config.cloud_unique_id).build()); + } + return blockingStub.abortSubTxn(request); + } + public Cloud.CheckTxnConflictResponse checkTxnConflict(Cloud.CheckTxnConflictRequest request) { return blockingStub.checkTxnConflict(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index 117cfd71bd0..a2dbdaac2c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -246,6 +246,26 @@ public class MetaServiceProxy { } } + public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request) + throws RpcException { + try { + final MetaServiceClient client = getProxy(); + return client.beginSubTxn(request); + } catch (Exception e) { + throw new RpcException("", e.getMessage(), e); + } + } + + public Cloud.AbortSubTxnResponse abortSubTxn(Cloud.AbortSubTxnRequest request) 
+ throws RpcException { + try { + final MetaServiceClient client = getProxy(); + return client.abortSubTxn(request); + } catch (Exception e) { + throw new RpcException("", e.getMessage(), e); + } + } + public Cloud.CheckTxnConflictResponse checkTxnConflict(Cloud.CheckTxnConflictRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 98c3c5dfc8d..3184d150df7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -28,8 +28,12 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.cloud.catalog.CloudPartition; +import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest; +import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse; import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest; import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse; +import org.apache.doris.cloud.proto.Cloud.BeginSubTxnRequest; +import org.apache.doris.cloud.proto.Cloud.BeginSubTxnResponse; import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest; import org.apache.doris.cloud.proto.Cloud.BeginTxnResponse; import org.apache.doris.cloud.proto.Cloud.CheckTxnConflictRequest; @@ -50,6 +54,7 @@ import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB; import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest; import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse; +import org.apache.doris.cloud.proto.Cloud.SubTxnInfo; import org.apache.doris.cloud.proto.Cloud.TableStatsPB; import org.apache.doris.cloud.proto.Cloud.TxnInfoPB; import org.apache.doris.cloud.proto.Cloud.TxnStatusPB; @@ -63,6 +68,7 @@ import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Pair; import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugPointUtil; @@ -124,10 +130,12 @@ import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -139,6 +147,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { private static final int CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS = 15; private TxnStateCallbackFactory callbackFactory; + private final Map<Long, Long> subTxnIdToTxnId = new ConcurrentHashMap<>(); public CloudGlobalTransactionMgr() { this.callbackFactory = new TxnStateCallbackFactory(); @@ -453,6 +462,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } final CommitTxnRequest commitTxnRequest = builder.build(); + commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList); + } + + private void commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC, long dbId, + List<Table> tableList) throws UserException { CommitTxnResponse commitTxnResponse = null; int retryTime = 0; @@ -783,7 +797,35 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { @Override public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, List<SubTransactionState> subTransactionStates, long timeoutMillis) throws UserException { - throw new UnsupportedOperationException("commitAndPublishTransaction is not supported in cloud"); + if (Config.disable_load_job) { + throw new TransactionCommitFailedException( + "disable_load_job is set to true, all load jobs are not allowed"); + } + LOG.info("try to commit transaction, txnId: {}, subTxnStates: {}", transactionId, subTransactionStates); + cleanSubTransactions(transactionId); + CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder(); + builder.setDbId(db.getId()) + .setTxnId(transactionId) + .setIs2Pc(false) + .setCloudUniqueId(Config.cloud_unique_id) + .setIsTxnLoad(true); + // add sub txn infos + for (SubTransactionState subTransactionState : subTransactionStates) { + builder.addSubTxnInfos(SubTxnInfo.newBuilder().setSubTxnId(subTransactionState.getSubTransactionId()) + .setTableId(subTransactionState.getTable().getId()) + .addAllBaseTabletIds( + getBaseTabletsFromTables(Lists.newArrayList(subTransactionState.getTable()), + subTransactionState.getTabletCommitInfos().stream() + .map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId())) + .collect(Collectors.toList()))) + .build()); + } + + final CommitTxnRequest commitTxnRequest = builder.build(); + commitTxn(commitTxnRequest, transactionId, false, db.getId(), + subTransactionStates.stream().map(SubTransactionState::getTable) + .collect(Collectors.toList())); + return true; } @Override @@ -812,6 +854,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { @Override public void abortTransaction(Long dbId, Long transactionId, String reason) throws UserException { + cleanSubTransactions(transactionId); abortTransaction(dbId, transactionId, reason, null, null); } @@ -1133,6 +1176,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { @Override public TransactionState getTransactionState(long dbId, long transactionId) { + if (subTxnIdToTxnId.containsKey(transactionId)) { + LOG.info("try to get transaction state, subTxnId:{}, transactionId:{}", transactionId, + subTxnIdToTxnId.get(transactionId)); + transactionId = subTxnIdToTxnId.get(transactionId); + } LOG.info("try to get transaction state, dbId:{}, transactionId:{}", dbId, transactionId); GetTxnRequest.Builder builder = GetTxnRequest.newBuilder(); builder.setDbId(dbId); @@ -1351,11 +1399,101 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { @Override public void addSubTransaction(long dbId, long transactionId, long subTransactionId) { - throw new UnsupportedOperationException("addSubTransaction is not supported in cloud"); + subTxnIdToTxnId.put(subTransactionId, transactionId); } @Override public void removeSubTransaction(long dbId, long subTransactionId) { - throw new UnsupportedOperationException("removeSubTransaction is not supported in cloud"); + subTxnIdToTxnId.remove(subTransactionId); + } + + private void cleanSubTransactions(long transactionId) { + Iterator<Entry<Long, Long>> iterator = subTxnIdToTxnId.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<Long, Long> entry = iterator.next(); + if (entry.getValue() == transactionId) 
{ + iterator.remove(); + } + } + } + + public Pair<Long, TransactionState> beginSubTxn(long txnId, long dbId, List<Long> tableIds, String label, + long subTxnNum) throws UserException { + LOG.info("try to begin sub transaction, txnId: {}, dbId: {}, tableIds: {}, label: {}, subTxnNum: {}", txnId, + dbId, tableIds, label, subTxnNum); + BeginSubTxnRequest request = BeginSubTxnRequest.newBuilder().setCloudUniqueId(Config.cloud_unique_id) + .setTxnId(txnId).setDbId(dbId).addAllTableIds(tableIds).setLabel(label).setSubTxnNum(subTxnNum).build(); + BeginSubTxnResponse response = null; + int retryTime = 0; + try { + while (retryTime < Config.metaServiceRpcRetryTimes()) { + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, beginSubTxnRequest:{}", retryTime, request); + } + response = MetaServiceProxy.getInstance().beginSubTxn(request); + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, beginSubTxnResponse:{}", retryTime, response); + } + + if (response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + LOG.info("beginSubTxn KV_TXN_CONFLICT, retryTime:{}", retryTime); + backoff(); + retryTime++; + } + + Preconditions.checkNotNull(response); + Preconditions.checkNotNull(response.getStatus()); + } catch (Exception e) { + LOG.warn("beginSubTxn failed, exception:", e); + throw new UserException("beginSubTxn failed, errMsg:" + e.getMessage()); + } + + if (response.getStatus().getCode() != MetaServiceCode.OK) { + throw new UserException(response.getStatus().getMsg()); + } + return Pair.of(response.hasSubTxnId() ? response.getSubTxnId() : 0, + TxnUtil.transactionStateFromPb(response.getTxnInfo())); + } + + public TransactionState abortSubTxn(long txnId, long subTxnId, long dbId, List<Long> tableIds, long subTxnNum) + throws UserException { + LOG.info("try to abort sub transaction, txnId: {}, subTxnId: {}, dbId: {}, tableIds: {}, subTxnNum: {}", txnId, + subTxnId, dbId, tableIds, subTxnNum); + AbortSubTxnRequest request = AbortSubTxnRequest.newBuilder().setCloudUniqueId(Config.cloud_unique_id) + .setTxnId(txnId).setSubTxnId(subTxnId).setDbId(dbId).addAllTableIds(tableIds).setSubTxnNum(subTxnId) + .build(); + AbortSubTxnResponse response = null; + int retryTime = 0; + try { + while (retryTime < Config.metaServiceRpcRetryTimes()) { + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, abortSubTxnRequest:{}", retryTime, request); + } + response = MetaServiceProxy.getInstance().abortSubTxn(request); + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, abortSubTxnResponse:{}", retryTime, response); + } + + if (response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + LOG.info("abortSubTxn KV_TXN_CONFLICT, retryTime:{}", retryTime); + backoff(); + retryTime++; + } + + Preconditions.checkNotNull(response); + Preconditions.checkNotNull(response.getStatus()); + } catch (Exception e) { + LOG.warn("abortSubTxn failed, exception:", e); + throw new UserException("abortSubTxn failed, errMsg:" + e.getMessage()); + } + + if (response.getStatus().getCode() != MetaServiceCode.OK) { + throw new UserException(response.getStatus().getMsg()); + } + return TxnUtil.transactionStateFromPb(response.getTxnInfo()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index 00a95a42c22..f80782bbf5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -21,10 +21,15 @@ import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; +import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types; @@ -40,6 +45,7 @@ import org.apache.doris.thrift.TLoadTxnBeginResult; import org.apache.doris.thrift.TTabletCommitInfo; import org.apache.doris.thrift.TTxnLoadInfo; import org.apache.doris.thrift.TTxnParams; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TWaitingTxnStatusRequest; import org.apache.doris.thrift.TWaitingTxnStatusResult; import org.apache.doris.transaction.SubTransactionState.SubTransactionType; @@ -77,6 +83,12 @@ public class TransactionEntry { private long transactionId = -1; private TransactionState transactionState; private long timeoutTimestamp = -1; + // 1. For cloud mode, we keep subTransactionStates in TransactionEntry; + // 2. For doris, we keep subTransactionStates in TransactionState, because if executed in observer, + // the dml statements will be forwarded to master, so keep the subTransactionStates is in master. + private List<SubTransactionState> subTransactionStates = new ArrayList<>(); + // Used for cloud mode, including all successful or failed sub transactions except the first one + private long allSubTxnNum = 0; public TransactionEntry() { } @@ -176,11 +188,18 @@ public class TransactionEntry { throw new AnalysisException( "Transaction insert can not insert into values and insert into select at the same time"); } + if (Config.isCloudMode()) { + OlapTable olapTable = (OlapTable) table; + if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite()) { + throw new UserException( + "Transaction load is not supported for merge on write unique keys table in cloud mode"); + } + } DatabaseIf database = table.getDatabase(); if (!isTransactionBegan) { long timeoutSecond = ConnectContext.get().getExecTimeout(); this.timeoutTimestamp = System.currentTimeMillis() + timeoutSecond * 1000; - if (Env.getCurrentEnv().isMaster()) { + if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) { this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction( database.getId(), Lists.newArrayList(table.getId()), label, new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), @@ -206,9 +225,23 @@ public class TransactionEntry { throw new AnalysisException( "Transaction insert must be in the same database, expect db_id=" + this.database.getId()); } - this.transactionState.addTableId(table.getId()); - long subTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId(); + long subTxnId; + if (Config.isCloudMode()) { + TUniqueId queryId = ConnectContext.get().queryId(); + String label = String.format("tl_%x_%x", queryId.hi, queryId.lo); + List<Long> tableIds = getTableIds(); + tableIds.add(table.getId()); + Pair<Long, TransactionState> pair + = ((CloudGlobalTransactionMgr) 
Env.getCurrentGlobalTransactionMgr()).beginSubTxn( + transactionId, table.getDatabase().getId(), tableIds, label, allSubTxnNum); + this.transactionState = pair.second; + subTxnId = pair.first; + } else { + subTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId(); + this.transactionState.addTableId(table.getId()); + } Env.getCurrentGlobalTransactionMgr().addSubTransaction(database.getId(), transactionId, subTxnId); + allSubTxnNum++; return subTxnId; } } @@ -216,7 +249,7 @@ public class TransactionEntry { public TransactionStatus commitTransaction() throws Exception { if (isTransactionBegan) { try { - if (Env.getCurrentEnv().isMaster()) { + if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) { beforeFinishTransaction(); long commitTimeout = Math.min(60000L, Math.max(timeoutTimestamp - System.currentTimeMillis(), 0)); if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(database, transactionId, @@ -275,7 +308,7 @@ public class TransactionEntry { private long abortTransaction(String reason) throws Exception { if (isTransactionBegan) { - if (Env.getCurrentEnv().isMaster()) { + if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) { beforeFinishTransaction(); Env.getCurrentGlobalTransactionMgr().abortTransaction(database.getId(), transactionId, reason); return transactionId; @@ -301,7 +334,9 @@ public class TransactionEntry { if (isTransactionBegan) { List<Long> tableIds = transactionState.getTableIdList().stream().distinct().collect(Collectors.toList()); transactionState.setTableIdList(tableIds); - transactionState.getSubTransactionStates().sort((s1, s2) -> { + List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates + : transactionState.getSubTransactionStates(); + subTransactionStatesPtr.sort((s1, s2) -> { if (s1.getSubTransactionType() == SubTransactionType.INSERT && s2.getSubTransactionType() == SubTransactionType.DELETE) { return 1; @@ -312,6 +347,9 @@ public class TransactionEntry { return Long.compare(s1.getSubTransactionId(), s2.getSubTransactionId()); } }); + if (Config.isCloudMode()) { + transactionState.setSubTransactionStates(subTransactionStatesPtr); + } LOG.info("subTransactionStates={}", transactionState.getSubTransactionStates()); transactionState.resetSubTxnIds(); } @@ -329,7 +367,18 @@ public class TransactionEntry { public void abortSubTransaction(long subTransactionId, Table table) { if (isTransactionBegan) { - this.transactionState.removeTableId(table.getId()); + if (Config.isCloudMode()) { + try { + List<Long> tableIds = getTableIds(); + this.transactionState + = ((CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr()).abortSubTxn( + transactionId, subTransactionId, table.getDatabase().getId(), tableIds, allSubTxnNum); + } catch (UserException e) { + LOG.error("Failed to remove table_id={} from txn_id={}", table.getId(), transactionId, e); + } + } else { + this.transactionState.removeTableId(table.getId()); + } Env.getCurrentGlobalTransactionMgr().removeSubTransaction(table.getDatabase().getId(), subTransactionId); } } @@ -340,13 +389,15 @@ public class TransactionEntry { LOG.debug("label={}, txn_id={}, sub_txn_id={}, table={}, commit_infos={}", label, transactionId, subTxnId, table, commitInfos); } - transactionState.getSubTransactionStates() + List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? 
subTransactionStates + : transactionState.getSubTransactionStates(); + subTransactionStatesPtr .add(new SubTransactionState(subTxnId, table, commitInfos, subTransactionType)); - Preconditions.checkState( - transactionState.getTableIdList().size() == transactionState.getSubTransactionStates().size(), - "txn_id=" + transactionId + ", expect table_list=" + transactionState.getSubTransactionStates().stream() - .map(s -> s.getTable().getId()).collect(Collectors.toList()) + ", real table_list=" - + transactionState.getTableIdList()); + Preconditions.checkState(transactionState.getTableIdList().size() == subTransactionStatesPtr.size(), + "txn_id=" + transactionId + + ", expect table_list=" + + subTransactionStatesPtr.stream().map(s -> s.getTable().getId()).collect(Collectors.toList()) + + ", real table_list=" + transactionState.getTableIdList()); } public boolean isTransactionBegan() { @@ -442,4 +493,10 @@ public class TransactionEntry { LOG.info("set txn load info in observer, label={}, txnId={}, dbId={}, timeoutTimestamp={}", label, transactionId, dbId, timeoutTimestamp); } + + private List<Long> getTableIds() { + List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates + : transactionState.getSubTransactionStates(); + return subTransactionStatesPtr.stream().map(s -> s.getTable().getId()).collect(Collectors.toList()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 3d1c2d54faa..f628c945b6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -689,7 +689,7 @@ public class TransactionState implements Writable { sb.append(", db id: ").append(dbId); sb.append(", table id list: ").append(StringUtils.join(tableIdList, ",")); sb.append(", callback id: ").append(callbackId); - sb.append(", coordinator: ").append(txnCoordinator.toString()); + sb.append(", coordinator: ").append(txnCoordinator); sb.append(", transaction status: ").append(transactionStatus); sb.append(", error replicas num: ").append(errorReplicas.size()); sb.append(", replica ids: ").append(Joiner.on(",").join(errorReplicas.stream().limit(5).toArray())); @@ -859,6 +859,10 @@ public class TransactionState implements Writable { this.subTransactionStates = new ArrayList<>(); } + public void setSubTransactionStates(List<SubTransactionState> subTransactionStates) { + this.subTransactionStates = subTransactionStates; + } + public void resetSubTxnIds() { this.subTxnIds = subTransactionStates.stream().map(SubTransactionState::getSubTransactionId) .collect(Collectors.toList()); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index bc57925e572..76ac8a3d306 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -378,6 +378,8 @@ message TxnInfoPB { optional TxnStatusPB status = 15; optional TxnCommitAttachmentPB commit_attachment = 16; optional int64 listener_id = 17; //callback id + // for transaction load, used for recycler + repeated int64 sub_txn_ids = 18; // TODO: There are more fields TBD } @@ -646,6 +648,15 @@ message CommitTxnRequest { // merge-on-write table ids repeated int64 mow_table_ids = 6; repeated int64 base_tablet_ids= 7; // all tablet from base tables (excluding mv) + // for transaction load + optional bool is_txn_load = 9; + repeated SubTxnInfo sub_txn_infos = 10; +} + +message SubTxnInfo { + optional int64 
sub_txn_id = 1; + optional int64 table_id = 2; + repeated int64 base_tablet_ids= 3; } // corresponding to TabletStats in meta_service.h and FrontendServiceImpl.java @@ -702,6 +713,41 @@ message GetTxnIdResponse { optional int64 txn_id = 2; } +message BeginSubTxnRequest { + optional string cloud_unique_id = 1; // For auth + optional int64 txn_id = 2; + // all successful or failed sub txn except the first one + optional int64 sub_txn_num = 3; + optional int64 db_id = 4; + // set table_ids in txn_info + repeated int64 table_ids = 5; + // a random label used to generate a sub_txn_id + optional string label = 6; +} + +message BeginSubTxnResponse { + optional MetaServiceResponseStatus status = 1; + optional int64 sub_txn_id = 2; + optional TxnInfoPB txn_info = 3; +} + +message AbortSubTxnRequest { + optional string cloud_unique_id = 1; // For auth + optional int64 txn_id = 2; + // used for log + optional int64 sub_txn_id = 3; + // all successful or failed sub txn except the first one + optional int64 sub_txn_num = 4; + optional int64 db_id = 5; + // set table_ids in txn_info + repeated int64 table_ids = 6; +} + +message AbortSubTxnResponse { + optional MetaServiceResponseStatus status = 1; + optional TxnInfoPB txn_info = 2; +} + message GetCurrentMaxTxnRequest { optional string cloud_unique_id = 1; // For auth } @@ -1365,6 +1411,8 @@ service MetaService { rpc check_txn_conflict(CheckTxnConflictRequest) returns (CheckTxnConflictResponse); rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse); rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse); + rpc begin_sub_txn(BeginSubTxnRequest) returns (BeginSubTxnResponse); + rpc abort_sub_txn(AbortSubTxnRequest) returns (AbortSubTxnResponse); rpc get_version(GetVersionRequest) returns (GetVersionResponse); rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse); diff --git a/regression-test/data/insert_p0/txn_insert.out b/regression-test/data/insert_p0/txn_insert.out index 4cd97b8cb0c..257bdcc0311 100644 --- a/regression-test/data/insert_p0/txn_insert.out +++ b/regression-test/data/insert_p0/txn_insert.out @@ -39,52 +39,6 @@ 6 8 --- !select7 -- - --- !select8 -- - --- !select9 -- - --- !select1 -- -\N \N \N [null] [null, 0] -1 2.2 abc [] [] -2 3.3 xyz [1] [1, 0] - --- !select2 -- -\N \N \N [null] [null, 0] -1 2.2 abc [] [] -2 3.3 xyz [1] [1, 0] - --- !select3 -- -\N \N \N [null] [null, 0] -1 2.2 abc [] [] -1 2.2 abc [] [] -1 2.2 abc [] [] -2 3.3 xyz [1] [1, 0] -2 3.3 xyz [1] [1, 0] -2 3.3 xyz [1] [1, 0] - --- !select4 -- -\N \N \N [null] [null, 0] -1 2.2 abc [] [] -1 2.2 abc [] [] -1 2.2 abc [] [] -2 3.3 xyz [1] [1, 0] -2 3.3 xyz [1] [1, 0] -2 3.3 xyz [1] [1, 0] - --- !select5 -- -1 2 -3 4 -5 6 -7 8 - --- !select6 -- -2 -4 -6 -8 - -- !select7 -- \N \N \N [null] [null, 0] 1 2.2 abc [] [] @@ -521,23 +475,6 @@ 2 3.3 xyz [1] [1, 0] 2 3.3 xyz [1] [1, 0] --- !select28 -- -1 a 10 -2 b 20 -3 c 30 - --- !select29 -- -1 a 10 -2 b 20 -3 c 30 -4 d 40 - --- !select30 -- -1 a 11 -2 b 20 -3 c 30 -4 d 40 - -- !select31 -- 1 a 10 10 a 10 @@ -958,31 +895,6 @@ 9 a 10 9 a 10 --- !select38 -- -1 a 101 - --- !select39 -- -1 a 100 - --- !select40 -- -1 2000-01-01 1 1 1.0 -3 2000-01-03 3 3 3.0 - --- !select41 -- -2 2000-01-20 20 20 20.0 -3 2000-01-30 30 30 30.0 -4 2000-01-04 4 4 4.0 -6 2000-01-10 10 10 10.0 - --- !select42 -- -3 2000-01-03 3 3 3.0 - --- !select43 -- -1 2000-01-01 1 1 1.0 -2 2000-01-02 2 2 2.0 -3 2000-01-03 3 3 3.0 -6 2000-01-10 10 10 10.0 - -- !select44 -- \N \N \N [null] [null, 0] \N \N \N [null] [null, 0] @@ -1244,6 
+1156,49 @@ 2 3.3 xyz [1] [1, 0] 2 3.3 xyz [1] [1, 0] +-- !selectmowi0 -- +1 a 10 +2 b 20 +3 c 30 + +-- !selectmowi1 -- +1 a 10 +2 b 20 +3 c 30 +4 d 40 + +-- !selectmowi2 -- +1 a 11 +2 b 21 +3 c 30 +4 d 40 +5 e 50 + +-- !selectmowu1 -- +1 a 101 + +-- !selectmowu2 -- +1 a 100 + +-- !selectmowd1 -- +1 2000-01-01 1 1 1.0 +3 2000-01-03 3 3 3.0 + +-- !selectmowd2 -- +2 2000-01-20 20 20 20.0 +3 2000-01-30 30 30 30.0 +4 2000-01-04 4 4 4.0 +6 2000-01-10 10 10 10.0 + +-- !selectmowd3 -- +3 2000-01-03 3 3 3.0 + +-- !selectmowd4 -- +1 2000-01-01 1 1 1.0 +2 2000-01-02 2 2 2.0 +3 2000-01-03 3 3 3.0 +6 2000-01-10 10 10 10.0 + -- !select_cu0 -- 1 0 10 2 0 20 diff --git a/regression-test/data/insert_p0/txn_insert_inject_case.out b/regression-test/data/insert_p0/txn_insert_inject_case.out new file mode 100644 index 00000000000..799229be54a --- /dev/null +++ b/regression-test/data/insert_p0/txn_insert_inject_case.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +\N \N \N [null] [null, 0] +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/txn_insert.groovy index d9ccd6831cf..1d4183c8662 100644 --- a/regression-test/suites/insert_p0/txn_insert.groovy +++ b/regression-test/suites/insert_p0/txn_insert.groovy @@ -42,7 +42,7 @@ suite("txn_insert") { return null } - for (def use_nereids_planner : [false, true]) { + for (def use_nereids_planner : [/*false,*/ true]) { sql " SET enable_nereids_planner = $use_nereids_planner; " sql """ DROP TABLE IF EXISTS $table """ @@ -242,7 +242,26 @@ suite("txn_insert") { order_qt_select24 """select * from ${table}_2""" } - // 7. insert into select to same table + // 7. insert into tables in different database + if (use_nereids_planner) { + def db2 = "regression_test_insert_p0_1" + sql """ create database if not exists $db2 """ + + try { + sql """ create table ${db2}.${table} like ${table} """ + sql """ begin; """ + sql """ insert into ${table} select * from ${table}_0; """ + test { + sql """ insert into $db2.${table} select * from ${table}_0; """ + exception """Transaction insert must be in the same database, expect db_id""" + } + } finally { + sql """rollback""" + sql """ drop database if exists $db2 """ + } + } + + // 8. insert into select to same table if (use_nereids_planner) { sql """ begin; """ sql """ insert into ${table}_0 select * from ${table}_1; """ @@ -273,61 +292,9 @@ suite("txn_insert") { } } - // 8. insert into tables in different database - if (use_nereids_planner) { - def db2 = "regression_test_insert_p0_1" - sql """ create database if not exists $db2 """ - - try { - sql """ create table ${db2}.${table} like ${table} """ - sql """ begin; """ - sql """ insert into ${table} select * from ${table}_0; """ - test { - sql """ insert into $db2.${table} select * from ${table}_0; """ - exception """Transaction insert must be in the same database, expect db_id""" - } - } finally { - sql """rollback""" - sql """ drop database if exists $db2 """ - } - } - - // 9. 
insert into mow tables - if (use_nereids_planner) { - def unique_table = "ut" - for (def i in 0..2) { - sql """ drop table if exists ${unique_table}_${i} """ - sql """ - CREATE TABLE ${unique_table}_${i} ( - `id` int(11) NOT NULL, - `name` varchar(50) NULL, - `score` int(11) NULL default "-1" - ) ENGINE=OLAP - UNIQUE KEY(`id`, `name`) - DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES ( - """ + (i == 2 ? "\"function_column.sequence_col\"='score', " : "") + - """ - "replication_num" = "1" - ); - """ - } - sql """ insert into ${unique_table}_0 values(1, "a", 10), (2, "b", 20), (3, "c", 30); """ - sql """ insert into ${unique_table}_1 values(1, "a", 11), (2, "b", 19), (4, "d", 40); """ - sql """ begin """ - sql """ insert into ${unique_table}_2 select * from ${unique_table}_0; """ - sql """ insert into ${unique_table}_1 select * from ${unique_table}_0; """ - sql """ insert into ${unique_table}_2 select * from ${unique_table}_1; """ - sql """ commit; """ - sql "sync" - order_qt_select28 """select * from ${unique_table}_0""" - order_qt_select29 """select * from ${unique_table}_1""" - order_qt_select30 """select * from ${unique_table}_2""" - } - - // 10. insert into table with multi partitions and tablets + // 9. insert into table with multi partitions and tablets if (use_nereids_planner) { - def pt = "multi_partition_t" + def pt = "txn_insert_multi_partition_t" for (def i in 0..3) { sql """ drop table if exists ${pt}_${i} """ sql """ @@ -380,142 +347,7 @@ suite("txn_insert") { sql """ set enable_insert_strict = true """ } - // 11. update stmt - if (use_nereids_planner) { - def ut_table = "txn_insert_ut" - for (def i in 1..2) { - def tableName = ut_table + "_" + i - sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE ${tableName} ( - `ID` int(11) NOT NULL, - `NAME` varchar(100) NULL, - `score` int(11) NULL - ) ENGINE=OLAP - unique KEY(`id`) - COMMENT 'OLAP' - DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES ( - "replication_num" = "1" - ); - """ - } - sql """ insert into ${ut_table}_1 values(1, "a", 100); """ - sql """ begin; """ - sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """ - sql """ update ${ut_table}_1 set score = 101 where id = 1; """ - sql """ commit; """ - sql "sync" - order_qt_select38 """select * from ${ut_table}_1 """ - order_qt_select39 """select * from ${ut_table}_2 """ - } - - // 12. 
delete from using and delete from stmt - if (use_nereids_planner) { - for (def ta in ["txn_insert_dt1", "txn_insert_dt2", "txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) { - sql """ drop table if exists ${ta} """ - } - - for (def ta in ["txn_insert_dt1", "txn_insert_dt4", "txn_insert_dt5"]) { - sql """ - create table ${ta} ( - id int, - dt date, - c1 bigint, - c2 string, - c3 double - ) unique key (id, dt) - partition by range(dt) ( - from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY - ) - distributed by hash(id) - properties( - 'replication_num'='1', - "enable_unique_key_merge_on_write" = "true" - ); - """ - sql """ - INSERT INTO ${ta} VALUES - (1, '2000-01-01', 1, '1', 1.0), - (2, '2000-01-02', 2, '2', 2.0), - (3, '2000-01-03', 3, '3', 3.0); - """ - } - - sql """ - create table txn_insert_dt2 ( - id int, - dt date, - c1 bigint, - c2 string, - c3 double - ) unique key (id) - distributed by hash(id) - properties( - 'replication_num'='1' - ); - """ - sql """ - create table txn_insert_dt3 ( - id int - ) distributed by hash(id) - properties( - 'replication_num'='1' - ); - """ - sql """ - INSERT INTO txn_insert_dt2 VALUES - (1, '2000-01-10', 10, '10', 10.0), - (2, '2000-01-20', 20, '20', 20.0), - (3, '2000-01-30', 30, '30', 30.0), - (4, '2000-01-04', 4, '4', 4.0), - (5, '2000-01-05', 5, '5', 5.0); - """ - sql """ - INSERT INTO txn_insert_dt3 VALUES(1),(2),(4),(5); - """ - sql """ begin """ - test { - sql ''' - delete from txn_insert_dt1 temporary partition (p_20000102) - using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id - where txn_insert_dt1.id = txn_insert_dt2.id; - ''' - exception 'Partition: p_20000102 is not exists' - } - sql """ - delete from txn_insert_dt1 partition (p_20000102) - using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id - where txn_insert_dt1.id = txn_insert_dt2.id; - """ - sql """ - delete from txn_insert_dt4 - using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id - where txn_insert_dt4.id = txn_insert_dt2.id; - """ - sql """ - delete from txn_insert_dt2 where id = 1; - """ - sql """ - delete from txn_insert_dt2 where id = 5; - """ - sql """ - delete from txn_insert_dt5 partition(p_20000102) where id = 1; - """ - sql """ - delete from txn_insert_dt5 partition(p_20000102) where id = 5; - """ - sql """ commit """ - sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10, '10', 10.0) """ - sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10, '10', 10.0) """ - sql "sync" - order_qt_select40 """select * from txn_insert_dt1 """ - order_qt_select41 """select * from txn_insert_dt2 """ - order_qt_select42 """select * from txn_insert_dt4 """ - order_qt_select43 """select * from txn_insert_dt5 """ - } - - // 13. decrease be 'pending_data_expire_time_sec' config + // 10. decrease be 'pending_data_expire_time_sec' config if (use_nereids_planner) { def backendId_to_params = get_be_param("pending_data_expire_time_sec") try { @@ -534,7 +366,7 @@ suite("txn_insert") { } } - // 14. delete and insert + // 11. delete and insert if (use_nereids_planner) { sql """ begin; """ sql """ delete from ${table}_0 where k1 = 1 or k1 = 2; """ @@ -544,7 +376,7 @@ suite("txn_insert") { order_qt_select45 """select * from ${table}_0""" } - // 15. insert and delete + // 12. insert and delete if (use_nereids_planner) { order_qt_select46 """select * from ${table}_1""" sql """ begin; """ @@ -559,7 +391,7 @@ suite("txn_insert") { order_qt_select48 """select * from ${table}_1""" } - // 16. 
txn insert does not commit or rollback by user, and txn is aborted because connection is closed + // 13. txn insert does not commit or rollback by user, and txn is aborted because connection is closed def dbName = "regression_test_insert_p0" def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" logger.info("url: ${url}") @@ -584,22 +416,24 @@ suite("txn_insert") { thread.start() thread.join() assertNotEquals(txn_id, 0) - def txn_state = "" - for (int i = 0; i < 20; i++) { - def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """ - logger.info("txn_info: ${txn_info}") - assertEquals(1, txn_info.size()) - txn_state = txn_info[0].get("TransactionStatus") - if ("ABORTED" == txn_state) { - break - } else { - sleep(2000) + if (!isCloudMode()) { + def txn_state = "" + for (int i = 0; i < 20; i++) { + def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """ + logger.info("txn_info: ${txn_info}") + assertEquals(1, txn_info.size()) + txn_state = txn_info[0].get("TransactionStatus") + if ("ABORTED" == txn_state) { + break + } else { + sleep(2000) + } } + assertEquals("ABORTED", txn_state) } - assertEquals("ABORTED", txn_state) } - // 17. txn insert does not commit or rollback by user, and txn is aborted because timeout + // 14. txn insert does not commit or rollback by user, and txn is aborted because timeout // TODO find a way to check be txn_manager is also cleaned if (use_nereids_planner) { // 1. use show transaction command to check @@ -620,19 +454,21 @@ suite("txn_insert") { thread.start() insertLatch.await(1, TimeUnit.MINUTES) assertNotEquals(txn_id, 0) - def txn_state = "" - for (int i = 0; i < 20; i++) { - def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """ - logger.info("txn_info: ${txn_info}") - assertEquals(1, txn_info.size()) - txn_state = txn_info[0].get("TransactionStatus") - if ("ABORTED" == txn_state) { - break - } else { - sleep(2000) + if (!isCloudMode()) { + def txn_state = "" + for (int i = 0; i < 20; i++) { + def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """ + logger.info("txn_info: ${txn_info}") + assertEquals(1, txn_info.size()) + txn_state = txn_info[0].get("TransactionStatus") + if ("ABORTED" == txn_state) { + break + } else { + sleep(2000) + } } + assertEquals("ABORTED", txn_state) } - assertEquals("ABORTED", txn_state) // after the txn is timeout: do insert/ commit/ rollback def insert_timeout = sql """show variables where variable_name = 'insert_timeout';""" @@ -686,7 +522,191 @@ suite("txn_insert") { } } - // 18. column update + // 15. insert into mow tables + if (use_nereids_planner) { + def unique_table = "txn_insert_ut" + for (def i in 0..2) { + sql """ drop table if exists ${unique_table}_${i} """ + sql """ + CREATE TABLE ${unique_table}_${i} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + """ + (i == 2 ? 
"\"function_column.sequence_col\"='score', " : "") + + """ + "replication_num" = "1" + ); + """ + } + sql """ insert into ${unique_table}_0 values(1, "a", 10), (2, "b", 20), (3, "c", 30); """ + sql """ insert into ${unique_table}_1 values(1, "a", 11), (2, "b", 19), (4, "d", 40); """ + sql """ insert into ${unique_table}_2 values(1, "a", 9), (2, "b", 21), (4, "d", 39), (5, "e", 50); """ + sql """ begin """ + try { + sql """ insert into ${unique_table}_2 select * from ${unique_table}_0; """ + sql """ insert into ${unique_table}_1 select * from ${unique_table}_0; """ + sql """ insert into ${unique_table}_2 select * from ${unique_table}_1; """ + sql """ commit; """ + sql "sync" + order_qt_selectmowi0 """select * from ${unique_table}_0""" + order_qt_selectmowi1 """select * from ${unique_table}_1""" + order_qt_selectmowi2 """select * from ${unique_table}_2""" + } catch (Exception e) { + logger.info("exception: " + e) + if (isCloudMode()) { + assertTrue(e.getMessage().contains("Transaction load is not supported for merge on write unique keys table in cloud mode")) + sql """ rollback """ + } else { + assertTrue(false, "should not reach here") + } + } + } + + // the following cases are not supported in cloud mode + if (isCloudMode()) { + break + } + + // 16. update stmt(mow table) + if (use_nereids_planner) { + def ut_table = "txn_insert_ut" + for (def i in 1..2) { + def tableName = ut_table + "_" + i + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `ID` int(11) NOT NULL, + `NAME` varchar(100) NULL, + `score` int(11) NULL + ) ENGINE=OLAP + unique KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + } + sql """ insert into ${ut_table}_1 values(1, "a", 100); """ + sql """ begin; """ + sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """ + sql """ update ${ut_table}_1 set score = 101 where id = 1; """ + sql """ commit; """ + sql "sync" + order_qt_selectmowu1 """select * from ${ut_table}_1 """ + order_qt_selectmowu2 """select * from ${ut_table}_2 """ + } + + // 17. 
delete from using and delete from stmt(mow table) + if (use_nereids_planner) { + for (def ta in ["txn_insert_dt1", "txn_insert_dt2", "txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) { + sql """ drop table if exists ${ta} """ + } + + for (def ta in ["txn_insert_dt1", "txn_insert_dt4", "txn_insert_dt5"]) { + sql """ + create table ${ta} ( + id int, + dt date, + c1 bigint, + c2 string, + c3 double + ) unique key (id, dt) + partition by range(dt) ( + from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY + ) + distributed by hash(id) + properties( + 'replication_num'='1', + "enable_unique_key_merge_on_write" = "true" + ); + """ + sql """ + INSERT INTO ${ta} VALUES + (1, '2000-01-01', 1, '1', 1.0), + (2, '2000-01-02', 2, '2', 2.0), + (3, '2000-01-03', 3, '3', 3.0); + """ + } + + sql """ + create table txn_insert_dt2 ( + id int, + dt date, + c1 bigint, + c2 string, + c3 double + ) unique key (id) + distributed by hash(id) + properties( + 'replication_num'='1' + ); + """ + sql """ + create table txn_insert_dt3 ( + id int + ) distributed by hash(id) + properties( + 'replication_num'='1' + ); + """ + sql """ + INSERT INTO txn_insert_dt2 VALUES + (1, '2000-01-10', 10, '10', 10.0), + (2, '2000-01-20', 20, '20', 20.0), + (3, '2000-01-30', 30, '30', 30.0), + (4, '2000-01-04', 4, '4', 4.0), + (5, '2000-01-05', 5, '5', 5.0); + """ + sql """ + INSERT INTO txn_insert_dt3 VALUES(1),(2),(4),(5); + """ + sql """ begin """ + test { + sql ''' + delete from txn_insert_dt1 temporary partition (p_20000102) + using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id + where txn_insert_dt1.id = txn_insert_dt2.id; + ''' + exception 'Partition: p_20000102 is not exists' + } + sql """ + delete from txn_insert_dt1 partition (p_20000102) + using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id + where txn_insert_dt1.id = txn_insert_dt2.id; + """ + sql """ + delete from txn_insert_dt4 + using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id + where txn_insert_dt4.id = txn_insert_dt2.id; + """ + sql """ + delete from txn_insert_dt2 where id = 1; + """ + sql """ + delete from txn_insert_dt2 where id = 5; + """ + sql """ + delete from txn_insert_dt5 partition(p_20000102) where id = 1; + """ + sql """ + delete from txn_insert_dt5 partition(p_20000102) where id = 5; + """ + sql """ commit """ + sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10, '10', 10.0) """ + sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10, '10', 10.0) """ + sql "sync" + order_qt_selectmowd1 """select * from txn_insert_dt1 """ + order_qt_selectmowd2 """select * from txn_insert_dt2 """ + order_qt_selectmowd3 """select * from txn_insert_dt4 """ + order_qt_selectmowd4 """select * from txn_insert_dt5 """ + } + + // 18. 
column update(mow table) if (use_nereids_planner) { def unique_table = "txn_insert_cu" for (def i in 0..3) { @@ -738,4 +758,14 @@ suite("txn_insert") { order_qt_select_cu3 """select * from ${unique_table}_3""" } } + + def db_name = "regression_test_insert_p0" + def tables = sql """ show tables from $db_name """ + logger.info("tables: $tables") + for (def table_info : tables) { + def table_name = table_info[0] + if (table_name.startsWith("txn_insert_")) { + check_table_version_continuous(db_name, table_name) + } + } } diff --git a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy index 60cffc4d0df..3e84a9f56e8 100644 --- a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy +++ b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy @@ -83,14 +83,20 @@ suite("txn_insert_concurrent_insert") { def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" logger.info("url: ${url}") + def sqls = [ + "begin", + "insert into ${tableName}_0 select * from ${tableName}_1 where L_ORDERKEY < 30000;", + "insert into ${tableName}_1 select * from ${tableName}_2 where L_ORDERKEY > 500000;", + "insert into ${tableName}_0 select * from ${tableName}_2 where L_ORDERKEY < 30000;", + "commit" + ] def txn_insert = { -> try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); Statement stmt = conn.createStatement()) { - stmt.execute("begin") - stmt.execute("insert into ${tableName}_0 select * from ${tableName}_1 where L_ORDERKEY < 30000;") - stmt.execute("insert into ${tableName}_1 select * from ${tableName}_2 where L_ORDERKEY > 500000;") - stmt.execute("insert into ${tableName}_0 select * from ${tableName}_2 where L_ORDERKEY < 30000;") - stmt.execute("commit") + for (def sql : sqls) { + logger.info(Thread.currentThread().getName() + " execute sql: " + sql) + stmt.execute(sql) + } logger.info("finish txn insert for " + Thread.currentThread().getName()) } catch (Throwable e) { logger.error("txn insert failed", e) @@ -112,4 +118,14 @@ suite("txn_insert_concurrent_insert") { result = sql """ select count() from ${tableName}_1 """ logger.info("result: ${result}") assertEquals(2606192, result[0][0]) + + def db_name = "regression_test_insert_p0" + def tables = sql """ show tables from $db_name """ + logger.info("tables: $tables") + for (def table_info : tables) { + def table_name = table_info[0] + if (table_name.startsWith(tableName)) { + check_table_version_continuous(db_name, table_name) + } + } } diff --git a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy index e22c38c70af..083f01b4a8a 100644 --- a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy +++ b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy @@ -19,10 +19,12 @@ import com.mysql.cj.jdbc.StatementImpl import java.sql.Connection import java.sql.DriverManager import java.sql.Statement +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType suite("txn_insert_inject_case", "nonConcurrent") { + // test load fail def table = "txn_insert_inject_case" - for (int j = 0; j < 3; j++) { def tableName = table + "_" + j sql """ DROP TABLE IF EXISTS $tableName """ @@ -37,9 +39,74 @@ suite("txn_insert_inject_case", "nonConcurrent") { properties("replication_num" = "1"); """ } 
+ GetDebugPoint().disableDebugPointForAllBEs("FlushToken.submit_flush_error") sql """insert into ${table}_1 values(1, 2.2, "abc", [], []), (2, 3.3, "xyz", [1], [1, 0]), (null, null, null, [null], [null, 0]) """ sql """insert into ${table}_2 values(3, 2.2, "abc", [], []), (4, 3.3, "xyz", [1], [1, 0]), (null, null, null, [null], [null, 0]) """ + def ipList = [:] + def portList = [:] + (ipList, portList) = GetDebugPoint().getBEHostAndHTTPPort() + logger.info("be ips: ${ipList}, ports: ${portList}") + + def enableDebugPoint = { -> + ipList.each { beid, ip -> + DebugPoint.enableDebugPoint(ip, portList[beid] as int, NodeType.BE, "FlushToken.submit_flush_error") + } + } + + def disableDebugPoint = { -> + ipList.each { beid, ip -> + DebugPoint.disableDebugPoint(ip, portList[beid] as int, NodeType.BE, "FlushToken.submit_flush_error") + } + } + + try { + enableDebugPoint() + sql """ begin """ + try { + sql """ insert into ${table}_0 select * from ${table}_1; """ + assertTrue(false, "insert should fail") + } catch (Exception e) { + logger.info("1" + e.getMessage()) + assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error")) + } + try { + sql """ insert into ${table}_0 select * from ${table}_1; """ + assertTrue(false, "insert should fail") + } catch (Exception e) { + logger.info("2" + e.getMessage()) + assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error")) + } + + disableDebugPoint() + sql """ insert into ${table}_0 select * from ${table}_1; """ + + enableDebugPoint() + try { + sql """ insert into ${table}_0 select * from ${table}_1; """ + assertTrue(false, "insert should fail") + } catch (Exception e) { + logger.info("4" + e.getMessage()) + assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error")) + } + + disableDebugPoint() + sql """ insert into ${table}_0 select * from ${table}_1; """ + sql """ commit""" + } catch (Exception e) { + logger.error("failed", e) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("FlushToken.submit_flush_error") + } + sql "sync" + order_qt_select1 """select * from ${table}_0""" + + if (isCloudMode()) { + return + } + + sql """ truncate table ${table}_0 """ + // 1. 
publish timeout def backendId_to_params = get_be_param("pending_data_expire_time_sec") try { diff --git a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy b/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy index 388342bad53..7481a52e940 100644 --- a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy +++ b/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy @@ -30,13 +30,33 @@ suite("txn_insert_with_schema_change") { CountDownLatch insertLatch = new CountDownLatch(1) CountDownLatch schemaChangeLatch = new CountDownLatch(1) - def getAlterTableState = { job_state -> + for (int j = 0; j < 5; j++) { + def tableName = table + "_" + j + sql """ DROP TABLE IF EXISTS $tableName """ + sql """ + create table $tableName ( + `ID` int(11) NOT NULL, + `NAME` varchar(100) NULL, + `score` int(11) NULL + ) ENGINE=OLAP + duplicate KEY(`id`) + distributed by hash(id) buckets 1 + properties("replication_num" = "1"); + """ + } + sql """ insert into ${table}_0 values(0, '0', 0) """ + sql """ insert into ${table}_1 values(0, '0', 0) """ + sql """ insert into ${table}_2 values(0, '0', 0) """ + sql """ insert into ${table}_3 values(1, '1', 1), (2, '2', 2) """ + sql """ insert into ${table}_4 values(3, '3', 3), (4, '4', 4), (5, '5', 5) """ + + def getAlterTableState = { tName, job_state -> def retry = 0 sql "use ${dbName};" def last_state = "" while (true) { sleep(2000) - def state = sql " show alter table column where tablename = '${table}_0' order by CreateTime desc limit 1" + def state = sql """ show alter table column where tablename = "${tName}" order by CreateTime desc limit 1""" logger.info("alter table state: ${state}") last_state = state[0][9] if (state.size() > 0 && last_state == job_state) { @@ -53,13 +73,17 @@ suite("txn_insert_with_schema_change") { def txnInsert = { sqls -> try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); Statement statement = conn.createStatement()) { + logger.info("execute sql: begin") statement.execute("begin") + logger.info("execute sql: ${sqls[0]}") statement.execute(sqls[0]) schemaChangeLatch.countDown() - insertLatch.await(2, TimeUnit.MINUTES) + insertLatch.await(5, TimeUnit.MINUTES) + logger.info("execute sql: ${sqls[1]}") statement.execute(sqls[1]) + logger.info("execute sql: commit") statement.execute("commit") } catch (Throwable e) { logger.error("txn insert failed", e) @@ -67,13 +91,14 @@ suite("txn_insert_with_schema_change") { } } - def schemaChange = { sql, job_state -> + def schemaChange = { sql, tName, job_state -> try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); Statement statement = conn.createStatement()) { - schemaChangeLatch.await(2, TimeUnit.MINUTES) + schemaChangeLatch.await(5, TimeUnit.MINUTES) + logger.info("execute sql: ${sql}") statement.execute(sql) if (job_state != null) { - getAlterTableState(job_state) + getAlterTableState(tName, job_state) } insertLatch.countDown() } catch (Throwable e) { @@ -83,32 +108,20 @@ suite("txn_insert_with_schema_change") { } def sqls = [ - ["insert into ${table}_0(id, name, score) select * from ${table}_1;", - "insert into ${table}_0(id, name, score) select * from ${table}_2;"], - ["delete from ${table}_0 where id = 0 or id = 3;", - "insert into ${table}_0(id, name, score) select * from ${table}_2;"], - ["insert into ${table}_0(id, name, score) select * from ${table}_2;", - "delete from ${table}_0 where id = 0 or id = 3;"] + 
["insert into ${table}_0(id, name, score) select * from ${table}_3;", + "insert into ${table}_0(id, name, score) select * from ${table}_4;"], + ["delete from ${table}_1 where id = 0 or id = 3;", + "insert into ${table}_1(id, name, score) select * from ${table}_4;"], + ["insert into ${table}_2(id, name, score) select * from ${table}_4;", + "delete from ${table}_2 where id = 0 or id = 3;"] ] - for (def insert_sqls: sqls) { - for (int j = 0; j < 3; j++) { - def tableName = table + "_" + j - sql """ DROP TABLE IF EXISTS $tableName force """ - sql """ - create table $tableName ( - `ID` int(11) NOT NULL, - `NAME` varchar(100) NULL, - `score` int(11) NULL - ) ENGINE=OLAP - duplicate KEY(`id`) - distributed by hash(id) buckets 1 - properties("replication_num" = "1"); - """ + for (int i = 0; i < sqls.size(); i++) { + def insert_sqls = sqls[i] + // TODO skip because it will cause ms core + if (isCloudMode() && insert_sqls[1].startsWith("delete")) { + continue } - sql """ insert into ${table}_0 values(0, '0', 0) """ - sql """ insert into ${table}_1 values(1, '1', 1), (2, '2', 2) """ - sql """ insert into ${table}_2 values(3, '3', 3), (4, '4', 4), (5, '5', 5) """ logger.info("insert sqls: ${insert_sqls}") // 1. do light weight schema change: add column @@ -116,7 +129,7 @@ suite("txn_insert_with_schema_change") { insertLatch = new CountDownLatch(1) schemaChangeLatch = new CountDownLatch(1) Thread insert_thread = new Thread(() -> txnInsert(insert_sqls)) - Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table}_0 ADD column age int after name;", null)) + Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table}_${i} ADD column age int after name;", "${table}_${i}", null)) insert_thread.start() schema_change_thread.start() insert_thread.join() @@ -124,9 +137,9 @@ suite("txn_insert_with_schema_change") { logger.info("errors: " + errors) assertEquals(0, errors.size()) - order_qt_select1 """select id, name, score from ${table}_0 """ - getAlterTableState("FINISHED") - order_qt_select2 """select id, name, score from ${table}_0 """ + order_qt_select1 """select id, name, score from ${table}_${i} """ + getAlterTableState("${table}_${i}", "FINISHED") + order_qt_select2 """select id, name, score from ${table}_${i} """ } // 2. 
         // 2. do hard weight schema change: change order
@@ -134,7 +147,7 @@ suite("txn_insert_with_schema_change") {
             insertLatch = new CountDownLatch(1)
             schemaChangeLatch = new CountDownLatch(1)
             Thread insert_thread = new Thread(() -> txnInsert(insert_sqls))
-            Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table}_0 order by (id, score, age, name);", "WAITING_TXN"))
+            Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table}_${i} order by (id, score, age, name);", "${table}_${i}", "WAITING_TXN"))
             insert_thread.start()
             schema_change_thread.start()
             insert_thread.join()
@@ -142,9 +155,10 @@ suite("txn_insert_with_schema_change") {
             logger.info("errors: " + errors)
             assertEquals(0, errors.size())
 
-            order_qt_select3 """select id, name, score from ${table}_0 """
-            getAlterTableState("FINISHED")
-            order_qt_select4 """select id, name, score from ${table}_0 """
+            order_qt_select3 """select id, name, score from ${table}_${i} """
+            getAlterTableState("${table}_${i}", "FINISHED")
+            order_qt_select4 """select id, name, score from ${table}_${i} """
         }
+        check_table_version_continuous(dbName, table + "_" + i)
     }
 }
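The core of the reworked txn_insert_with_schema_change suite above is a two-latch handshake: the transactional insert executes its first statement, releases the schema-change thread, then blocks until the ALTER has been issued before running its second statement and committing. The sketch below illustrates only that coordination in plain Groovy; it is a minimal, hypothetical example, not the suite itself. The `runTxnInsert`/`runSchemaChange` closures, the `execute` stand-in, and the table names are illustrative; the real test drives a JDBC `Statement` and the regression framework's `sql` and `getAlterTableState` helpers.

```
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// One latch per direction: the insert side releases schemaChangeLatch after its
// first statement; the schema-change side releases insertLatch once the ALTER
// statement has been sent.
CountDownLatch insertLatch = new CountDownLatch(1)
CountDownLatch schemaChangeLatch = new CountDownLatch(1)

// Stand-in executor so the sketch runs outside the regression framework.
def execute = { String stmt -> println "execute sql: ${stmt}" }

def runTxnInsert = { List<String> stmts ->
    execute("begin")
    execute(stmts[0])
    schemaChangeLatch.countDown()                // let the ALTER be issued now
    insertLatch.await(5, TimeUnit.MINUTES)       // wait until the ALTER is in flight
    execute(stmts[1])
    execute("commit")
}

def runSchemaChange = { String alterSql ->
    schemaChangeLatch.await(5, TimeUnit.MINUTES) // wait for the txn's first insert
    execute(alterSql)
    insertLatch.countDown()                      // unblock the rest of the txn
}

Thread insertThread = Thread.start {
    runTxnInsert(["insert into t_0 select * from t_3", "insert into t_0 select * from t_4"])
}
Thread schemaChangeThread = Thread.start {
    runSchemaChange("alter table t_0 add column age int after name")
}
insertThread.join()
schemaChangeThread.join()
```

Under this ordering the schema change always observes an open transaction that has already written, which is exactly the interleaving the suite needs to exercise for both the light-weight (add column) and hard-weight (reorder) cases.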