This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7b70b7223c7 branch-3.0: [fix](path gc) Fix path gc race with publish
task #50343 (#50487)
7b70b7223c7 is described below
commit 7b70b7223c7f08c27e55b906fedec3ba88501153
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Apr 28 19:19:36 2025 +0800
branch-3.0: [fix](path gc) Fix path gc race with publish task #50343
(#50487)
Cherry-picked from #50343
Co-authored-by: deardeng <[email protected]>
---
be/src/olap/data_dir.cpp | 8 ++
be/src/olap/olap_server.cpp | 13 +++-
be/src/olap/tablet.cpp | 1 -
be/src/olap/task/engine_publish_version_task.cpp | 87 +++++++++++++---------
be/src/olap/txn_manager.cpp | 10 ++-
be/src/olap/txn_manager.h | 6 +-
be/test/olap/delta_writer_cluster_key_test.cpp | 8 +-
be/test/olap/delta_writer_test.cpp | 23 ++++--
.../olap/engine_storage_migration_task_test.cpp | 7 +-
be/test/olap/segment_cache_test.cpp | 8 +-
be/test/olap/tablet_cooldown_test.cpp | 7 +-
be/test/olap/txn_manager_test.cpp | 14 +++-
be/test/runtime/snapshot_loader_test.cpp | 4 +-
.../test_path_gc_with_publish_version.groovy | 77 +++++++++++++++++++
14 files changed, 204 insertions(+), 69 deletions(-)
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 7469495f176..5cb2a6105d9 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -754,6 +754,14 @@ void DataDir::_perform_rowset_gc(const std::string&
tablet_schema_hash_path) {
[&rowsets_in_version_map](auto& rs) {
rowsets_in_version_map.insert(rs->rowset_id()); },
true);
+ DBUG_EXECUTE_IF("DataDir::_perform_rowset_gc.simulation.slow", {
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ if (target_tablet_id == tablet_id) {
+ LOG(INFO) << "debug point wait tablet to remove rsmgr tabletId="
<< tablet_id;
+ DBUG_BLOCK;
+ }
+ });
+
auto reclaim_rowset_file = [](const std::string& path) {
auto st = io::global_local_filesystem()->delete_file(path);
if (!st.ok()) [[unlikely]] {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 42333b560f0..070944a7cef 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -465,9 +465,17 @@ void StorageEngine::_path_gc_thread_callback(DataDir*
data_dir) {
int32_t current_time = time(nullptr);
int32_t interval = _auto_get_interval_by_disk_capacity(data_dir);
+ DBUG_EXECUTE_IF("_path_gc_thread_callback.interval.eq.1ms", {
+ LOG(INFO) << "debug point change interval eq 1ms";
+ interval = 1;
+ while
(DebugPoints::instance()->is_enable("_path_gc_thread_callback.always.do")) {
+ data_dir->perform_path_gc();
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ });
if (interval <= 0) {
LOG(WARNING) << "path gc thread check interval config is illegal:"
<< interval
- << "will be forced set to half hour";
+ << " will be forced set to half hour";
interval = 1800; // 0.5 hour
}
if (current_time - last_exec_time >= interval) {
@@ -483,8 +491,9 @@ void StorageEngine::_path_gc_thread_callback(DataDir*
data_dir) {
void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>&
data_dirs) {
int64_t interval =
config::generate_tablet_meta_checkpoint_tasks_interval_secs;
do {
- LOG(INFO) << "begin to produce tablet meta checkpoint tasks.";
for (auto data_dir : data_dirs) {
+ LOG(INFO) << "begin to produce tablet meta checkpoint tasks,
data_dir="
+ << data_dir->path();
auto st = _tablet_meta_checkpoint_thread_pool->submit_func(
[data_dir, this]() {
_tablet_manager->do_tablet_meta_checkpoint(data_dir); });
if (!st.ok()) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5cce7bb652b..958dcb8ce13 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1446,7 +1446,6 @@ bool Tablet::do_tablet_meta_checkpoint() {
_newly_created_rowset_num <
config::tablet_meta_checkpoint_min_new_rowsets_num) {
return false;
}
-
// hold read-lock other than write-lock, because it will not modify meta
structure
std::shared_lock rdlock(_meta_lock);
if (tablet_state() != TABLET_RUNNING) {
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index 49784cd9e52..2dcc1723b71 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -396,6 +396,49 @@ TabletPublishTxnTask::TabletPublishTxnTask(StorageEngine&
engine,
TabletPublishTxnTask::~TabletPublishTxnTask() = default;
+// Publish `transaction_id` on `tablet`, then make `rowset` visible by adding it to the tablet.
+//
+// On failure the error is logged and, when `engine_publish_version_task` is non-null, the tablet
+// is recorded via add_error_tablet_id(). A PUSH_VERSION_ALREADY_EXIST result from add_inc_rowset
+// is tolerated and reported as OK: the publish paths this helper replaces did not treat it as an
+// error, and callers treat any non-OK status returned here as a failed tablet.
+// `stats.add_inc_rowset_us` is filled with the time spent in add_inc_rowset.
+static Status publish_version_and_add_rowset(StorageEngine& engine, int64_t partition_id,
+                                             const TabletSharedPtr& tablet,
+                                             const RowsetSharedPtr& rowset, int64_t transaction_id,
+                                             const Version& version,
+                                             EnginePublishVersionTask* engine_publish_version_task,
+                                             TabletPublishStatistics& stats) {
+    // ATTN: Here, the life cycle needs to be extended to prevent tablet_txn_info.pending_rs_guard
+    // in txn from being released prematurely, causing path gc to mistakenly delete the dat file.
+    // This local must stay alive until add_inc_rowset below has completed.
+    std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
+
+    // Publish the transaction
+    auto result = engine.txn_manager()->publish_txn(partition_id, tablet, transaction_id, version,
+                                                    &stats, extend_tablet_txn_info_lifetime);
+    if (!result.ok()) {
+        LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << tablet->tablet_id() << ", txn_id=" << transaction_id
+                     << ", res=" << result;
+        if (engine_publish_version_task) {
+            engine_publish_version_task->add_error_tablet_id(tablet->tablet_id());
+        }
+        return result;
+    }
+
+    DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK);
+
+    // Add visible rowset to tablet
+    int64_t start_time = MonotonicMicros();
+    result = tablet->add_inc_rowset(rowset);
+    // Record the cost before the debug block so a blocked debug point does not inflate the stat.
+    stats.add_inc_rowset_us = MonotonicMicros() - start_time;
+    DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.after_add_inc_rowset_rowsets_block",
+                    DBUG_BLOCK);
+    if (result.ok()) {
+        return result;
+    }
+    if (result.is<PUSH_VERSION_ALREADY_EXIST>()) {
+        // The version is already present on the tablet; keep the pre-refactor behaviour of
+        // treating this as a successful publish instead of an error tablet.
+        return Status::OK();
+    }
+    LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << rowset->rowset_id()
+                 << ", tablet_id=" << tablet->tablet_id() << ", txn_id=" << transaction_id
+                 << ", res=" << result;
+    if (engine_publish_version_task) {
+        engine_publish_version_task->add_error_tablet_id(tablet->tablet_id());
+    }
+    return result;
+}
+
void TabletPublishTxnTask::handle() {
std::shared_lock migration_rlock(_tablet->get_migration_lock(),
std::chrono::seconds(5));
SCOPED_ATTACH_TASK(_mem_tracker);
@@ -411,29 +454,14 @@ void TabletPublishTxnTask::handle() {
rowset_update_lock.lock();
}
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
- _result = _engine.txn_manager()->publish_txn(_partition_id, _tablet,
_transaction_id, _version,
- &_stats);
+ _result = publish_version_and_add_rowset(_engine, _partition_id, _tablet,
_rowset,
+ _transaction_id, _version,
+ _engine_publish_version_task,
_stats);
+
if (!_result.ok()) {
- LOG(WARNING) << "failed to publish version. rowset_id=" <<
_rowset->rowset_id()
- << ", tablet_id=" << _tablet_info.tablet_id << ",
txn_id=" << _transaction_id
- << ", res=" << _result;
-
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
- DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets",
DBUG_BLOCK);
-
- // add visible rowset to tablet
- int64_t t1 = MonotonicMicros();
- _result = _tablet->add_inc_rowset(_rowset);
- _stats.add_inc_rowset_us = MonotonicMicros() - t1;
- if (!_result.ok() && !_result.is<PUSH_VERSION_ALREADY_EXIST>()) {
- LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" <<
_rowset->rowset_id()
- << ", tablet_id=" << _tablet_info.tablet_id << ",
txn_id=" << _transaction_id
- << ", res=" << _result;
-
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
- return;
- }
int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
g_tablet_publish_latency << cost_us;
_stats.record_in_bvar();
@@ -466,27 +494,14 @@ void AsyncTabletPublishTask::handle() {
}
RowsetSharedPtr rowset = iter->second;
Version version(_version, _version);
- auto publish_status = _engine.txn_manager()->publish_txn(_partition_id,
_tablet,
- _transaction_id,
version, &_stats);
- if (!publish_status.ok()) {
- LOG(WARNING) << "failed to publish version. rowset_id=" <<
rowset->rowset_id()
- << ", tablet_id=" << _tablet->tablet_id() << ", txn_id="
<< _transaction_id
- << ", res=" << publish_status;
- return;
- }
- DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets",
DBUG_BLOCK);
+ auto publish_status = publish_version_and_add_rowset(_engine,
_partition_id, _tablet, rowset,
+ _transaction_id,
version, nullptr, _stats);
- // add visible rowset to tablet
- int64_t t1 = MonotonicMicros();
- publish_status = _tablet->add_inc_rowset(rowset);
- _stats.add_inc_rowset_us = MonotonicMicros() - t1;
- if (!publish_status.ok() &&
!publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
- LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" <<
rowset->rowset_id()
- << ", tablet_id=" << _tablet->tablet_id() << ", txn_id="
<< _transaction_id
- << ", res=" << publish_status;
+ if (!publish_status.ok()) {
return;
}
+
int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
// print stats if publish cost > 500ms
g_tablet_publish_latency << cost_us;
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index eb4b52b33b1..0973511080f 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -200,9 +200,11 @@ Status TxnManager::commit_txn(TPartitionId partition_id,
const Tablet& tablet,
+// Tablet-based overload of publish_txn: takes the OlapMeta from the tablet's data dir plus the
+// tablet id and uid, and forwards everything to the OlapMeta-based overload.
+// extend_tablet_txn_info is an out-parameter passed straight through; the callee may assign the
+// published txn's TabletTxnInfo to it so the caller can keep that info alive after publish.
Status TxnManager::publish_txn(TPartitionId partition_id, const
TabletSharedPtr& tablet,
TTransactionId transaction_id, const Version&
version,
- TabletPublishStatistics* stats) {
+ TabletPublishStatistics* stats,
+ std::shared_ptr<TabletTxnInfo>&
extend_tablet_txn_info) {
return publish_txn(tablet->data_dir()->get_meta(), partition_id,
transaction_id,
- tablet->tablet_id(), tablet->tablet_uid(), version,
stats);
+ tablet->tablet_id(), tablet->tablet_uid(), version,
stats,
+ extend_tablet_txn_info);
}
void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId
transaction_id,
@@ -457,7 +459,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId
partition_id,
Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
TTransactionId transaction_id, TTabletId
tablet_id,
TabletUid tablet_uid, const Version& version,
- TabletPublishStatistics* stats) {
+ TabletPublishStatistics* stats,
+ std::shared_ptr<TabletTxnInfo>&
extend_tablet_txn_info) {
auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
return Status::OK();
@@ -483,6 +486,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId
partition_id,
// found load for txn,tablet
// case 1: user commit rowset, then the load id must be equal
tablet_txn_info = txn_info_iter->second;
+ extend_tablet_txn_info = tablet_txn_info;
rowset = tablet_txn_info->rowset;
}
}
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 4d96f8c3acf..95e9b295055 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -166,7 +166,8 @@ public:
Status publish_txn(TPartitionId partition_id, const TabletSharedPtr&
tablet,
TTransactionId transaction_id, const Version& version,
- TabletPublishStatistics* stats);
+ TabletPublishStatistics* stats,
+ std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info);
// delete the txn from manager if it is not committed(not have a valid
rowset)
Status rollback_txn(TPartitionId partition_id, const Tablet& tablet,
@@ -184,7 +185,8 @@ public:
// not persist rowset meta because
Status publish_txn(OlapMeta* meta, TPartitionId partition_id,
TTransactionId transaction_id,
TTabletId tablet_id, TabletUid tablet_uid, const
Version& version,
- TabletPublishStatistics* stats);
+ TabletPublishStatistics* stats,
+ std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info);
// only abort not committed txn
void abort_txn(TPartitionId partition_id, TTransactionId transaction_id,
TTabletId tablet_id,
diff --git a/be/test/olap/delta_writer_cluster_key_test.cpp
b/be/test/olap/delta_writer_cluster_key_test.cpp
index 9c3e64109c4..cc9195c4fee 100644
--- a/be/test/olap/delta_writer_cluster_key_test.cpp
+++ b/be/test/olap/delta_writer_cluster_key_test.cpp
@@ -309,9 +309,11 @@ TEST_F(TestDeltaWriterClusterKey, vec_sequence_col) {
std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
TabletPublishStatistics pstats;
- res = engine_ref->txn_manager()->publish_txn(
- meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id,
- tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
+ res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id,
write_req.txn_id,
+ write_req.tablet_id,
+
tablet_related_rs.begin()->first.tablet_uid,
+ version, &pstats,
extend_tablet_txn_info_lifetime);
ASSERT_TRUE(res.ok());
std::cout << "start to add inc rowset:" << rowset->rowset_id()
<< ", num rows:" << rowset->num_rows() << ", version:" <<
rowset->version().first
diff --git a/be/test/olap/delta_writer_test.cpp
b/be/test/olap/delta_writer_test.cpp
index e870e3ad6f2..7f6aadd6070 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -669,9 +669,10 @@ TEST_F(TestDeltaWriter, vec_write) {
std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_rs.second;
TabletPublishStatistics stats;
- res = engine_ref->txn_manager()->publish_txn(meta,
write_req.partition_id, write_req.txn_id,
- write_req.tablet_id,
-
tablet_rs.first.tablet_uid, version, &stats);
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime =
nullptr;
+ res = engine_ref->txn_manager()->publish_txn(
+ meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id,
+ tablet_rs.first.tablet_uid, version, &stats,
extend_tablet_txn_info_lifetime);
ASSERT_TRUE(res.ok());
std::cout << "start to add inc rowset:" << rowset->rowset_id()
<< ", num rows:" << rowset->num_rows() << ", version:" <<
rowset->version().first
@@ -763,9 +764,11 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
TabletPublishStatistics pstats;
- res = engine_ref->txn_manager()->publish_txn(
- meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id,
- tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
+ res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id,
write_req.txn_id,
+ write_req.tablet_id,
+
tablet_related_rs.begin()->first.tablet_uid,
+ version, &pstats,
extend_tablet_txn_info_lifetime);
ASSERT_TRUE(res.ok());
std::cout << "start to add inc rowset:" << rowset->rowset_id()
<< ", num rows:" << rowset->num_rows() << ", version:" <<
rowset->version().first
@@ -911,9 +914,11 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write)
{
std::cout << "start to publish txn" << std::endl;
rowset1 = tablet_related_rs.begin()->second;
TabletPublishStatistics pstats;
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime =
nullptr;
res = engine_ref->txn_manager()->publish_txn(
meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id,
- tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
+ tablet_related_rs.begin()->first.tablet_uid, version, &pstats,
+ extend_tablet_txn_info_lifetime);
ASSERT_TRUE(res.ok());
std::cout << "start to add inc rowset:" << rowset1->rowset_id()
<< ", num rows:" << rowset1->num_rows()
@@ -964,9 +969,11 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write)
{
ASSERT_TRUE(delete_bitmap->contains({rowset2->rowset_id(), 0, 0}, 1));
TabletPublishStatistics pstats;
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime =
nullptr;
res = engine_ref->txn_manager()->publish_txn(
meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id,
- tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
+ tablet_related_rs.begin()->first.tablet_uid, version, &pstats,
+ extend_tablet_txn_info_lifetime);
ASSERT_TRUE(res.ok());
std::cout << "start to add inc rowset:" << rowset2->rowset_id()
<< ", num rows:" << rowset2->num_rows()
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp
b/be/test/olap/engine_storage_migration_task_test.cpp
index 6d87413e5d3..20d58fc3e30 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -226,9 +226,10 @@ TEST_F(TestEngineStorageMigrationTask,
write_and_migration) {
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
TabletPublishStatistics stats;
- res = engine_ref->txn_manager()->publish_txn(meta,
write_req.partition_id, write_req.txn_id,
- tablet->tablet_id(),
tablet->tablet_uid(),
- version, &stats);
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime =
nullptr;
+ res = engine_ref->txn_manager()->publish_txn(
+ meta, write_req.partition_id, write_req.txn_id,
tablet->tablet_id(),
+ tablet->tablet_uid(), version, &stats,
extend_tablet_txn_info_lifetime);
EXPECT_EQ(Status::OK(), res);
res = tablet->add_inc_rowset(rowset);
EXPECT_EQ(Status::OK(), res);
diff --git a/be/test/olap/segment_cache_test.cpp
b/be/test/olap/segment_cache_test.cpp
index c527ffddd42..3002f0f7829 100644
--- a/be/test/olap/segment_cache_test.cpp
+++ b/be/test/olap/segment_cache_test.cpp
@@ -293,9 +293,11 @@ TEST_F(SegmentCacheTest, vec_sequence_col) {
std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
TabletPublishStatistics pstats;
- res = engine_ref->txn_manager()->publish_txn(
- meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id,
- tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
+ res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id,
write_req.txn_id,
+ write_req.tablet_id,
+
tablet_related_rs.begin()->first.tablet_uid,
+ version, &pstats,
extend_tablet_txn_info_lifetime);
ASSERT_TRUE(res.ok());
std::cout << "start to add inc rowset:" << rowset->rowset_id()
<< ", num rows:" << rowset->num_rows() << ", version:" <<
rowset->version().first
diff --git a/be/test/olap/tablet_cooldown_test.cpp
b/be/test/olap/tablet_cooldown_test.cpp
index 8826cec7e9c..fbcbb443131 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -376,9 +376,10 @@ static void write_rowset(TabletSharedPtr* tablet,
PUniqueId load_id, int64_t rep
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
TabletPublishStatistics stats;
- st = engine_ref->txn_manager()->publish_txn(meta,
write_req.partition_id, write_req.txn_id,
- (*tablet)->tablet_id(),
(*tablet)->tablet_uid(),
- version, &stats);
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime =
nullptr;
+ st = engine_ref->txn_manager()->publish_txn(
+ meta, write_req.partition_id, write_req.txn_id,
(*tablet)->tablet_id(),
+ (*tablet)->tablet_uid(), version, &stats,
extend_tablet_txn_info_lifetime);
ASSERT_EQ(Status::OK(), st);
st = (*tablet)->add_inc_rowset(rowset);
ASSERT_EQ(Status::OK(), st);
diff --git a/be/test/olap/txn_manager_test.cpp
b/be/test/olap/txn_manager_test.cpp
index c7926b771db..eac5ff46d5a 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -326,9 +326,13 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) {
ASSERT_TRUE(st.ok()) << st;
Version new_version(10, 11);
TabletPublishStatistics stats;
- st = k_engine->txn_manager()->publish_txn(_meta.get(), partition_id,
transaction_id, tablet_id,
- _tablet_uid, new_version,
&stats);
- ASSERT_TRUE(st.ok()) << st;
+ {
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime =
nullptr;
+ st = k_engine->txn_manager()->publish_txn(_meta.get(), partition_id,
transaction_id,
+ tablet_id, _tablet_uid,
new_version, &stats,
+
extend_tablet_txn_info_lifetime);
+ ASSERT_TRUE(st.ok()) << st;
+ }
RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
st = RowsetMetaManager::get_rowset_meta(_meta.get(), _tablet_uid,
_rowset->rowset_id(),
@@ -345,8 +349,10 @@ TEST_F(TxnManagerTest, PublishNotExistedTxn) {
Version new_version(10, 11);
auto not_exist_txn = transaction_id + 1000;
TabletPublishStatistics stats;
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
auto st = k_engine->txn_manager()->publish_txn(_meta.get(), partition_id,
not_exist_txn,
- tablet_id, _tablet_uid,
new_version, &stats);
+ tablet_id, _tablet_uid,
new_version, &stats,
+
extend_tablet_txn_info_lifetime);
ASSERT_FALSE(st.ok()) << st;
}
diff --git a/be/test/runtime/snapshot_loader_test.cpp
b/be/test/runtime/snapshot_loader_test.cpp
index 36af0f5d70d..6aba0e6fe18 100644
--- a/be/test/runtime/snapshot_loader_test.cpp
+++ b/be/test/runtime/snapshot_loader_test.cpp
@@ -248,7 +248,9 @@ static void add_rowset(int64_t tablet_id, int32_t
schema_hash, int64_t partition
RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
TabletPublishStatistics stats;
- res = engine_ref->txn_manager()->publish_txn(partition_id, tablet, txn_id,
version, &stats);
+ std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
+ res = engine_ref->txn_manager()->publish_txn(partition_id, tablet, txn_id,
version, &stats,
+
extend_tablet_txn_info_lifetime);
ASSERT_TRUE(res.ok()) << res;
std::cout << "start to add inc rowset:" << rowset->rowset_id()
<< ", num rows:" << rowset->num_rows() << ", version:" <<
rowset->version().first
diff --git
a/regression-test/suites/path_gc_p0/test_path_gc_with_publish_version.groovy
b/regression-test/suites/path_gc_p0/test_path_gc_with_publish_version.groovy
new file mode 100644
index 00000000000..149c2a20177
--- /dev/null
+++ b/regression-test/suites/path_gc_p0/test_path_gc_with_publish_version.groovy
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+// Reproduces a race between path gc and publish version: path gc must not delete the data files
+// of a rowset whose publish is still in progress. After the race window the inserted row must
+// still be readable.
+suite('test_path_gc_with_publish_version', 'docker') {
+    if (isCloudMode()) {
+        return
+    }
+
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    options.beConfigs += [
+        'path_gc_check=true',
+        'path_gc_check_interval_second=1',
+        'path_gc_check_step=0',
+        'generate_tablet_meta_checkpoint_tasks_interval_secs=1',
+        'tablet_meta_checkpoint_min_new_rowsets_num=1',
+        'sys_log_verbose_modules=*',
+    ]
+    options.feNum = 1
+    options.beNum = 1
+
+    docker(options) {
+        def be1 = cluster.getBeByIndex(1)
+        // Make the path gc thread run path gc in a tight loop instead of on its normal interval.
+        be1.enableDebugPoint('_path_gc_thread_callback.interval.eq.1ms', null)
+        be1.enableDebugPoint('_path_gc_thread_callback.always.do', null)
+
+        sql "SET GLOBAL insert_visible_timeout_ms = 5000"
+        // wait for the path gc thread to pick up the 1ms interval
+        Thread.sleep(1000)
+
+        sql """
+            CREATE TABLE tbl (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 1 PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1")
+        """
+
+        def result = sql_return_maparray """show tablets from tbl"""
+        log.info("show tablet result {}", result)
+        Long tabletId = result.TabletId[0] as Long
+
+        // Block publish right before and right after the rowset is added to the tablet.
+        be1.enableDebugPoint('EnginePublishVersionTask.handle.block_add_rowsets', null)
+        be1.enableDebugPoint('EnginePublishVersionTask.handle.after_add_inc_rowset_rowsets_block', null)
+        sql 'INSERT INTO tbl VALUES (1, 10)'
+        // Publish is now blocked before add_inc_rowset (the rowset is no longer pending).
+
+        // Stall rowset gc for this tablet so it races with the blocked publish.
+        be1.enableDebugPoint('DataDir::_perform_rowset_gc.simulation.slow', [tablet_id: tabletId])
+        Thread.sleep(5000)
+
+        be1.disableDebugPoint('EnginePublishVersionTask.handle.block_add_rowsets')
+        Thread.sleep(5000)
+        // Publish continues, the tablet meta checkpoint cleans the rowset meta manager,
+        // then path gc continues.
+        be1.disableDebugPoint('DataDir::_perform_rowset_gc.simulation.slow')
+        be1.disableDebugPoint('EnginePublishVersionTask.handle.after_add_inc_rowset_rowsets_block')
+        Thread.sleep(3 * 1000)
+
+        // The version may need a moment to become visible after publish is unblocked; poll with a
+        // bound. A failed read (e.g. missing segment files) makes the query throw and fail the test.
+        for (int i = 0; i < 30; i++) {
+            result = sql """select * from tbl"""
+            if (result.size() == 1) {
+                break
+            }
+            Thread.sleep(1000)
+        }
+        log.info("result = {}", result)
+        assertEquals(1, result.size())
+        assertEquals(1, result[0][0] as int)
+        assertEquals(10, result[0][1] as int)
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]