This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 10ea08fa856 [fix](spill) fix memory orphan check failure of partitioned hash join (#36806) 10ea08fa856 is described below commit 10ea08fa85624c2f6c4de62c33c2aa1e24c0c550 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Wed Jun 26 17:08:57 2024 +0800 [fix](spill) fix memory orphan check failure of partitioned hash join (#36806) Also add fault injection regression test cases for spill. --- .../exec/partitioned_hash_join_sink_operator.cpp | 22 ++- .../spill/partitioned_agg_fault_injection.groovy | 149 ++++++++++++++ .../partitioned_hash_join_fault_injection.groovy | 216 +++++++++++++++++++++ .../spill/spill_sort_fault_injection.groovy | 158 +++++++++++++++ 4 files changed, 535 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 81bc2253657..cb104cfc7cd 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -126,13 +126,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta _shared_state->shared_from_this(); auto query_id = state->query_id(); auto mem_tracker = state->get_query_ctx()->query_mem_tracker; - auto spill_func = [build_blocks = std::move(build_blocks), state, num_slots, this]() mutable { - Defer defer {[&]() { - // need to reset build_block here, or else build_block will be destructed - // after SCOPED_ATTACH_TASK_WITH_ID and will trigger memory_orphan_check failure - build_blocks.clear(); - }}; - + auto spill_func = [state, num_slots, + this](std::vector<vectorized::Block>& build_blocks) mutable { auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); auto& partitioned_blocks = _shared_state->partitioned_build_blocks; std::vector<std::vector<uint32_t>> partitions_indexes(p._partition_count); @@ -216,9 +211,16 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta _dependency->set_ready(); }; - auto exception_catch_func = [spill_func, shared_state_holder, execution_context, state, - query_id, mem_tracker, this]() mutable { + auto exception_catch_func = [build_blocks = std::move(build_blocks), spill_func, + shared_state_holder, execution_context, state, query_id, + mem_tracker, this]() mutable { SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + Defer defer {[&]() { + // need to reset build_block here, or else build_block will be destructed + // after SCOPED_ATTACH_TASK_WITH_ID and will trigger memory_orphan_check failure + build_blocks.clear(); + }}; + std::shared_ptr<TaskExecutionContext> execution_context_lock; auto shared_state_sptr = shared_state_holder.lock(); if (shared_state_sptr) { @@ -230,7 +232,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta } auto status = [&]() { - RETURN_IF_CATCH_EXCEPTION(spill_func()); + RETURN_IF_CATCH_EXCEPTION(spill_func(build_blocks)); return Status::OK(); }(); diff --git a/regression-test/suites/fault_injection_p0/spill/partitioned_agg_fault_injection.groovy b/regression-test/suites/fault_injection_p0/spill/partitioned_agg_fault_injection.groovy new file mode 100644 index 00000000000..0cefbba0657 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/spill/partitioned_agg_fault_injection.groovy @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("partitioned_agg_fault_injection", "nonConcurrent") { + multi_sql """ + use regression_test_tpch_unique_sql_zstd_p0; + set enable_force_spill=true; + set min_revocable_mem=1024; + """ + def test_sql = """ + select + l_orderkey, + l_suppkey, + l_partkey, + l_quantity, + l_extendedprice, + l_discount, + l_returnflag, + l_linestatus, + l_commitdate, + l_receiptdate, + l_shipinstruct, + l_shipmode, + sum(l_tax) +from + lineitem +group by + l_orderkey, + l_suppkey, + l_partkey, + l_quantity, + l_extendedprice, + l_discount, + l_returnflag, + l_linestatus, + l_commitdate, + l_receiptdate, + l_shipinstruct, + l_shipmode + """ + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_block") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream spill_block failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_block") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream prepare_spill failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream spill_eof failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream read_next_block failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::sink") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_agg_sink sink failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::sink") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::revoke_memory_submit_func") + sql test_sql + } catch(Exception e) { + assertTrue(e.getMessage().contains("fault_inject partitioned_agg_sink revoke_memory submit_func failed")); + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::revoke_memory_submit_func") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::revoke_memory_cancel") + sql test_sql + } catch(Exception e) { + assertTrue(e.getMessage().contains("fault_inject partitioned_agg_sink revoke_memory canceled")); + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_sink::revoke_memory_cancel") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_source::recover_spill_data") + sql test_sql + } catch(Exception e) { + assertTrue(e.getMessage().contains("fault_inject partitioned_agg_source recover_spill_data failed")); + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_source::recover_spill_data") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_source::merge_spill_data_cancel") + sql test_sql + } catch(Exception e) { + assertTrue(e.getMessage().contains("fault_inject partitioned_agg_source merge spill data canceled")); + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_source::merge_spill_data_cancel") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_agg_source::submit_func") + sql test_sql + } catch(Exception e) { + assertTrue(e.getMessage().contains("fault_inject partitioned_agg_source submit_func failed")); + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_agg_source::submit_func") + } +} \ No newline at end of file diff --git a/regression-test/suites/fault_injection_p0/spill/partitioned_hash_join_fault_injection.groovy b/regression-test/suites/fault_injection_p0/spill/partitioned_hash_join_fault_injection.groovy new file mode 100644 index 00000000000..72e5179c46e --- /dev/null +++ b/regression-test/suites/fault_injection_p0/spill/partitioned_hash_join_fault_injection.groovy @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("partitioned_hash_join_fault_injection", "nonConcurrent") { + multi_sql """ + use regression_test_tpch_unique_sql_zstd_p0; + set enable_force_spill=true; + set min_revocable_mem=1024; + """ + def test_sql = """ + SELECT + L_ORDERKEY, + L_COMMENT, + O_ORDERKEY, + L_QUANTITY, + L_SHIPINSTRUCT + FROM + lineitem, + orders + WHERE + L_ORDERKEY = o_orderkey + """ + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_block") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream spill_block failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_block") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream prepare_spill failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream spill_eof failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream read_next_block failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_unpartitioned_block_submit_func") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_sink revoke_unpartitioned_block submit_func failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_unpartitioned_block_submit_func") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_memory_submit_func") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_sink revoke_memory submit_func failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_memory_submit_func") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_sink revoke_memory canceled")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::sink") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_sink sink failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_sink::sink") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe spill_probe_blocks failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe spill_probe_blocks canceled")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe spill_probe_blocks submit_func failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_build_blocks") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe recover_build_blocks failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_build_blocks") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe recover_build_blocks canceled")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe recovery_build_blocks submit_func failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_probe_blocks") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe recover_probe_blocks failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_probe_blocks") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe recover_probe_blocks canceled")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recovery_probe_blocks_submit_func") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe recovery_probe_blocks submit_func failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::recovery_probe_blocks_submit_func") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::sink") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject partitioned_hash_join_probe sink failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::partitioned_hash_join_probe::sink") + } +} \ No newline at end of file diff --git a/regression-test/suites/fault_injection_p0/spill/spill_sort_fault_injection.groovy b/regression-test/suites/fault_injection_p0/spill/spill_sort_fault_injection.groovy new file mode 100644 index 00000000000..ce9d9ee9dd7 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/spill/spill_sort_fault_injection.groovy @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("spill_sort_fault_injection", "nonConcurrent") { + multi_sql """ + use regression_test_tpch_unique_sql_zstd_p0; + set enable_force_spill=true; + set min_revocable_mem=1024; + """ + def test_sql = """ + select + l_orderkey, + l_linenumber, + l_partkey, + l_suppkey, + l_quantity, + l_extendedprice, + l_discount, + l_tax, + l_returnflag, + l_linestatus, + l_commitdate, + l_receiptdate, + l_shipinstruct, + l_shipmode, + l_shipdate +from + lineitem +order by + l_orderkey, + l_linenumber, + l_partkey, + l_suppkey, + l_quantity, + l_extendedprice, + l_discount, + l_tax, + l_returnflag, + l_linestatus, + l_commitdate, + l_receiptdate, + l_shipinstruct, + l_shipmode; + """ + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_block") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream spill_block failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_block") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream prepare_spill failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::prepare_spill") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream spill_eof failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::spill_eof") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_stream read_next_block failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_stream::read_next_block") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_sink::sink") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_sort_sink sink failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_sink::sink") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_sink::revoke_memory_cancel") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_sort_sink revoke_memory canceled")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_sink::revoke_memory_cancel") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_sink::revoke_memory_submit_func") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_sort_sink revoke_memory submit_func failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_sink::revoke_memory_submit_func") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_source::recover_spill_data") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_sort_source recover_spill_data failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_source::recover_spill_data") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_source::spill_merged_data") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_sort_source spill_merged_data failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_source::spill_merged_data") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("fault_inject::spill_sort_source::spill_merged_data") + sql test_sql + } catch(Exception e) { + log.error(e.getMessage()) + assertTrue(e.getMessage().contains("fault_inject spill_sort_source spill_merged_data failed")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::spill_sort_source::spill_merged_data") + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org