This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8df2c5b7f8e [Fix](load) Fix data loss when node channel been cancelled
before close wait (#36662)
8df2c5b7f8e is described below
commit 8df2c5b7f8ee1a42ca50706b53d0f9488f34a04a
Author: plat1ko <[email protected]>
AuthorDate: Mon Jun 24 14:28:22 2024 +0800
[Fix](load) Fix data loss when node channel been cancelled before close
wait (#36662)
Fix data loss when a node channel has been cancelled before close wait.
When an error occurs in `VNodeChannel::try_send_pending_block`, the
channel invokes `VNodeChannel::cancel`, which also marks the
`VNodeChannel` as closed. If that cancel happens before
`VTabletWriter::close` reaches `VNodeChannel::close_wait`, the
already-closed channel is skipped and the error handling code is
bypassed, so the transaction is still considered successful even though
data was lost. With this fix, `VTabletWriter::close` only skips channels
that are closed and not cancelled.
---
be/src/cloud/injection_point_action.cpp | 10 ++++++++
be/src/vec/sink/writer/vtablet_writer.cpp | 42 ++++++++++++++++++-------------
2 files changed, 34 insertions(+), 18 deletions(-)
diff --git a/be/src/cloud/injection_point_action.cpp
b/be/src/cloud/injection_point_action.cpp
index d99dcfd534d..e8c57ed91e8 100644
--- a/be/src/cloud/injection_point_action.cpp
+++ b/be/src/cloud/injection_point_action.cpp
@@ -98,6 +98,16 @@ void register_suites() {
should_ret = true;
});
});
+ suite_map.emplace("test_cancel_node_channel", [] {
+ auto* sp = SyncPoint::get_instance();
+ sp->set_call_back("VNodeChannel::try_send_block", [](auto&& args) {
+ LOG(INFO) << "injection VNodeChannel::try_send_block";
+ auto* arg0 = try_any_cast<Status*>(args[0]);
+ *arg0 = Status::InternalError<false>("test_cancel_node_channel
injection error");
+ });
+ sp->set_call_back("VOlapTableSink::close",
+ [](auto&&) {
std::this_thread::sleep_for(std::chrono::seconds(5)); });
+ });
}
void set_sleep(const std::string& point, HttpRequest* req) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index b31796fe724..22d9bbd840b 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -45,6 +45,7 @@
#include <vector>
#include "cloud/config.h"
+#include "common/sync_point.h"
#include "util/runtime_profile.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
@@ -627,6 +628,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState*
state) {
&uncompressed_bytes, &compressed_bytes,
state->fragement_transmission_compression_type(),
_parent->_transfer_large_data_by_brpc);
+ TEST_INJECTION_POINT_CALLBACK("VNodeChannel::try_send_block", &st);
if (!st.ok()) {
cancel(fmt::format("{}, err: {}", channel_info(), st.to_string()));
_send_block_callback->clear_in_flight();
@@ -1325,22 +1327,22 @@ Status VTabletWriter::_incremental_open_node_channel(
return Status::OK();
}
-static Status cancel_channel_and_check_intolerable_failure(
- Status status, const std::string& err_msg, const
std::shared_ptr<IndexChannel> ich,
- const std::shared_ptr<VNodeChannel> nch) {
- LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " <<
err_msg;
- ich->mark_as_failed(nch.get(), err_msg, -1);
+static Status cancel_channel_and_check_intolerable_failure(Status status,
+ const std::string&
err_msg,
+ IndexChannel& ich,
VNodeChannel& nch) {
+ LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " <<
err_msg;
+ ich.mark_as_failed(&nch, err_msg, -1);
// cancel the node channel in best effort
- nch->cancel(err_msg);
+ nch.cancel(err_msg);
// check if index has intolerable failure
- Status index_st = ich->check_intolerable_failure();
+ Status index_st = ich.check_intolerable_failure();
if (!index_st.ok()) {
- status = index_st;
- } else if (Status st = ich->check_tablet_received_rows_consistency();
!st.ok()) {
- status = st;
- } else if (Status st = ich->check_tablet_filtered_rows_consistency();
!st.ok()) {
- status = st;
+ status = std::move(index_st);
+ } else if (Status st = ich.check_tablet_received_rows_consistency();
!st.ok()) {
+ status = std::move(st);
+ } else if (Status st = ich.check_tablet_filtered_rows_consistency();
!st.ok()) {
+ status = std::move(st);
}
return status;
}
@@ -1416,7 +1418,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
ch->mark_close(true);
if (ch->is_cancelled()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, ch->get_cancel_msg(),
index_channel, ch);
+ std::move(status),
ch->get_cancel_msg(), *index_channel,
+ *ch);
}
});
if (!status.ok()) {
@@ -1432,7 +1435,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
<< "close1 wait finished!";
if (!s.ok()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, s.to_string(), index_channel,
ch);
+ std::move(status), s.to_string(),
*index_channel, *ch);
}
});
if (!status.ok()) {
@@ -1450,7 +1453,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
ch->mark_close();
if (ch->is_cancelled()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, ch->get_cancel_msg(),
index_channel, ch);
+ std::move(status),
ch->get_cancel_msg(), *index_channel,
+ *ch);
}
});
} else { // not has_incremental_node_channel
@@ -1464,7 +1468,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
ch->mark_close();
if (ch->is_cancelled()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, ch->get_cancel_msg(),
index_channel, ch);
+ std::move(status),
ch->get_cancel_msg(), *index_channel,
+ *ch);
}
});
}
@@ -1491,6 +1496,7 @@ Status VTabletWriter::close(Status exec_status) {
// will make the last batch of request-> close_wait will wait this
finished.
_do_try_close(_state, exec_status);
+ TEST_INJECTION_POINT("VOlapTableSink::close");
// If _close_status is not ok, all nodes have been canceled in try_close.
if (_close_status.ok()) {
@@ -1520,7 +1526,7 @@ Status VTabletWriter::close(Status exec_status) {
&total_add_batch_exec_time_ns, &add_batch_exec_time,
&total_wait_exec_time_ns,
&wait_exec_time,
&total_add_batch_num](const
std::shared_ptr<VNodeChannel>& ch) {
- if (!status.ok() || ch->is_closed()) {
+ if (!status.ok() || (ch->is_closed() &&
!ch->is_cancelled())) {
return;
}
// in pipeline, all node channels are done or
canceled, will not block.
@@ -1528,7 +1534,7 @@ Status VTabletWriter::close(Status exec_status) {
auto s = ch->close_wait(_state);
if (!s.ok()) {
status =
cancel_channel_and_check_intolerable_failure(
- status, s.to_string(), index_channel, ch);
+ std::move(status), s.to_string(),
*index_channel, *ch);
}
ch->time_report(&node_add_batch_counter_map,
&serialize_batch_ns,
&channel_stat, &queue_push_lock_ns,
&actual_consume_ns,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]