This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new e77d54b3028 branch-3.1: [refactor](sink) refactor vtablet writer sequential close to parallel close #51404 (#52793)
e77d54b3028 is described below

commit e77d54b3028cc32c519247527095b53ac058c9e7
Author: hui lai <[email protected]>
AuthorDate: Wed Jul 9 10:38:26 2025 +0800

    branch-3.1: [refactor](sink) refactor vtablet writer sequential close to parallel close #51404 (#52793)
    
    pick #51404
---
 be/src/vec/sink/writer/vtablet_writer.cpp          | 220 ++++++++++++---------
 be/src/vec/sink/writer/vtablet_writer.h            |  88 +++++++--
 .../test_writer_fault_injection.groovy             | 100 ++++++++++
 3 files changed, 297 insertions(+), 111 deletions(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 092d21930eb..581fc1704e9 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -270,6 +270,76 @@ Status 
IndexChannel::check_tablet_filtered_rows_consistency() {
     return Status::OK();
 }
 
+static Status cancel_channel_and_check_intolerable_failure(Status status,
+                                                           const std::string& 
err_msg,
+                                                           IndexChannel& ich, 
VNodeChannel& nch) {
+    LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << 
err_msg;
+    ich.mark_as_failed(&nch, err_msg, -1);
+    // cancel the node channel in best effort
+    nch.cancel(err_msg);
+
+    // check if index has intolerable failure
+    if (Status index_st = ich.check_intolerable_failure(); !index_st.ok()) {
+        status = std::move(index_st);
+    } else if (Status receive_st = 
ich.check_tablet_received_rows_consistency(); !receive_st.ok()) {
+        status = std::move(receive_st);
+    } else if (Status filter_st = 
ich.check_tablet_filtered_rows_consistency(); !filter_st.ok()) {
+        status = std::move(filter_st);
+    }
+    return status;
+}
+
+Status IndexChannel::close_wait(
+        RuntimeState* state, WriterStats* writer_stats,
+        std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map,
+        std::unordered_set<int64_t> unfinished_node_channel_ids) {
+    Status status = Status::OK();
+    while (true) {
+        status = check_each_node_channel_close(&unfinished_node_channel_ids,
+                                               node_add_batch_counter_map, 
writer_stats, status);
+        if (!status.ok() || unfinished_node_channel_ids.empty()) {
+            LOG(INFO) << ", is all unfinished: " << 
unfinished_node_channel_ids.empty()
+                      << ", status: " << status << ", txn_id: " << 
_parent->_txn_id
+                      << ", load_id: " << print_id(_parent->_load_id);
+            break;
+        }
+        bthread_usleep(1000 * 10);
+    }
+
+    DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout",
+                    { status = Status::TimedOut("injected timeout"); });
+
+    return status;
+}
+
+Status IndexChannel::check_each_node_channel_close(
+        std::unordered_set<int64_t>* unfinished_node_channel_ids,
+        std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map,
+        WriterStats* writer_stats, Status status) {
+    Status final_status = Status::OK();
+    for (auto& it : _node_channels) {
+        std::shared_ptr<VNodeChannel> node_channel = it.second;
+        // If the node channel is not in the unfinished_node_channel_ids,
+        // it means the node channel is already closed.
+        if (!unfinished_node_channel_ids->contains(it.first)) {
+            continue;
+        }
+        bool node_channel_closed = false;
+        auto close_status = it.second->close_wait(_parent->_state, 
&node_channel_closed);
+        if (node_channel_closed) {
+            close_status = it.second->after_close_handle(_parent->_state, 
writer_stats,
+                                                         
node_add_batch_counter_map);
+            unfinished_node_channel_ids->erase(it.first);
+        }
+        if (!close_status.ok()) {
+            final_status = cancel_channel_and_check_intolerable_failure(
+                    std::move(final_status), close_status.to_string(), *this, 
*it.second);
+        }
+    }
+
+    return final_status;
+}
+
 static Status none_of(std::initializer_list<bool> vars) {
     bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return 
var; });
     Status st = Status::OK();
@@ -935,13 +1005,15 @@ void VNodeChannel::cancel(const std::string& cancel_msg) 
{
     static_cast<void>(request->release_id());
 }
 
-Status VNodeChannel::close_wait(RuntimeState* state) {
+Status VNodeChannel::close_wait(RuntimeState* state, bool* is_closed) {
     DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", {
         std::thread t(injection_full_gc_fn);
         t.join();
     });
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
 
+    *is_closed = true;
+
     auto st = none_of({_cancelled, !_eos_is_produced});
     if (!st.ok()) {
         if (_cancelled) {
@@ -954,19 +1026,32 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
         }
     }
 
+    DBUG_EXECUTE_IF("VNodeChannel.close_wait.cancelled", {
+        _cancelled = true;
+        _cancel_msg = "injected cancel";
+    });
+
+    if (state->is_cancelled()) {
+        _cancel_with_msg(state->cancel_reason().to_string());
+    }
+
     // Waiting for finished until _add_batches_finished changed by rpc's 
finished callback.
     // it may take a long time, so we couldn't set a timeout
     // For pipeline engine, the close is called in async writer's process 
block method,
     // so that it will not block pipeline thread.
-    while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
-        bthread_usleep(1000);
+    if (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
+        *is_closed = false;
+        return Status::OK();
     }
     VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
-    _close_time_ms = UnixMillis() - _close_time_ms;
+    return Status::OK();
+}
 
-    if (state->is_cancelled()) {
-        _cancel_with_msg(state->cancel_reason().to_string());
-    }
+Status VNodeChannel::after_close_handle(
+        RuntimeState* state, WriterStats* writer_stats,
+        std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map) {
+    Status st = Status::Error<ErrorCode::INTERNAL_ERROR, 
false>(get_cancel_msg());
+    _close_time_ms = UnixMillis() - _close_time_ms;
 
     if (_add_batches_finished) {
         _close_check();
@@ -980,10 +1065,11 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
         // only when normal close, we set _is_closed to true.
         // otherwise, we will set it to true in cancel().
         _is_closed = true;
-        return Status::OK();
+        st = Status::OK();
     }
 
-    return Status::Error<ErrorCode::INTERNAL_ERROR, false>(get_cancel_msg());
+    time_report(node_add_batch_counter_map, writer_stats);
+    return st;
 }
 
 void VNodeChannel::_close_check() {
@@ -1386,26 +1472,6 @@ Status VTabletWriter::_incremental_open_node_channel(
     return Status::OK();
 }
 
-static Status cancel_channel_and_check_intolerable_failure(Status status,
-                                                           const std::string& 
err_msg,
-                                                           IndexChannel& ich, 
VNodeChannel& nch) {
-    LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << 
err_msg;
-    ich.mark_as_failed(&nch, err_msg, -1);
-    // cancel the node channel in best effort
-    nch.cancel(err_msg);
-
-    // check if index has intolerable failure
-    Status index_st = ich.check_intolerable_failure();
-    if (!index_st.ok()) {
-        status = std::move(index_st);
-    } else if (Status st = ich.check_tablet_received_rows_consistency(); 
!st.ok()) {
-        status = std::move(st);
-    } else if (Status st = ich.check_tablet_filtered_rows_consistency(); 
!st.ok()) {
-        status = std::move(st);
-    }
-    return status;
-}
-
 void VTabletWriter::_cancel_all_channel(Status status) {
     for (const auto& index_channel : _channels) {
         index_channel->for_each_node_channel([&status](const 
std::shared_ptr<VNodeChannel>& ch) {
@@ -1484,19 +1550,9 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                 if (!status.ok()) {
                     break;
                 }
-                index_channel->for_init_node_channel(
-                        [this, &index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
-                            if (!status.ok() || ch->is_closed()) {
-                                return;
-                            }
-                            auto s = ch->close_wait(_state);
-                            VLOG_DEBUG << index_channel->_parent->_sender_id 
<< "'s " << ch->host()
-                                       << "close1 wait finished!";
-                            if (!s.ok()) {
-                                status = 
cancel_channel_and_check_intolerable_failure(
-                                        std::move(status), s.to_string(), 
*index_channel, *ch);
-                            }
-                        });
+
+                status = index_channel->close_wait(_state, nullptr, nullptr,
+                                                   
index_channel->init_node_channel_ids());
                 if (!status.ok()) {
                     break;
                 }
@@ -1557,21 +1613,15 @@ Status VTabletWriter::close(Status exec_status) {
     _do_try_close(_state, exec_status);
     TEST_INJECTION_POINT("VOlapTableSink::close");
 
+    DBUG_EXECUTE_IF("VTabletWriter.close.close_status_not_ok",
+                    { _close_status = Status::InternalError("injected close 
status not ok"); });
+
     // If _close_status is not ok, all nodes have been canceled in try_close.
     if (_close_status.ok()) {
         auto status = Status::OK();
         // BE id -> add_batch method counter
         std::unordered_map<int64_t, AddBatchCounter> 
node_add_batch_counter_map;
-        int64_t serialize_batch_ns = 0;
-        int64_t queue_push_lock_ns = 0;
-        int64_t actual_consume_ns = 0;
-        int64_t total_add_batch_exec_time_ns = 0;
-        int64_t max_add_batch_exec_time_ns = 0;
-        int64_t total_wait_exec_time_ns = 0;
-        int64_t max_wait_exec_time_ns = 0;
-        int64_t total_add_batch_num = 0;
-        int64_t num_node_channels = 0;
-        VNodeChannelStat channel_stat;
+        WriterStats writer_stats;
 
         for (const auto& index_channel : _channels) {
             if (!status.ok()) {
@@ -1579,28 +1629,8 @@ Status VTabletWriter::close(Status exec_status) {
             }
             int64_t add_batch_exec_time = 0;
             int64_t wait_exec_time = 0;
-            index_channel->for_each_node_channel(
-                    [this, &index_channel, &status, 
&node_add_batch_counter_map,
-                     &serialize_batch_ns, &channel_stat, &queue_push_lock_ns, 
&actual_consume_ns,
-                     &total_add_batch_exec_time_ns, &add_batch_exec_time, 
&total_wait_exec_time_ns,
-                     &wait_exec_time,
-                     &total_add_batch_num](const 
std::shared_ptr<VNodeChannel>& ch) {
-                        if (!status.ok() || (ch->is_closed() && 
!ch->is_cancelled())) {
-                            return;
-                        }
-                        // in pipeline, all node channels are done or 
canceled, will not block.
-                        // no pipeline, close may block waiting.
-                        auto s = ch->close_wait(_state);
-                        if (!s.ok()) {
-                            status = 
cancel_channel_and_check_intolerable_failure(
-                                    std::move(status), s.to_string(), 
*index_channel, *ch);
-                        }
-                        ch->time_report(&node_add_batch_counter_map, 
&serialize_batch_ns,
-                                        &channel_stat, &queue_push_lock_ns, 
&actual_consume_ns,
-                                        &total_add_batch_exec_time_ns, 
&add_batch_exec_time,
-                                        &total_wait_exec_time_ns, 
&wait_exec_time,
-                                        &total_add_batch_num);
-                    });
+            status = index_channel->close_wait(_state, &writer_stats, 
&node_add_batch_counter_map,
+                                               
index_channel->each_node_channel_ids());
 
             // Due to the non-determinism of compaction, the rowsets of each 
replica may be different from each other on different
             // BE nodes. The number of rows filtered in SegmentWriter depends 
on the historical rowsets located in the correspoding
@@ -1615,21 +1645,22 @@ Status VTabletWriter::close(Status exec_status) {
                 }
             }
 
-            num_node_channels += index_channel->num_node_channels();
-            if (add_batch_exec_time > max_add_batch_exec_time_ns) {
-                max_add_batch_exec_time_ns = add_batch_exec_time;
+            writer_stats.num_node_channels += 
index_channel->num_node_channels();
+            if (add_batch_exec_time > writer_stats.max_add_batch_exec_time_ns) 
{
+                writer_stats.max_add_batch_exec_time_ns = add_batch_exec_time;
             }
-            if (wait_exec_time > max_wait_exec_time_ns) {
-                max_wait_exec_time_ns = wait_exec_time;
+            if (wait_exec_time > writer_stats.max_wait_exec_time_ns) {
+                writer_stats.max_wait_exec_time_ns = wait_exec_time;
             }
         } // end for index channels
 
         if (status.ok()) {
             // TODO need to be improved
-            LOG(INFO) << "total mem_exceeded_block_ns=" << 
channel_stat.mem_exceeded_block_ns
-                      << ", total queue_push_lock_ns=" << queue_push_lock_ns
-                      << ", total actual_consume_ns=" << actual_consume_ns
-                      << ", load id=" << print_id(_load_id);
+            LOG(INFO) << "total mem_exceeded_block_ns="
+                      << writer_stats.channel_stat.mem_exceeded_block_ns
+                      << ", total queue_push_lock_ns=" << 
writer_stats.queue_push_lock_ns
+                      << ", total actual_consume_ns=" << 
writer_stats.actual_consume_ns
+                      << ", load id=" << print_id(_load_id) << ", txn_id=" << 
_txn_id;
 
             COUNTER_SET(_input_rows_counter, _number_input_rows);
             COUNTER_SET(_output_rows_counter, _number_output_rows);
@@ -1640,18 +1671,19 @@ Status VTabletWriter::close(Status exec_status) {
             COUNTER_SET(_send_data_timer, _send_data_ns);
             COUNTER_SET(_row_distribution_timer, 
(int64_t)_row_distribution_watch.elapsed_time());
             COUNTER_SET(_filter_timer, _filter_ns);
-            COUNTER_SET(_append_node_channel_timer, 
channel_stat.append_node_channel_ns);
-            COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
-            COUNTER_SET(_wait_mem_limit_timer, 
channel_stat.mem_exceeded_block_ns);
+            COUNTER_SET(_append_node_channel_timer,
+                        writer_stats.channel_stat.append_node_channel_ns);
+            COUNTER_SET(_where_clause_timer, 
writer_stats.channel_stat.where_clause_ns);
+            COUNTER_SET(_wait_mem_limit_timer, 
writer_stats.channel_stat.mem_exceeded_block_ns);
             COUNTER_SET(_validate_data_timer, 
_block_convertor->validate_data_ns());
-            COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
-            COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
-            COUNTER_SET(_total_add_batch_exec_timer, 
total_add_batch_exec_time_ns);
-            COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
-            COUNTER_SET(_total_wait_exec_timer, total_wait_exec_time_ns);
-            COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns);
-            COUNTER_SET(_add_batch_number, total_add_batch_num);
-            COUNTER_SET(_num_node_channels, num_node_channels);
+            COUNTER_SET(_serialize_batch_timer, 
writer_stats.serialize_batch_ns);
+            COUNTER_SET(_non_blocking_send_work_timer, 
writer_stats.actual_consume_ns);
+            COUNTER_SET(_total_add_batch_exec_timer, 
writer_stats.total_add_batch_exec_time_ns);
+            COUNTER_SET(_max_add_batch_exec_timer, 
writer_stats.max_add_batch_exec_time_ns);
+            COUNTER_SET(_total_wait_exec_timer, 
writer_stats.total_wait_exec_time_ns);
+            COUNTER_SET(_max_wait_exec_timer, 
writer_stats.max_wait_exec_time_ns);
+            COUNTER_SET(_add_batch_number, writer_stats.total_add_batch_num);
+            COUNTER_SET(_num_node_channels, writer_stats.num_node_channels);
 
             // _number_input_rows don't contain num_rows_load_filtered and 
num_rows_load_unselected in scan node
             int64_t num_rows_load_total = _number_input_rows + 
_state->num_rows_load_filtered() +
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 1dbeb6722ce..116aa20a98c 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -206,6 +206,19 @@ public:
     int64_t append_node_channel_ns = 0;
 };
 
+struct WriterStats {
+    int64_t serialize_batch_ns = 0;
+    int64_t queue_push_lock_ns = 0;
+    int64_t actual_consume_ns = 0;
+    int64_t total_add_batch_exec_time_ns = 0;
+    int64_t max_add_batch_exec_time_ns = 0;
+    int64_t total_wait_exec_time_ns = 0;
+    int64_t max_wait_exec_time_ns = 0;
+    int64_t total_add_batch_num = 0;
+    int64_t num_node_channels = 0;
+    VNodeChannelStat channel_stat;
+};
+
 // pair<row_id,tablet_id>
 using Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>, 
std::vector<int64_t>>;
 
@@ -273,27 +286,31 @@ public:
     // two ways to stop channel:
     // 1. mark_close()->close_wait() PS. close_wait() will block waiting for 
the last AddBatch rpc response.
     // 2. just cancel()
-    Status close_wait(RuntimeState* state);
+    Status close_wait(RuntimeState* state, bool* is_closed);
+
+    Status after_close_handle(
+            RuntimeState* state, WriterStats* writer_stats,
+            std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map);
 
     void cancel(const std::string& cancel_msg);
 
     void time_report(std::unordered_map<int64_t, AddBatchCounter>* 
add_batch_counter_map,
-                     int64_t* serialize_batch_ns, VNodeChannelStat* stat,
-                     int64_t* queue_push_lock_ns, int64_t* actual_consume_ns,
-                     int64_t* total_add_batch_exec_time_ns, int64_t* 
add_batch_exec_time_ns,
-                     int64_t* total_wait_exec_time_ns, int64_t* 
wait_exec_time_ns,
-                     int64_t* total_add_batch_num) const {
-        (*add_batch_counter_map)[_node_id] += _add_batch_counter;
-        (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
-        *serialize_batch_ns += _serialize_batch_ns;
-        *stat += _stat;
-        *queue_push_lock_ns += _queue_push_lock_ns;
-        *actual_consume_ns += _actual_consume_ns;
-        *add_batch_exec_time_ns = 
(_add_batch_counter.add_batch_execution_time_us * 1000);
-        *total_add_batch_exec_time_ns += *add_batch_exec_time_ns;
-        *wait_exec_time_ns = 
(_add_batch_counter.add_batch_wait_execution_time_us * 1000);
-        *total_wait_exec_time_ns += *wait_exec_time_ns;
-        *total_add_batch_num += _add_batch_counter.add_batch_num;
+                     WriterStats* writer_stats) const {
+        if (add_batch_counter_map != nullptr) {
+            (*add_batch_counter_map)[_node_id] += _add_batch_counter;
+            (*add_batch_counter_map)[_node_id].close_wait_time_ms = 
_close_time_ms;
+        }
+        if (writer_stats != nullptr) {
+            writer_stats->serialize_batch_ns += _serialize_batch_ns;
+            writer_stats->channel_stat += _stat;
+            writer_stats->queue_push_lock_ns += _queue_push_lock_ns;
+            writer_stats->actual_consume_ns += _actual_consume_ns;
+            writer_stats->total_add_batch_exec_time_ns +=
+                    (_add_batch_counter.add_batch_execution_time_us * 1000);
+            writer_stats->total_wait_exec_time_ns +=
+                    (_add_batch_counter.add_batch_wait_execution_time_us * 
1000);
+            writer_stats->total_add_batch_num += 
_add_batch_counter.add_batch_num;
+        }
     }
 
     int64_t node_id() const { return _node_id; }
@@ -452,12 +469,49 @@ public:
         }
     }
 
+    std::unordered_set<int64_t> init_node_channel_ids() {
+        std::unordered_set<int64_t> node_channel_ids;
+        for (auto& it : _node_channels) {
+            if (!it.second->is_incremental()) {
+                node_channel_ids.insert(it.first);
+            }
+        }
+        return node_channel_ids;
+    }
+
+    std::unordered_set<int64_t> inc_node_channel_ids() {
+        std::unordered_set<int64_t> node_channel_ids;
+        for (auto& it : _node_channels) {
+            if (it.second->is_incremental()) {
+                node_channel_ids.insert(it.first);
+            }
+        }
+        return node_channel_ids;
+    }
+
+    std::unordered_set<int64_t> each_node_channel_ids() {
+        std::unordered_set<int64_t> node_channel_ids;
+        for (auto& it : _node_channels) {
+            node_channel_ids.insert(it.first);
+        }
+        return node_channel_ids;
+    }
+
     bool has_incremental_node_channel() const { return _has_inc_node; }
 
     void mark_as_failed(const VNodeChannel* node_channel, const std::string& 
err,
                         int64_t tablet_id = -1);
     Status check_intolerable_failure();
 
+    Status close_wait(RuntimeState* state, WriterStats* writer_stats,
+                      std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map,
+                      std::unordered_set<int64_t> unfinished_node_channel_ids);
+
+    Status check_each_node_channel_close(
+            std::unordered_set<int64_t>* unfinished_node_channel_ids,
+            std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map,
+            WriterStats* writer_stats, Status status);
+
     // set error tablet info in runtime state, so that it can be returned to 
FE.
     void set_error_tablet_in_state(RuntimeState* state);
 
diff --git a/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
new file mode 100644
index 00000000000..5f1d9fced72
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_writer_fault_injection", "nonConcurrent") {
+    sql """ set enable_memtable_on_sink_node=false """
+    try {
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = 
"1")
+            """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = 
"1")
+            """
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        streamLoad {
+            table "baseall"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+        }
+
+        def load_with_injection = { injection, error_msg="", success=false->
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql "insert into test select * from baseall where k1 <= 3"
+                assertTrue(success, String.format("expected Exception '%s', 
actual success", error_msg))
+            } catch(Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains(error_msg),
+                        String.format("expected '%s', actual '%s'", error_msg, 
e.getMessage()))
+            } finally {
+                sleep 1000 // wait some time for instance finish before 
disable injection
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+            }
+        }
+
+        // VTabletWriter close logic injection tests
+        // Test VNodeChannel close_wait with full gc injection
+        load_with_injection("VNodeChannel.close_wait_full_gc")
+        // Test VNodeChannel try_send_and_fetch_status with full gc injection
+        load_with_injection("VNodeChannel.try_send_and_fetch_status_full_gc")
+        // Test VNodeChannel close_wait when cancelled
+        load_with_injection("VNodeChannel.close_wait.cancelled")
+        // Test IndexChannel close_wait with timeout
+        load_with_injection("IndexChannel.close_wait.timeout")
+        // Test VTabletWriter close with _close_status not ok
+        load_with_injection("VTabletWriter.close.close_status_not_ok")
+    } finally {
+        sql """ set enable_memtable_on_sink_node=true """
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to