This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ff7cdac759b [improve](group-commit) Modify some log (#30842)
ff7cdac759b is described below
commit ff7cdac759baeed1085eddbd56136adee013d7bd
Author: meiyi <[email protected]>
AuthorDate: Tue Feb 6 13:52:20 2024 +0800
[improve](group-commit) Modify some log (#30842)
---
be/src/olap/wal/wal_dirs_info.cpp | 1 -
be/src/olap/wal/wal_manager.cpp | 24 ++++-----
be/src/olap/wal/wal_reader.cpp | 2 +-
be/src/olap/wal/wal_table.cpp | 59 ++++++++--------------
be/src/olap/wal/wal_writer.cpp | 2 +-
be/src/runtime/group_commit_mgr.cpp | 6 +--
.../insert_p0/insert_group_commit_into_unique.out | 12 ++---
.../insert_group_commit_into_unique_sync_mode.out | 12 ++---
.../data/insert_p0/test_group_commit_2.csv | 2 +-
.../data/insert_p0/test_group_commit_4.csv | 2 +-
10 files changed, 50 insertions(+), 72 deletions(-)
diff --git a/be/src/olap/wal/wal_dirs_info.cpp b/be/src/olap/wal/wal_dirs_info.cpp
index 4e54a543980..c1ca6fce2fe 100644
--- a/be/src/olap/wal/wal_dirs_info.cpp
+++ b/be/src/olap/wal/wal_dirs_info.cpp
@@ -173,7 +173,6 @@ std::string WalDirsInfo::get_wal_dirs_info_string() {
Status WalDirsInfo::update_wal_dir_limit(const std::string& wal_dir, size_t
limit) {
for (const auto& wal_dir_info : _wal_dirs_info_vec) {
- LOG(INFO) << "wal_dir_info:" << wal_dir_info->get_wal_dir();
if (wal_dir_info->get_wal_dir() == wal_dir) {
return wal_dir_info->update_wal_dir_limit(limit);
}
diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 511751693c8..601a9bc5b4a 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -110,7 +110,7 @@ Status WalManager::_init_wal_dirs() {
bool exists = false;
for (auto wal_dir : _wal_dirs) {
std::string tmp_dir = wal_dir + "/" + _tmp;
- LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
+ LOG(INFO) << "wal_dir:" << wal_dir << ", tmp_dir:" << tmp_dir;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir,
&exists));
if (!exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir));
@@ -162,8 +162,7 @@ Status WalManager::_init_wal_dirs_info() {
void WalManager::add_wal_queue(int64_t table_id, int64_t wal_id) {
std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
- LOG(INFO) << "add wal queue "
- << ",table_id:" << table_id << ",wal_id:" << wal_id;
+ LOG(INFO) << "add wal to queue, table_id: " << table_id << ", wal_id: " <<
wal_id;
auto it = _wal_queues.find(table_id);
if (it == _wal_queues.end()) {
std::set<int64_t> tmp_set;
@@ -178,8 +177,7 @@ void WalManager::erase_wal_queue(int64_t table_id, int64_t wal_id) {
std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
auto it = _wal_queues.find(table_id);
if (it != _wal_queues.end()) {
- LOG(INFO) << "remove wal queue "
- << ",table_id:" << table_id << ",wal_id:" << wal_id;
+ LOG(INFO) << "remove wal from queue, table_id: " << table_id << ",
wal_id: " << wal_id;
it->second.erase(wal_id);
if (it->second.empty()) {
_wal_queues.erase(table_id);
@@ -240,7 +238,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
std::vector<io::FileInfo> dbs;
Status st = io::global_local_filesystem()->list(wal_path, false, &dbs,
&exists);
if (!st.ok()) {
- LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" <<
st.to_string();
+ LOG(WARNING) << "failed list files for wal_dir=" << wal_path << ",
st=" << st.to_string();
return st;
}
for (const auto& database_id : dbs) {
@@ -251,7 +249,8 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
auto db_path = wal_path + "/" + database_id.file_name;
st = io::global_local_filesystem()->list(db_path, false, &tables,
&exists);
if (!st.ok()) {
- LOG(WARNING) << "Failed list files for dir=" << db_path << ", st="
<< st.to_string();
+ LOG(WARNING) << "failed to list files for wal_dir=" << db_path
+ << ", st=" << st.to_string();
return st;
}
for (const auto& table_id : tables) {
@@ -262,7 +261,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
auto table_path = db_path + "/" + table_id.file_name;
st = io::global_local_filesystem()->list(table_path, false, &wals,
&exists);
if (!st.ok()) {
- LOG(WARNING) << "Failed list files for dir=" << table_path
+ LOG(WARNING) << "failed to list files for wal_dir=" <<
table_path
<< ", st=" << st.to_string();
return st;
}
@@ -300,7 +299,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
count += res.size();
}
}
- LOG(INFO) << "Finish list all wals, size:" << count;
+ LOG(INFO) << "Finish list wal_dir=" << wal_path << ", wal count=" << count;
return Status::OK();
}
@@ -329,7 +328,7 @@ Status WalManager::_replay() {
RETURN_IF_ERROR(_thread_pool->submit_func([table_id, this] {
auto st = this->_table_map[table_id]->replay_wals();
if (!st.ok()) {
- LOG(WARNING) << "Failed add replay wal on table " <<
table_id;
+ LOG(WARNING) << "failed to submit replay wal for table="
<< table_id;
}
}));
}
@@ -497,9 +496,9 @@ Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t block_que
wal_path = it->second;
auto st = io::global_local_filesystem()->delete_file(wal_path);
if (st.ok()) {
- LOG(INFO) << "delete file=" << wal_path;
+ LOG(INFO) << "delete wal=" << wal_path;
} else {
- LOG(WARNING) << "fail to delete file=" << wal_path;
+ LOG(WARNING) << "failed to delete wal=" << wal_path << ", st="
<< st.to_string();
}
_wal_path_map.erase(wal_id);
}
@@ -531,6 +530,7 @@ Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, i
}
auto res = std::rename(wal.c_str(), wal_path.string().c_str());
if (res < 0) {
+ LOG(INFO) << "failed to rename wal from " << wal << " to " <<
wal_path.string();
return Status::InternalError("rename fail on path " + wal);
}
LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp
index 5b65467b6c9..bac073e3034 100644
--- a/be/src/olap/wal/wal_reader.cpp
+++ b/be/src/olap/wal/wal_reader.cpp
@@ -46,7 +46,7 @@ Status WalReader::init() {
Status WalReader::finalize() {
auto st = file_reader->close();
if (!st.ok()) {
- LOG(WARNING) << "fail to close file " << _file_name;
+ LOG(WARNING) << "fail to close wal " << _file_name;
}
return Status::OK();
}
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 3a31ebcf9c0..dabbf0596e9 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -49,7 +49,7 @@ Status k_stream_load_exec_status;
void WalTable::add_wal(int64_t wal_id, std::string wal) {
std::lock_guard<std::mutex> lock(_replay_wal_lock);
- LOG(INFO) << "add replay wal " << wal;
+ LOG(INFO) << "add replay wal=" << wal;
auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis());
_replay_wal_map.emplace(wal, wal_info);
}
@@ -60,14 +60,10 @@ void WalTable::_pick_relay_wals() {
std::vector<std::string> need_erase_wals;
for (const auto& [wal_path, wal_info] : _replay_wal_map) {
if (wal_info->get_retry_num() >=
config::group_commit_replay_wal_retry_num) {
- LOG(WARNING) << "All replay wal failed, db=" << _db_id << ",
table=" << _table_id
- << ", wal=" << wal_path << ", retry_num=" <<
wal_info->get_retry_num();
- auto st = _exec_env->wal_mgr()->rename_to_tmp_path(wal_path,
_table_id,
-
wal_info->get_wal_id());
- if (!st.ok()) {
- LOG(WARNING) << "rename " << wal_path << " fail"
- << ",st:" << st.to_string();
- }
+ LOG(WARNING) << "failed to replay wal=" << wal_path << " after
retry "
+ << wal_info->get_retry_num() << " times";
+ [[maybe_unused]] auto st =
_exec_env->wal_mgr()->rename_to_tmp_path(
+ wal_path, _table_id, wal_info->get_wal_id());
if (config::group_commit_wait_replay_wal_finish) {
auto notify_st =
_exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
if (!notify_st.ok()) {
@@ -104,18 +100,18 @@ Status WalTable::_relay_wal_one_by_one() {
wal_info->add_retry_num();
auto st = _replay_wal_internal(wal_info->get_wal_path());
if (!st.ok()) {
- LOG(WARNING) << "failed replay wal, db=" << _db_id << ", table="
<< _table_id
- << ", wal=" << wal_info->get_wal_path() << ", st=" <<
st.to_string();
+ LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
+ << ", st=" << st.to_string();
if (!st.is<ErrorCode::NOT_FOUND>()) {
need_retry_wals.push_back(wal_info);
} else {
need_delete_wals.push_back(wal_info);
}
} else {
+ LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
+ << ", st=" << st.to_string();
need_delete_wals.push_back(wal_info);
}
- VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id
- << ", wal=" << wal_info->get_wal_path() << ", st=" <<
st.to_string();
}
{
std::lock_guard<std::mutex> lock(_replay_wal_lock);
@@ -124,10 +120,8 @@ Status WalTable::_relay_wal_one_by_one() {
}
}
for (auto delete_wal_info : need_delete_wals) {
- auto st = _exec_env->wal_mgr()->delete_wal(_table_id,
delete_wal_info->get_wal_id());
- if (!st.ok()) {
- LOG(WARNING) << "fail to delete wal " <<
delete_wal_info->get_wal_path();
- }
+ [[maybe_unused]] auto st =
+ _exec_env->wal_mgr()->delete_wal(_table_id,
delete_wal_info->get_wal_id());
if (config::group_commit_wait_replay_wal_finish) {
RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(delete_wal_info->get_wal_id()));
}
@@ -139,11 +133,11 @@ Status WalTable::replay_wals() {
{
std::lock_guard<std::mutex> lock(_replay_wal_lock);
if (_replay_wal_map.empty()) {
- LOG(INFO) << "_replay_wal_map is empty,skip relaying";
+ LOG(INFO) << "_replay_wal_map is empty, skip relaying for
table_id=" << _table_id;
return Status::OK();
}
if (!_replaying_queue.empty()) {
- LOG(INFO) << "_replaying_queue is not empty,skip relaying";
+ LOG(INFO) << "_replaying_queue is not empty, skip relaying for
table_id=" << _table_id;
return Status::OK();
}
}
@@ -171,7 +165,6 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
TLoadTxnRollbackRequest request;
request.__set_auth_code(0); // this is a fake, fe not check it now
request.__set_db_id(db_id);
- // TODO should we use label, because the replay wal use the same label and
different wal_id
request.__set_label(label);
std::string reason = "relay wal with label " + label;
request.__set_reason(reason);
@@ -189,20 +182,16 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
}
Status WalTable::_replay_wal_internal(const std::string& wal) {
- LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" <<
_table_id << ", wal=" << wal;
+ LOG(INFO) << "start replay wal=" << wal;
int64_t wal_id = 0;
std::string label = "";
RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label));
#ifndef BE_TEST
if (!config::group_commit_wait_replay_wal_finish) {
- auto st = _try_abort_txn(_db_id, label);
- if (!st.ok()) {
- LOG(WARNING) << "failed to abort txn with label " << label;
- }
+ [[maybe_unused]] auto st = _try_abort_txn(_db_id, label);
}
#endif
- RETURN_IF_ERROR(_replay_one_txn_with_stremaload(wal_id, wal, label));
- return Status::OK();
+ return _replay_one_txn_with_stremaload(wal_id, wal, label);
}
Status WalTable::_parse_wal_path(const std::string& wal, int64_t& wal_id,
std::string& label) {
@@ -267,17 +256,14 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
RETURN_IF_ERROR(ctx->future.get());
if (ctx->status.ok()) {
ctx->auth.auth_code = wal_id;
- auto commit_st =
_exec_env->stream_load_executor()->commit_txn(ctx.get());
- st = commit_st;
- } else if (!ctx->status.ok()) {
+ st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
+ } else {
st = ctx->status;
}
}
if (!st.ok()) {
- LOG(WARNING) << "handle streaming load failed, id=" << ctx->id << ",
errmsg=" << st;
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
}
- LOG(INFO) << "relay wal id=" << wal_id << ",st=" << st.to_string();
return st;
}
@@ -291,14 +277,9 @@ Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const std::stri
msg.find("LabelAlreadyUsedException") != msg.npos;
#else
success = k_stream_load_exec_status.ok();
+ auto st = Status::OK();
#endif
- if (success) {
- LOG(INFO) << "success to replay wal =" << wal;
- } else {
- LOG(INFO) << "fail to replay wal =" << wal;
- return Status::InternalError("fail to replay wal =" + wal);
- }
- return Status::OK();
+ return success ? Status::OK() : st;
}
void WalTable::stop() {
diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp
index 62b1352a577..1e869e94048 100644
--- a/be/src/olap/wal/wal_writer.cpp
+++ b/be/src/olap/wal/wal_writer.cpp
@@ -50,7 +50,7 @@ Status WalWriter::init() {
Status WalWriter::finalize() {
auto st = _file_writer->close();
if (!st.ok()) {
- LOG(WARNING) << "fail to close file " << _file_name;
+ LOG(WARNING) << "fail to close wal " << _file_name;
}
return Status::OK();
}
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 20872eb59d4..7af4913b642 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -65,8 +65,6 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
_block_queue.push_back(block);
_data_bytes += block->bytes();
_all_block_queues_bytes->fetch_add(block->bytes(),
std::memory_order_relaxed);
- } else {
- LOG(INFO) << "skip adding block to queue on txn " << txn_id;
}
if (write_wal || config::group_commit_wait_replay_wal_finish) {
auto st = _v_wal_writer->write_wal(block.get());
@@ -405,7 +403,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
auto delete_st = _exec_env->wal_mgr()->delete_wal(
table_id, txn_id,
load_block_queue->block_queue_pre_allocated());
if (!delete_st.ok()) {
- LOG(WARNING) << "fail to delete wal " << txn_id;
+ LOG(WARNING) << "fail to delete wal " << txn_id << ", st=" <<
delete_st.to_string();
}
}
} else {
@@ -545,7 +543,7 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
{
Status st = wal_mgr->get_wal_dir_available_size(_wal_base_path,
&available_bytes);
if (!st.ok()) {
- LOG(WARNING) << "get wal disk available size filed!";
+ LOG(WARNING) << "get wal dir available size failed, st=" <<
st.to_string();
}
}
if (pre_allocated < available_bytes) {
diff --git a/regression-test/data/insert_p0/insert_group_commit_into_unique.out b/regression-test/data/insert_p0/insert_group_commit_into_unique.out
index 2946a07897f..17121bfe0bd 100644
--- a/regression-test/data/insert_p0/insert_group_commit_into_unique.out
+++ b/regression-test/data/insert_p0/insert_group_commit_into_unique.out
@@ -7,7 +7,7 @@
5 q 50 0
6 \N -1 0
11 a 211 0
-12 b 22 1
+12 b 23 1
13 c 23 0
14 d 24 0
15 c 23 0
@@ -34,7 +34,7 @@
5 q 50 50 0
6 \N 60 60 0
11 a 211 211 0
-12 b 22 22 1
+12 b 23 23 1
13 c 23 23 0
14 d 24 24 0
15 c 23 23 0
@@ -60,7 +60,7 @@
5 q 50 500 0
6 \N 60 600 0
10 a 10 11 0
-11 a 11 10 1
+11 a 11 12 1
12 a 12 10 0
13 a 13 10 0
20 b 20 8 0
@@ -87,7 +87,7 @@
5 q 50 0
6 \N -1 0
11 a 211 0
-12 b 22 1
+12 b 23 1
13 c 23 0
14 d 24 0
15 c 23 0
@@ -114,7 +114,7 @@
5 q 50 50 0
6 \N 60 60 0
11 a 211 211 0
-12 b 22 22 1
+12 b 23 23 1
13 c 23 23 0
14 d 24 24 0
15 c 23 23 0
@@ -140,7 +140,7 @@
5 q 50 500 0
6 \N 60 600 0
10 a 10 11 0
-11 a 11 10 1
+11 a 11 12 1
12 a 12 10 0
13 a 13 10 0
20 b 20 8 0
diff --git a/regression-test/data/insert_p0/insert_group_commit_into_unique_sync_mode.out b/regression-test/data/insert_p0/insert_group_commit_into_unique_sync_mode.out
index 2946a07897f..17121bfe0bd 100644
--- a/regression-test/data/insert_p0/insert_group_commit_into_unique_sync_mode.out
+++ b/regression-test/data/insert_p0/insert_group_commit_into_unique_sync_mode.out
@@ -7,7 +7,7 @@
5 q 50 0
6 \N -1 0
11 a 211 0
-12 b 22 1
+12 b 23 1
13 c 23 0
14 d 24 0
15 c 23 0
@@ -34,7 +34,7 @@
5 q 50 50 0
6 \N 60 60 0
11 a 211 211 0
-12 b 22 22 1
+12 b 23 23 1
13 c 23 23 0
14 d 24 24 0
15 c 23 23 0
@@ -60,7 +60,7 @@
5 q 50 500 0
6 \N 60 600 0
10 a 10 11 0
-11 a 11 10 1
+11 a 11 12 1
12 a 12 10 0
13 a 13 10 0
20 b 20 8 0
@@ -87,7 +87,7 @@
5 q 50 0
6 \N -1 0
11 a 211 0
-12 b 22 1
+12 b 23 1
13 c 23 0
14 d 24 0
15 c 23 0
@@ -114,7 +114,7 @@
5 q 50 50 0
6 \N 60 60 0
11 a 211 211 0
-12 b 22 22 1
+12 b 23 23 1
13 c 23 23 0
14 d 24 24 0
15 c 23 23 0
@@ -140,7 +140,7 @@
5 q 50 500 0
6 \N 60 600 0
10 a 10 11 0
-11 a 11 10 1
+11 a 11 12 1
12 a 12 10 0
13 a 13 10 0
20 b 20 8 0
diff --git a/regression-test/data/insert_p0/test_group_commit_2.csv b/regression-test/data/insert_p0/test_group_commit_2.csv
index 0f81665b1e3..d706739913c 100644
--- a/regression-test/data/insert_p0/test_group_commit_2.csv
+++ b/regression-test/data/insert_p0/test_group_commit_2.csv
@@ -1,5 +1,5 @@
11,a,211,0
-12,b,22,1
+12,b,23,1
15,c,23,0
16,d,24,1
27,e,25,0
\ No newline at end of file
diff --git a/regression-test/data/insert_p0/test_group_commit_4.csv b/regression-test/data/insert_p0/test_group_commit_4.csv
index 0b2678c40f3..5849c0ebb2c 100644
--- a/regression-test/data/insert_p0/test_group_commit_4.csv
+++ b/regression-test/data/insert_p0/test_group_commit_4.csv
@@ -1,5 +1,5 @@
10,a,10,11,0
-11,a,11,10,1
+11,a,11,12,1
12,a,12,9,0
13,a,13,9,1
20,b,20,8,0
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]