This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new e77d54b3028 branch-3.1: [refactor](sink) refactor vtablet writer
sequential close to parallel close #51404 (#52793)
e77d54b3028 is described below
commit e77d54b3028cc32c519247527095b53ac058c9e7
Author: hui lai <[email protected]>
AuthorDate: Wed Jul 9 10:38:26 2025 +0800
branch-3.1: [refactor](sink) refactor vtablet writer sequential close to
parallel close #51404 (#52793)
pick #51404
---
be/src/vec/sink/writer/vtablet_writer.cpp | 220 ++++++++++++---------
be/src/vec/sink/writer/vtablet_writer.h | 88 +++++++--
.../test_writer_fault_injection.groovy | 100 ++++++++++
3 files changed, 297 insertions(+), 111 deletions(-)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 092d21930eb..581fc1704e9 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -270,6 +270,76 @@ Status IndexChannel::check_tablet_filtered_rows_consistency() {
return Status::OK();
}
+static Status cancel_channel_and_check_intolerable_failure(Status status,
+ const std::string&
err_msg,
+ IndexChannel& ich,
VNodeChannel& nch) {
+ LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " <<
err_msg;
+ ich.mark_as_failed(&nch, err_msg, -1);
+ // cancel the node channel in best effort
+ nch.cancel(err_msg);
+
+ // check if index has intolerable failure
+ if (Status index_st = ich.check_intolerable_failure(); !index_st.ok()) {
+ status = std::move(index_st);
+ } else if (Status receive_st =
ich.check_tablet_received_rows_consistency(); !receive_st.ok()) {
+ status = std::move(receive_st);
+ } else if (Status filter_st =
ich.check_tablet_filtered_rows_consistency(); !filter_st.ok()) {
+ status = std::move(filter_st);
+ }
+ return status;
+}
+
+Status IndexChannel::close_wait(
+ RuntimeState* state, WriterStats* writer_stats,
+ std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map,
+ std::unordered_set<int64_t> unfinished_node_channel_ids) {
+ Status status = Status::OK();
+ while (true) {
+ status = check_each_node_channel_close(&unfinished_node_channel_ids,
+ node_add_batch_counter_map,
writer_stats, status);
+ if (!status.ok() || unfinished_node_channel_ids.empty()) {
+ LOG(INFO) << ", is all unfinished: " <<
unfinished_node_channel_ids.empty()
+ << ", status: " << status << ", txn_id: " <<
_parent->_txn_id
+ << ", load_id: " << print_id(_parent->_load_id);
+ break;
+ }
+ bthread_usleep(1000 * 10);
+ }
+
+ DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout",
+ { status = Status::TimedOut("injected timeout"); });
+
+ return status;
+}
+
+Status IndexChannel::check_each_node_channel_close(
+ std::unordered_set<int64_t>* unfinished_node_channel_ids,
+ std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map,
+ WriterStats* writer_stats, Status status) {
+ Status final_status = Status::OK();
+ for (auto& it : _node_channels) {
+ std::shared_ptr<VNodeChannel> node_channel = it.second;
+ // If the node channel is not in the unfinished_node_channel_ids,
+ // it means the node channel is already closed.
+ if (!unfinished_node_channel_ids->contains(it.first)) {
+ continue;
+ }
+ bool node_channel_closed = false;
+ auto close_status = it.second->close_wait(_parent->_state,
&node_channel_closed);
+ if (node_channel_closed) {
+ close_status = it.second->after_close_handle(_parent->_state,
writer_stats,
+
node_add_batch_counter_map);
+ unfinished_node_channel_ids->erase(it.first);
+ }
+ if (!close_status.ok()) {
+ final_status = cancel_channel_and_check_intolerable_failure(
+ std::move(final_status), close_status.to_string(), *this,
*it.second);
+ }
+ }
+
+ return final_status;
+}
+
static Status none_of(std::initializer_list<bool> vars) {
bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return
var; });
Status st = Status::OK();
@@ -935,13 +1005,15 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
static_cast<void>(request->release_id());
}
-Status VNodeChannel::close_wait(RuntimeState* state) {
+Status VNodeChannel::close_wait(RuntimeState* state, bool* is_closed) {
DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", {
std::thread t(injection_full_gc_fn);
t.join();
});
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
+ *is_closed = true;
+
auto st = none_of({_cancelled, !_eos_is_produced});
if (!st.ok()) {
if (_cancelled) {
@@ -954,19 +1026,32 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
}
}
+ DBUG_EXECUTE_IF("VNodeChannel.close_wait.cancelled", {
+ _cancelled = true;
+ _cancel_msg = "injected cancel";
+ });
+
+ if (state->is_cancelled()) {
+ _cancel_with_msg(state->cancel_reason().to_string());
+ }
+
// Waiting for finished until _add_batches_finished changed by rpc's
finished callback.
// it may take a long time, so we couldn't set a timeout
// For pipeline engine, the close is called in async writer's process
block method,
// so that it will not block pipeline thread.
- while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
- bthread_usleep(1000);
+ if (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
+ *is_closed = false;
+ return Status::OK();
}
VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
- _close_time_ms = UnixMillis() - _close_time_ms;
+ return Status::OK();
+}
- if (state->is_cancelled()) {
- _cancel_with_msg(state->cancel_reason().to_string());
- }
+Status VNodeChannel::after_close_handle(
+ RuntimeState* state, WriterStats* writer_stats,
+ std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map) {
+ Status st = Status::Error<ErrorCode::INTERNAL_ERROR,
false>(get_cancel_msg());
+ _close_time_ms = UnixMillis() - _close_time_ms;
if (_add_batches_finished) {
_close_check();
@@ -980,10 +1065,11 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
// only when normal close, we set _is_closed to true.
// otherwise, we will set it to true in cancel().
_is_closed = true;
- return Status::OK();
+ st = Status::OK();
}
- return Status::Error<ErrorCode::INTERNAL_ERROR, false>(get_cancel_msg());
+ time_report(node_add_batch_counter_map, writer_stats);
+ return st;
}
void VNodeChannel::_close_check() {
@@ -1386,26 +1472,6 @@ Status VTabletWriter::_incremental_open_node_channel(
return Status::OK();
}
-static Status cancel_channel_and_check_intolerable_failure(Status status,
- const std::string&
err_msg,
- IndexChannel& ich,
VNodeChannel& nch) {
- LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " <<
err_msg;
- ich.mark_as_failed(&nch, err_msg, -1);
- // cancel the node channel in best effort
- nch.cancel(err_msg);
-
- // check if index has intolerable failure
- Status index_st = ich.check_intolerable_failure();
- if (!index_st.ok()) {
- status = std::move(index_st);
- } else if (Status st = ich.check_tablet_received_rows_consistency();
!st.ok()) {
- status = std::move(st);
- } else if (Status st = ich.check_tablet_filtered_rows_consistency();
!st.ok()) {
- status = std::move(st);
- }
- return status;
-}
-
void VTabletWriter::_cancel_all_channel(Status status) {
for (const auto& index_channel : _channels) {
index_channel->for_each_node_channel([&status](const
std::shared_ptr<VNodeChannel>& ch) {
@@ -1484,19 +1550,9 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
if (!status.ok()) {
break;
}
- index_channel->for_init_node_channel(
- [this, &index_channel, &status](const
std::shared_ptr<VNodeChannel>& ch) {
- if (!status.ok() || ch->is_closed()) {
- return;
- }
- auto s = ch->close_wait(_state);
- VLOG_DEBUG << index_channel->_parent->_sender_id
<< "'s " << ch->host()
- << "close1 wait finished!";
- if (!s.ok()) {
- status =
cancel_channel_and_check_intolerable_failure(
- std::move(status), s.to_string(),
*index_channel, *ch);
- }
- });
+
+ status = index_channel->close_wait(_state, nullptr, nullptr,
+
index_channel->init_node_channel_ids());
if (!status.ok()) {
break;
}
@@ -1557,21 +1613,15 @@ Status VTabletWriter::close(Status exec_status) {
_do_try_close(_state, exec_status);
TEST_INJECTION_POINT("VOlapTableSink::close");
+ DBUG_EXECUTE_IF("VTabletWriter.close.close_status_not_ok",
+ { _close_status = Status::InternalError("injected close
status not ok"); });
+
// If _close_status is not ok, all nodes have been canceled in try_close.
if (_close_status.ok()) {
auto status = Status::OK();
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter>
node_add_batch_counter_map;
- int64_t serialize_batch_ns = 0;
- int64_t queue_push_lock_ns = 0;
- int64_t actual_consume_ns = 0;
- int64_t total_add_batch_exec_time_ns = 0;
- int64_t max_add_batch_exec_time_ns = 0;
- int64_t total_wait_exec_time_ns = 0;
- int64_t max_wait_exec_time_ns = 0;
- int64_t total_add_batch_num = 0;
- int64_t num_node_channels = 0;
- VNodeChannelStat channel_stat;
+ WriterStats writer_stats;
for (const auto& index_channel : _channels) {
if (!status.ok()) {
@@ -1579,28 +1629,8 @@ Status VTabletWriter::close(Status exec_status) {
}
int64_t add_batch_exec_time = 0;
int64_t wait_exec_time = 0;
- index_channel->for_each_node_channel(
- [this, &index_channel, &status,
&node_add_batch_counter_map,
- &serialize_batch_ns, &channel_stat, &queue_push_lock_ns,
&actual_consume_ns,
- &total_add_batch_exec_time_ns, &add_batch_exec_time,
&total_wait_exec_time_ns,
- &wait_exec_time,
- &total_add_batch_num](const
std::shared_ptr<VNodeChannel>& ch) {
- if (!status.ok() || (ch->is_closed() &&
!ch->is_cancelled())) {
- return;
- }
- // in pipeline, all node channels are done or
canceled, will not block.
- // no pipeline, close may block waiting.
- auto s = ch->close_wait(_state);
- if (!s.ok()) {
- status =
cancel_channel_and_check_intolerable_failure(
- std::move(status), s.to_string(),
*index_channel, *ch);
- }
- ch->time_report(&node_add_batch_counter_map,
&serialize_batch_ns,
- &channel_stat, &queue_push_lock_ns,
&actual_consume_ns,
- &total_add_batch_exec_time_ns,
&add_batch_exec_time,
- &total_wait_exec_time_ns,
&wait_exec_time,
- &total_add_batch_num);
- });
+ status = index_channel->close_wait(_state, &writer_stats,
&node_add_batch_counter_map,
+
index_channel->each_node_channel_ids());
// Due to the non-determinism of compaction, the rowsets of each
replica may be different from each other on different
// BE nodes. The number of rows filtered in SegmentWriter depends
on the historical rowsets located in the correspoding
@@ -1615,21 +1645,22 @@ Status VTabletWriter::close(Status exec_status) {
}
}
- num_node_channels += index_channel->num_node_channels();
- if (add_batch_exec_time > max_add_batch_exec_time_ns) {
- max_add_batch_exec_time_ns = add_batch_exec_time;
+ writer_stats.num_node_channels +=
index_channel->num_node_channels();
+ if (add_batch_exec_time > writer_stats.max_add_batch_exec_time_ns)
{
+ writer_stats.max_add_batch_exec_time_ns = add_batch_exec_time;
}
- if (wait_exec_time > max_wait_exec_time_ns) {
- max_wait_exec_time_ns = wait_exec_time;
+ if (wait_exec_time > writer_stats.max_wait_exec_time_ns) {
+ writer_stats.max_wait_exec_time_ns = wait_exec_time;
}
} // end for index channels
if (status.ok()) {
// TODO need to be improved
- LOG(INFO) << "total mem_exceeded_block_ns=" <<
channel_stat.mem_exceeded_block_ns
- << ", total queue_push_lock_ns=" << queue_push_lock_ns
- << ", total actual_consume_ns=" << actual_consume_ns
- << ", load id=" << print_id(_load_id);
+ LOG(INFO) << "total mem_exceeded_block_ns="
+ << writer_stats.channel_stat.mem_exceeded_block_ns
+ << ", total queue_push_lock_ns=" <<
writer_stats.queue_push_lock_ns
+ << ", total actual_consume_ns=" <<
writer_stats.actual_consume_ns
+ << ", load id=" << print_id(_load_id) << ", txn_id=" <<
_txn_id;
COUNTER_SET(_input_rows_counter, _number_input_rows);
COUNTER_SET(_output_rows_counter, _number_output_rows);
@@ -1640,18 +1671,19 @@ Status VTabletWriter::close(Status exec_status) {
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_row_distribution_timer,
(int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_filter_timer, _filter_ns);
- COUNTER_SET(_append_node_channel_timer,
channel_stat.append_node_channel_ns);
- COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
- COUNTER_SET(_wait_mem_limit_timer,
channel_stat.mem_exceeded_block_ns);
+ COUNTER_SET(_append_node_channel_timer,
+ writer_stats.channel_stat.append_node_channel_ns);
+ COUNTER_SET(_where_clause_timer,
writer_stats.channel_stat.where_clause_ns);
+ COUNTER_SET(_wait_mem_limit_timer,
writer_stats.channel_stat.mem_exceeded_block_ns);
COUNTER_SET(_validate_data_timer,
_block_convertor->validate_data_ns());
- COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
- COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
- COUNTER_SET(_total_add_batch_exec_timer,
total_add_batch_exec_time_ns);
- COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
- COUNTER_SET(_total_wait_exec_timer, total_wait_exec_time_ns);
- COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns);
- COUNTER_SET(_add_batch_number, total_add_batch_num);
- COUNTER_SET(_num_node_channels, num_node_channels);
+ COUNTER_SET(_serialize_batch_timer,
writer_stats.serialize_batch_ns);
+ COUNTER_SET(_non_blocking_send_work_timer,
writer_stats.actual_consume_ns);
+ COUNTER_SET(_total_add_batch_exec_timer,
writer_stats.total_add_batch_exec_time_ns);
+ COUNTER_SET(_max_add_batch_exec_timer,
writer_stats.max_add_batch_exec_time_ns);
+ COUNTER_SET(_total_wait_exec_timer,
writer_stats.total_wait_exec_time_ns);
+ COUNTER_SET(_max_wait_exec_timer,
writer_stats.max_wait_exec_time_ns);
+ COUNTER_SET(_add_batch_number, writer_stats.total_add_batch_num);
+ COUNTER_SET(_num_node_channels, writer_stats.num_node_channels);
// _number_input_rows don't contain num_rows_load_filtered and
num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows +
_state->num_rows_load_filtered() +
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 1dbeb6722ce..116aa20a98c 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -206,6 +206,19 @@ public:
int64_t append_node_channel_ns = 0;
};
+struct WriterStats {
+ int64_t serialize_batch_ns = 0;
+ int64_t queue_push_lock_ns = 0;
+ int64_t actual_consume_ns = 0;
+ int64_t total_add_batch_exec_time_ns = 0;
+ int64_t max_add_batch_exec_time_ns = 0;
+ int64_t total_wait_exec_time_ns = 0;
+ int64_t max_wait_exec_time_ns = 0;
+ int64_t total_add_batch_num = 0;
+ int64_t num_node_channels = 0;
+ VNodeChannelStat channel_stat;
+};
+
// pair<row_id,tablet_id>
using Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
std::vector<int64_t>>;
@@ -273,27 +286,31 @@ public:
// two ways to stop channel:
// 1. mark_close()->close_wait() PS. close_wait() will block waiting for
the last AddBatch rpc response.
// 2. just cancel()
- Status close_wait(RuntimeState* state);
+ Status close_wait(RuntimeState* state, bool* is_closed);
+
+ Status after_close_handle(
+ RuntimeState* state, WriterStats* writer_stats,
+ std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map);
void cancel(const std::string& cancel_msg);
void time_report(std::unordered_map<int64_t, AddBatchCounter>*
add_batch_counter_map,
- int64_t* serialize_batch_ns, VNodeChannelStat* stat,
- int64_t* queue_push_lock_ns, int64_t* actual_consume_ns,
- int64_t* total_add_batch_exec_time_ns, int64_t*
add_batch_exec_time_ns,
- int64_t* total_wait_exec_time_ns, int64_t*
wait_exec_time_ns,
- int64_t* total_add_batch_num) const {
- (*add_batch_counter_map)[_node_id] += _add_batch_counter;
- (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
- *serialize_batch_ns += _serialize_batch_ns;
- *stat += _stat;
- *queue_push_lock_ns += _queue_push_lock_ns;
- *actual_consume_ns += _actual_consume_ns;
- *add_batch_exec_time_ns =
(_add_batch_counter.add_batch_execution_time_us * 1000);
- *total_add_batch_exec_time_ns += *add_batch_exec_time_ns;
- *wait_exec_time_ns =
(_add_batch_counter.add_batch_wait_execution_time_us * 1000);
- *total_wait_exec_time_ns += *wait_exec_time_ns;
- *total_add_batch_num += _add_batch_counter.add_batch_num;
+ WriterStats* writer_stats) const {
+ if (add_batch_counter_map != nullptr) {
+ (*add_batch_counter_map)[_node_id] += _add_batch_counter;
+ (*add_batch_counter_map)[_node_id].close_wait_time_ms =
_close_time_ms;
+ }
+ if (writer_stats != nullptr) {
+ writer_stats->serialize_batch_ns += _serialize_batch_ns;
+ writer_stats->channel_stat += _stat;
+ writer_stats->queue_push_lock_ns += _queue_push_lock_ns;
+ writer_stats->actual_consume_ns += _actual_consume_ns;
+ writer_stats->total_add_batch_exec_time_ns +=
+ (_add_batch_counter.add_batch_execution_time_us * 1000);
+ writer_stats->total_wait_exec_time_ns +=
+ (_add_batch_counter.add_batch_wait_execution_time_us *
1000);
+ writer_stats->total_add_batch_num +=
_add_batch_counter.add_batch_num;
+ }
}
int64_t node_id() const { return _node_id; }
@@ -452,12 +469,49 @@ public:
}
}
+ std::unordered_set<int64_t> init_node_channel_ids() {
+ std::unordered_set<int64_t> node_channel_ids;
+ for (auto& it : _node_channels) {
+ if (!it.second->is_incremental()) {
+ node_channel_ids.insert(it.first);
+ }
+ }
+ return node_channel_ids;
+ }
+
+ std::unordered_set<int64_t> inc_node_channel_ids() {
+ std::unordered_set<int64_t> node_channel_ids;
+ for (auto& it : _node_channels) {
+ if (it.second->is_incremental()) {
+ node_channel_ids.insert(it.first);
+ }
+ }
+ return node_channel_ids;
+ }
+
+ std::unordered_set<int64_t> each_node_channel_ids() {
+ std::unordered_set<int64_t> node_channel_ids;
+ for (auto& it : _node_channels) {
+ node_channel_ids.insert(it.first);
+ }
+ return node_channel_ids;
+ }
+
bool has_incremental_node_channel() const { return _has_inc_node; }
void mark_as_failed(const VNodeChannel* node_channel, const std::string&
err,
int64_t tablet_id = -1);
Status check_intolerable_failure();
+ Status close_wait(RuntimeState* state, WriterStats* writer_stats,
+ std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map,
+ std::unordered_set<int64_t> unfinished_node_channel_ids);
+
+ Status check_each_node_channel_close(
+ std::unordered_set<int64_t>* unfinished_node_channel_ids,
+ std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map,
+ WriterStats* writer_stats, Status status);
+
// set error tablet info in runtime state, so that it can be returned to
FE.
void set_error_tablet_in_state(RuntimeState* state);
diff --git a/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
new file mode 100644
index 00000000000..5f1d9fced72
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_writer_fault_injection", "nonConcurrent") {
+ sql """ set enable_memtable_on_sink_node=false """
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" =
"1")
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" =
"1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ def load_with_injection = { injection, error_msg="", success=false->
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injection)
+ sql "insert into test select * from baseall where k1 <= 3"
+ assertTrue(success, String.format("expected Exception '%s',
actual success", error_msg))
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains(error_msg),
+ String.format("expected '%s', actual '%s'", error_msg,
e.getMessage()))
+ } finally {
+ sleep 1000 // wait some time for instance finish before
disable injection
+ GetDebugPoint().disableDebugPointForAllBEs(injection)
+ }
+ }
+
+ // VTabletWriter close logic injection tests
+ // Test VNodeChannel close_wait with full gc injection
+ load_with_injection("VNodeChannel.close_wait_full_gc")
+ // Test VNodeChannel try_send_and_fetch_status with full gc injection
+ load_with_injection("VNodeChannel.try_send_and_fetch_status_full_gc")
+ // Test VNodeChannel close_wait when cancelled
+ load_with_injection("VNodeChannel.close_wait.cancelled")
+ // Test IndexChannel close_wait with timeout
+ load_with_injection("IndexChannel.close_wait.timeout")
+ // Test VTabletWriter close with _close_status not ok
+ load_with_injection("VTabletWriter.close.close_status_not_ok")
+ } finally {
+ sql """ set enable_memtable_on_sink_node=true """
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]