This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 10ea08fa856 [fix](spill) fix memory orphan check failure of 
partitioned hash join (#36806)
10ea08fa856 is described below

commit 10ea08fa85624c2f6c4de62c33c2aa1e24c0c550
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Wed Jun 26 17:08:57 2024 +0800

    [fix](spill) fix memory orphan check failure of partitioned hash join 
(#36806)
    
    Also add fault injection regression test cases for spill.
---
 .../exec/partitioned_hash_join_sink_operator.cpp   |  22 ++-
 .../spill/partitioned_agg_fault_injection.groovy   | 149 ++++++++++++++
 .../partitioned_hash_join_fault_injection.groovy   | 216 +++++++++++++++++++++
 .../spill/spill_sort_fault_injection.groovy        | 158 +++++++++++++++
 4 files changed, 535 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 81bc2253657..cb104cfc7cd 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -126,13 +126,8 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
             _shared_state->shared_from_this();
     auto query_id = state->query_id();
     auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
-    auto spill_func = [build_blocks = std::move(build_blocks), state, 
num_slots, this]() mutable {
-        Defer defer {[&]() {
-            // need to reset build_block here, or else build_block will be 
destructed
-            // after SCOPED_ATTACH_TASK_WITH_ID and will trigger 
memory_orphan_check failure
-            build_blocks.clear();
-        }};
-
+    auto spill_func = [state, num_slots,
+                       this](std::vector<vectorized::Block>& build_blocks) 
mutable {
         auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
         auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
         std::vector<std::vector<uint32_t>> 
partitions_indexes(p._partition_count);
@@ -216,9 +211,16 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         _dependency->set_ready();
     };
 
-    auto exception_catch_func = [spill_func, shared_state_holder, 
execution_context, state,
-                                 query_id, mem_tracker, this]() mutable {
+    auto exception_catch_func = [build_blocks = std::move(build_blocks), 
spill_func,
+                                 shared_state_holder, execution_context, 
state, query_id,
+                                 mem_tracker, this]() mutable {
         SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        Defer defer {[&]() {
+            // need to reset build_block here, or else build_block will be 
destructed
+            // after SCOPED_ATTACH_TASK_WITH_ID and will trigger 
memory_orphan_check failure
+            build_blocks.clear();
+        }};
+
         std::shared_ptr<TaskExecutionContext> execution_context_lock;
         auto shared_state_sptr = shared_state_holder.lock();
         if (shared_state_sptr) {
@@ -230,7 +232,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         }
 
         auto status = [&]() {
-            RETURN_IF_CATCH_EXCEPTION(spill_func());
+            RETURN_IF_CATCH_EXCEPTION(spill_func(build_blocks));
             return Status::OK();
         }();
 
diff --git 
a/regression-test/suites/fault_injection_p0/spill/partitioned_agg_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/spill/partitioned_agg_fault_injection.groovy
new file mode 100644
index 00000000000..0cefbba0657
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/spill/partitioned_agg_fault_injection.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("partitioned_agg_fault_injection", "nonConcurrent") {
+    multi_sql """
+    use regression_test_tpch_unique_sql_zstd_p0;
+    set enable_force_spill=true;
+    set min_revocable_mem=1024;
+    """
+    def test_sql = """
+    select
+    l_orderkey,
+    l_suppkey,
+    l_partkey,
+    l_quantity,
+    l_extendedprice,
+    l_discount,
+    l_returnflag,
+    l_linestatus,
+    l_commitdate,
+    l_receiptdate,
+    l_shipinstruct,
+    l_shipmode,
+    sum(l_tax)
+from
+    lineitem
+group by
+    l_orderkey,
+    l_suppkey,
+    l_partkey,
+    l_quantity,
+    l_extendedprice,
+    l_discount,
+    l_returnflag,
+    l_linestatus,
+    l_commitdate,
+    l_receiptdate,
+    l_shipinstruct,
+    l_shipmode
+    """
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_block")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
spill_block failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_block")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
prepare_spill failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
spill_eof failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
read_next_block failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::sink")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject partitioned_agg_sink 
sink failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::sink")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::revoke_memory_submit_func")
+        sql test_sql
+    } catch(Exception e) {
+        assertTrue(e.getMessage().contains("fault_inject partitioned_agg_sink 
revoke_memory submit_func failed"));
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::revoke_memory_submit_func")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::revoke_memory_cancel")
+        sql test_sql
+    } catch(Exception e) {
+        assertTrue(e.getMessage().contains("fault_inject partitioned_agg_sink 
revoke_memory canceled"));
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::revoke_memory_cancel")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_source::recover_spill_data")
+        sql test_sql
+    } catch(Exception e) {
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_agg_source recover_spill_data failed"));
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_source::recover_spill_data")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_source::merge_spill_data_cancel")
+        sql test_sql
+    } catch(Exception e) {
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_agg_source merge spill data canceled"));
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_source::merge_spill_data_cancel")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_source::submit_func")
+        sql test_sql
+    } catch(Exception e) {
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_agg_source submit_func failed"));
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_source::submit_func")
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/fault_injection_p0/spill/partitioned_hash_join_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/spill/partitioned_hash_join_fault_injection.groovy
new file mode 100644
index 00000000000..72e5179c46e
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/spill/partitioned_hash_join_fault_injection.groovy
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("partitioned_hash_join_fault_injection", "nonConcurrent") {
+    multi_sql """
+    use regression_test_tpch_unique_sql_zstd_p0;
+    set enable_force_spill=true;
+    set min_revocable_mem=1024;
+    """
+    def test_sql = """
+    SELECT
+      L_ORDERKEY,
+      L_COMMENT,
+      O_ORDERKEY,
+      L_QUANTITY,
+      L_SHIPINSTRUCT
+    FROM
+        lineitem, 
+        orders
+    WHERE
+      L_ORDERKEY = o_orderkey
+    """
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_block")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
spill_block failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_block")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
prepare_spill failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
spill_eof failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
read_next_block failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_unpartitioned_block_submit_func")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_sink revoke_unpartitioned_block submit_func failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_unpartitioned_block_submit_func")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_memory_submit_func")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_sink revoke_memory submit_func failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_memory_submit_func")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_sink revoke_memory canceled"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::sink")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_sink sink failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::sink")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe spill_probe_blocks failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe spill_probe_blocks canceled"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe spill_probe_blocks submit_func failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_build_blocks")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe recover_build_blocks failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_build_blocks")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe recover_build_blocks canceled"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe recovery_build_blocks submit_func failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_probe_blocks")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe recover_probe_blocks failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_probe_blocks")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe recover_probe_blocks canceled"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recovery_probe_blocks_submit_func")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe recovery_probe_blocks submit_func failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recovery_probe_blocks_submit_func")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::sink")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject 
partitioned_hash_join_probe sink failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::sink")
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/fault_injection_p0/spill/spill_sort_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/spill/spill_sort_fault_injection.groovy
new file mode 100644
index 00000000000..ce9d9ee9dd7
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/spill/spill_sort_fault_injection.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("spill_sort_fault_injection", "nonConcurrent") {
+    multi_sql """
+    use regression_test_tpch_unique_sql_zstd_p0;
+    set enable_force_spill=true;
+    set min_revocable_mem=1024;
+    """
+    def test_sql = """
+    select
+    l_orderkey,
+    l_linenumber,
+    l_partkey,
+    l_suppkey,
+    l_quantity,
+    l_extendedprice,
+    l_discount,
+    l_tax,
+    l_returnflag,
+    l_linestatus,
+    l_commitdate,
+    l_receiptdate,
+    l_shipinstruct,
+    l_shipmode,
+    l_shipdate
+from
+    lineitem
+order by
+    l_orderkey,
+    l_linenumber,
+    l_partkey,
+    l_suppkey,
+    l_quantity,
+    l_extendedprice,
+    l_discount,
+    l_tax,
+    l_returnflag,
+    l_linestatus,
+    l_commitdate,
+    l_receiptdate,
+    l_shipinstruct,
+    l_shipmode;
+    """
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_block")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
spill_block failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_block")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
prepare_spill failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
spill_eof failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_stream 
read_next_block failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_sink::sink")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_sort_sink sink 
failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_sink::sink")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_sink::revoke_memory_cancel")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_sort_sink 
revoke_memory canceled"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_sink::revoke_memory_cancel")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_sink::revoke_memory_submit_func")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_sort_sink 
revoke_memory submit_func failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_sink::revoke_memory_submit_func")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_source::recover_spill_data")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_sort_source 
recover_spill_data failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_source::recover_spill_data")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_source::spill_merged_data")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_sort_source 
spill_merged_data failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_source::spill_merged_data")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_source::spill_merged_data")
+        sql test_sql
+    } catch(Exception e) {
+        log.error(e.getMessage())
+        assertTrue(e.getMessage().contains("fault_inject spill_sort_source 
spill_merged_data failed"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_source::spill_merged_data")
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to