This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e3c12671d9f [fix](move-memtable) fix use-after-free in LoadStreamReplyHandler (#29791)
e3c12671d9f is described below

commit e3c12671d9f1dd2a0a1b05529169e315e62dda15
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Jan 10 21:37:57 2024 +0800

    [fix](move-memtable) fix use-after-free in LoadStreamReplyHandler (#29791)
---
 be/src/vec/sink/load_stream_stub.cpp               | 137 ++++++++++++--------
 be/src/vec/sink/load_stream_stub.h                 | 139 ++++++++-------------
 be/src/vec/sink/load_stream_stub_pool.cpp          |  18 ---
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  10 +-
 .../test_load_stream_stub_failure_injection.groovy |   6 +-
 5 files changed, 143 insertions(+), 167 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index fdd9762330b..c84ef3b7a0a 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -28,30 +28,35 @@
 
 namespace doris {
 
-int 
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId id,
-                                                                 butil::IOBuf* 
const messages[],
-                                                                 size_t size) {
+int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, 
butil::IOBuf* const messages[],
+                                                 size_t size) {
+    auto stub = _stub.lock();
+    if (!stub) {
+        LOG(WARNING) << "stub is not exist when on_received_messages, " << 
*this
+                     << ", stream_id=" << id;
+        return 0;
+    }
     for (size_t i = 0; i < size; i++) {
         butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
         PLoadStreamResponse response;
         response.ParseFromZeroCopyStream(&wrapper);
 
         if (response.eos()) {
-            _is_eos.store(true);
+            stub->_is_eos.store(true);
         }
 
         Status st = Status::create(response.status());
 
         std::stringstream ss;
-        ss << "on_received_messages, load_id=" << _load_id << ", backend_id=" 
<< _dst_id;
+        ss << "on_received_messages, " << *this << ", stream_id=" << id;
         if (response.success_tablet_ids_size() > 0) {
             ss << ", success tablet ids:";
             for (auto tablet_id : response.success_tablet_ids()) {
                 ss << " " << tablet_id;
             }
-            std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
+            std::lock_guard<bthread::Mutex> lock(stub->_success_tablets_mutex);
             for (auto tablet_id : response.success_tablet_ids()) {
-                _success_tablets.push_back(tablet_id);
+                stub->_success_tablets.push_back(tablet_id);
             }
         }
         if (response.failed_tablets_size() > 0) {
@@ -60,11 +65,24 @@ int 
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
                 Status st = Status::create(pb.status());
                 ss << " " << pb.id() << ":" << st;
             }
-            std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
+            std::lock_guard<bthread::Mutex> lock(stub->_failed_tablets_mutex);
             for (auto pb : response.failed_tablets()) {
                 Status st = Status::create(pb.status());
-                _failed_tablets.emplace(pb.id(), st);
+                stub->_failed_tablets.emplace(pb.id(), st);
+            }
+        }
+        if (response.tablet_schemas_size() > 0) {
+            ss << ", tablet schema num: " << response.tablet_schemas_size();
+            std::lock_guard<bthread::Mutex> lock(stub->_schema_mutex);
+            for (const auto& schema : response.tablet_schemas()) {
+                auto tablet_schema = std::make_unique<TabletSchema>();
+                tablet_schema->init_from_pb(schema.tablet_schema());
+                stub->_tablet_schema_for_index->emplace(schema.index_id(),
+                                                        
std::move(tablet_schema));
+                stub->_enable_unique_mow_for_index->emplace(
+                        schema.index_id(), 
schema.enable_unique_key_merge_on_write());
             }
+            stub->_schema_cv.notify_all();
         }
         ss << ", status: " << st;
         LOG(INFO) << ss.str();
@@ -83,21 +101,27 @@ int 
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
                              << status;
             }
         }
-
-        if (response.tablet_schemas_size() > 0) {
-            std::vector<PTabletSchemaWithIndex> 
schemas(response.tablet_schemas().begin(),
-                                                        
response.tablet_schemas().end());
-            _stub->add_schema(schemas);
-        }
     }
     return 0;
 }
 
-void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
-    LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id;
-    std::lock_guard<bthread::Mutex> lock(_mutex);
-    _is_closed.store(true);
-    _close_cv.notify_all();
+void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
+    Defer defer {[this]() { delete this; }};
+    LOG(INFO) << "on_closed, " << *this << ", stream_id=" << id;
+    auto stub = _stub.lock();
+    if (!stub) {
+        LOG(WARNING) << "stub is not exist when on_closed, " << *this;
+        return;
+    }
+    std::lock_guard<bthread::Mutex> lock(stub->_close_mutex);
+    stub->_is_closed.store(true);
+    stub->_close_cv.notify_all();
+}
+
+inline std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamReplyHandler& handler) {
+    ostr << "LoadStreamReplyHandler load_id=" << UniqueId(handler._load_id)
+         << ", dst_id=" << handler._dst_id;
+    return ostr;
 }
 
 LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
@@ -114,39 +138,31 @@ LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
           _tablet_schema_for_index(stub._tablet_schema_for_index),
           _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
 
-LoadStreamStub::~LoadStreamStub() = default;
-
-Status LoadStreamStub::close_stream() {
-    if (_is_init.load() && !_handler.is_closed()) {
-        LOG(INFO) << "closing stream, load_id=" << print_id(_load_id) << ", 
src_id=" << _src_id
-                  << ", dst_id=" << _dst_id << ", stream_id=" << _stream_id;
+LoadStreamStub::~LoadStreamStub() {
+    if (_is_init.load() && !_is_closed.load()) {
         auto ret = brpc::StreamClose(_stream_id);
-        if (ret != 0) {
-            return Status::InternalError("StreamClose failed, err={}", ret);
-        }
+        LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ? 
"success" : "failed");
     }
-    return Status::OK();
 }
 
 // open_load_stream
-Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* 
client_cache,
+Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
+                            BrpcClientCache<PBackendService_Stub>* 
client_cache,
                             const NodeInfo& node_info, int64_t txn_id,
                             const OlapTableSchemaParam& schema,
                             const std::vector<PTabletID>& tablets_for_schema, 
int total_streams,
                             bool enable_profile) {
-    std::unique_lock<bthread::Mutex> lock(_mutex);
+    std::unique_lock<bthread::Mutex> lock(_open_mutex);
     if (_is_init.load()) {
         return Status::OK();
     }
     _dst_id = node_info.id;
-    _handler.set_dst_id(_dst_id);
-    _handler.set_load_id(_load_id);
     std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
     brpc::StreamOptions opt;
     opt.max_buf_size = config::load_stream_max_buf_size;
     opt.idle_timeout_ms = config::load_stream_idle_timeout_ms;
     opt.messages_in_batch = config::load_stream_messages_in_batch;
-    opt.handler = &_handler;
+    opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self);
     brpc::Controller cntl;
     if (int ret = StreamCreate(&_stream_id, cntl, &opt)) {
         return Status::Error<true>(ret, "Failed to create stream");
@@ -178,8 +194,7 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
         return Status::InternalError("Failed to connect to backend {}: {}", 
_dst_id,
                                      cntl.ErrorText());
     }
-    LOG(INFO) << "open load stream " << _stream_id << " load_id=" << 
print_id(_load_id)
-              << " for backend " << _dst_id << " (" << host_port << ")";
+    LOG(INFO) << "open load stream to " << host_port << ", " << *this;
     _is_init.store(true);
     return Status::OK();
 }
@@ -263,18 +278,6 @@ Status LoadStreamStub::get_schema(const 
std::vector<PTabletID>& tablets) {
     return _encode_and_send(header);
 }
 
-void LoadStreamStub::add_schema(const std::vector<PTabletSchemaWithIndex>& 
schemas) {
-    std::lock_guard<bthread::Mutex> lock(_mutex);
-    for (const auto& schema : schemas) {
-        auto tablet_schema = std::make_unique<TabletSchema>();
-        tablet_schema->init_from_pb(schema.tablet_schema());
-        _tablet_schema_for_index->emplace(schema.index_id(), 
std::move(tablet_schema));
-        _enable_unique_mow_for_index->emplace(schema.index_id(),
-                                              
schema.enable_unique_key_merge_on_write());
-    }
-    _schema_cv.notify_all();
-}
-
 Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                        int64_t timeout_ms) {
     if (_tablet_schema_for_index->contains(index_id)) {
@@ -299,6 +302,36 @@ Status LoadStreamStub::wait_for_schema(int64_t 
partition_id, int64_t index_id, i
     return Status::OK();
 }
 
+Status LoadStreamStub::close_wait(int64_t timeout_ms) {
+    DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", {
+        while (true) {
+        };
+    });
+    if (!_is_init.load() || _is_closed.load()) {
+        return Status::OK();
+    }
+    if (timeout_ms <= 0) {
+        timeout_ms = config::close_load_stream_timeout_ms;
+    }
+    DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
+    std::unique_lock<bthread::Mutex> lock(_close_mutex);
+    if (_is_closed.load()) {
+        return Status::OK();
+    }
+    int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
+    if (ret != 0) {
+        return Status::InternalError(
+                "stream close_wait timeout, error={}, load_id={}, dst_id={}, 
stream_id={}", ret,
+                print_id(_load_id), _dst_id, _stream_id);
+    }
+    if (!_is_eos.load()) {
+        return Status::InternalError(
+                "stream closed without eos, load_id={}, dst_id={}, 
stream_id={}",
+                print_id(_load_id), _dst_id, _stream_id);
+    }
+    return Status::OK();
+}
+
 Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const 
Slice> data) {
     butil::IOBuf buf;
     size_t header_len = header.ByteSizeLong();
@@ -359,4 +392,10 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) 
{
     }
 }
 
+inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& 
stub) {
+    ostr << "LoadStreamStub load_id=" << print_id(stub._load_id) << ", 
src_id=" << stub._src_id
+         << ", dst_id=" << stub._dst_id << ", stream_id=" << stub._stream_id;
+    return ostr;
+}
+
 } // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index f7d6844fe6b..81ec99fa451 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -84,70 +84,28 @@ using IndexToEnableMoW =
                                       std::allocator<phmap::Pair<const 
int64_t, bool>>, 4,
                                       std::mutex>;
 
-class LoadStreamStub {
-private:
-    class LoadStreamReplyHandler : public brpc::StreamInputHandler {
-    public:
-        LoadStreamReplyHandler(LoadStreamStub* stub) : _stub(stub) {}
-
-        int on_received_messages(brpc::StreamId id, butil::IOBuf* const 
messages[],
-                                 size_t size) override;
-
-        void on_idle_timeout(brpc::StreamId id) override {}
-
-        void on_closed(brpc::StreamId id) override;
-
-        bool is_closed() { return _is_closed.load(); }
-
-        bool is_eos() { return _is_eos.load(); }
-
-        Status close_wait(int64_t timeout_ms) {
-            DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
-            std::unique_lock<bthread::Mutex> lock(_mutex);
-            if (_is_closed) {
-                return Status::OK();
-            }
-            int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
-            if (ret != 0) {
-                return Status::InternalError(
-                        "stream close_wait timeout, load_id={}, be_id={}, 
error={}",
-                        _load_id.to_string(), _dst_id, ret);
-            }
-            if (!_is_eos.load()) {
-                return Status::InternalError("stream closed without eos, 
load_id={} be_id={}",
-                                             _load_id.to_string(), _dst_id);
-            }
-            return Status::OK();
-        };
-
-        std::vector<int64_t> success_tablets() {
-            std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
-            return _success_tablets;
-        }
+class LoadStreamReplyHandler : public brpc::StreamInputHandler {
+public:
+    LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, 
std::weak_ptr<LoadStreamStub> stub)
+            : _load_id(load_id), _dst_id(dst_id), _stub(stub) {}
 
-        std::unordered_map<int64_t, Status> failed_tablets() {
-            std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
-            return _failed_tablets;
-        }
+    int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
+                             size_t size) override;
 
-        void set_dst_id(int64_t dst_id) { _dst_id = dst_id; }
-        void set_load_id(PUniqueId load_id) { _load_id = UniqueId(load_id); }
+    void on_idle_timeout(brpc::StreamId id) override {}
 
-    private:
-        UniqueId _load_id;    // for logging
-        int64_t _dst_id = -1; // for logging
-        std::atomic<bool> _is_closed;
-        std::atomic<bool> _is_eos;
-        bthread::Mutex _mutex;
-        bthread::ConditionVariable _close_cv;
+    void on_closed(brpc::StreamId id) override;
 
-        bthread::Mutex _success_tablets_mutex;
-        bthread::Mutex _failed_tablets_mutex;
-        std::vector<int64_t> _success_tablets;
-        std::unordered_map<int64_t, Status> _failed_tablets;
+    friend std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamReplyHandler& handler);
 
-        LoadStreamStub* _stub = nullptr;
-    };
+private:
+    PUniqueId _load_id;   // for logging
+    int64_t _dst_id = -1; // for logging
+    std::weak_ptr<LoadStreamStub> _stub;
+};
+
+class LoadStreamStub {
+    friend class LoadStreamReplyHandler;
 
 public:
     // construct new stub
@@ -163,7 +121,8 @@ public:
             ~LoadStreamStub();
 
     // open_load_stream
-    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const 
NodeInfo& node_info,
+    Status open(std::shared_ptr<LoadStreamStub> self,
+                BrpcClientCache<PBackendService_Stub>* client_cache, const 
NodeInfo& node_info,
                 int64_t txn_id, const OlapTableSchemaParam& schema,
                 const std::vector<PTabletID>& tablets_for_schema, int 
total_streams,
                 bool enable_profile);
@@ -189,31 +148,16 @@ public:
     // GET_SCHEMA
     Status get_schema(const std::vector<PTabletID>& tablets);
 
-    // close stream, usually close is initiated by the remote.
-    // in case of remote failure, we should be able to close stream locally.
-    Status close_stream();
-
     // wait remote to close stream,
     // remote will close stream when it receives CLOSE_LOAD
-    Status close_wait(int64_t timeout_ms = 0) {
-        DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", {
-            while (true) {
-            };
-        });
-        if (!_is_init.load() || _handler.is_closed()) {
-            return Status::OK();
-        }
-        if (timeout_ms <= 0) {
-            timeout_ms = config::close_load_stream_timeout_ms;
-        }
-        return _handler.close_wait(timeout_ms);
-    }
+    // if timeout_ms <= 0, will fallback to 
config::close_load_stream_timeout_ms
+    Status close_wait(int64_t timeout_ms = 0);
 
     Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t 
tablet_id,
                            int64_t timeout_ms = 60000);
 
     Status wait_for_new_schema(int64_t timeout_ms) {
-        std::unique_lock<bthread::Mutex> lock(_mutex);
+        std::unique_lock<bthread::Mutex> lock(_schema_mutex);
         if (timeout_ms > 0) {
             int ret = _schema_cv.wait_for(lock, timeout_ms * 1000);
             return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait 
schema update timeout");
@@ -222,8 +166,6 @@ public:
         return Status::OK();
     };
 
-    void add_schema(const std::vector<PTabletSchemaWithIndex>& schemas);
-
     std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
         return (*_tablet_schema_for_index)[index_id];
     }
@@ -232,9 +174,15 @@ public:
         return _enable_unique_mow_for_index->at(index_id);
     }
 
-    std::vector<int64_t> success_tablets() { return 
_handler.success_tablets(); }
+    std::vector<int64_t> success_tablets() {
+        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
+        return _success_tablets;
+    }
 
-    std::unordered_map<int64_t, Status> failed_tablets() { return 
_handler.failed_tablets(); }
+    std::unordered_map<int64_t, Status> failed_tablets() {
+        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
+        return _failed_tablets;
+    }
 
     brpc::StreamId stream_id() const { return _stream_id; }
 
@@ -242,6 +190,8 @@ public:
 
     int64_t dst_id() const { return _dst_id; }
 
+    friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& 
stub);
+
 private:
     Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data 
= {});
     Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
@@ -249,10 +199,19 @@ private:
 
 protected:
     std::atomic<bool> _is_init;
-    bthread::Mutex _mutex;
-
+    std::atomic<bool> _is_closed;
+    std::atomic<bool> _is_eos;
     std::atomic<int> _use_cnt;
 
+    PUniqueId _load_id;
+    brpc::StreamId _stream_id;
+    int64_t _src_id = -1; // source backend_id
+    int64_t _dst_id = -1; // destination backend_id
+
+    bthread::Mutex _open_mutex;
+    bthread::Mutex _close_mutex;
+    bthread::ConditionVariable _close_cv;
+
     std::mutex _tablets_to_commit_mutex;
     std::vector<PTabletID> _tablets_to_commit;
 
@@ -260,15 +219,15 @@ protected:
     std::mutex _send_mutex;
     butil::IOBuf _buffer;
 
-    PUniqueId _load_id;
-    brpc::StreamId _stream_id;
-    int64_t _src_id = -1; // source backend_id
-    int64_t _dst_id = -1; // destination backend_id
-    LoadStreamReplyHandler _handler {this};
-
+    bthread::Mutex _schema_mutex;
     bthread::ConditionVariable _schema_cv;
     std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
     std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
+
+    bthread::Mutex _success_tablets_mutex;
+    bthread::Mutex _failed_tablets_mutex;
+    std::vector<int64_t> _success_tablets;
+    std::unordered_map<int64_t, Status> _failed_tablets;
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index 3cf168a131e..1baa903f2ee 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -31,24 +31,6 @@ void LoadStreams::release(Status status) {
     DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
     if (num_use == 0) {
         LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" 
<< _dst_id;
-        for (auto& stream : _streams) {
-            auto st = stream->close_stream();
-            DBUG_EXECUTE_IF("LoadStreams.release.close_stream_failed",
-                            { st = Status::InternalError("stream close 
failed"); });
-            if (!st.ok()) {
-                LOG(WARNING) << "close stream failed " << st;
-            }
-        }
-        if (status.ok()) {
-            for (auto& stream : _streams) {
-                auto st = stream->close_wait();
-                DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed",
-                                { st = Status::InternalError("stream close 
wait timeout"); });
-                if (!st.ok()) {
-                    LOG(WARNING) << "close wait failed " << st;
-                }
-            }
-        }
         _pool->erase(_load_id, _dst_id);
     } else {
         LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << 
_dst_id
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 0ac06eaa2ca..1f1ed6591d2 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -278,14 +278,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t 
dst_id, LoadStreams& st
     // get tablet schema from each backend only in the 1st stream
     for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
         const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
-        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
-                                     _txn_id, *_schema, tablets_for_schema, 
_total_streams,
-                                     _state->enable_profile()));
+        RETURN_IF_ERROR(stream->open(stream, 
_state->exec_env()->brpc_internal_client_cache(),
+                                     *node_info, _txn_id, *_schema, 
tablets_for_schema,
+                                     _total_streams, 
_state->enable_profile()));
     }
     // for the rest streams, open without getting tablet schema
     for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
-        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
-                                     _txn_id, *_schema, {}, _total_streams,
+        RETURN_IF_ERROR(stream->open(stream, 
_state->exec_env()->brpc_internal_client_cache(),
+                                     *node_info, _txn_id, *_schema, {}, 
_total_streams,
                                      _state->enable_profile()));
     }
     return Status::OK();
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
index 0f75cb12a62..ea790577449 100644
--- a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
@@ -89,12 +89,8 @@ suite("test_stream_stub_fault_injection", "nonConcurrent") {
     load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed", 
"StreamWrite failed, err=32")
     // LoadStreams keeping stream when release
     load_with_injection("LoadStreams.release.keeping_streams", "")
-    // LoadStreams close stream failed
-    load_with_injection("LoadStreams.release.close_stream_failed", "")
-    // LoadStreams close wait failed
-    load_with_injection("LoadStreams.release.close_wait_failed", "")
 
     sql """ DROP TABLE IF EXISTS `baseall` """
     sql """ DROP TABLE IF EXISTS `test` """
     sql """ set enable_memtable_on_sink_node=false """
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to