This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 20f2abb3d4 [vectorized](pipeline) support assert num rows operator (#14923) 20f2abb3d4 is described below commit 20f2abb3d4ce5cd1a013eef3c8dec6835a757a4b Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Fri Dec 9 09:39:29 2022 +0800 [vectorized](pipeline) support assert num rows operator (#14923) --- .../exec/assert_num_rows_operator.h} | 40 ++++++++++++---------- be/src/pipeline/pipeline_fragment_context.cpp | 9 +++++ be/src/vec/exec/vassert_num_rows_node.cpp | 19 +++++----- be/src/vec/exec/vassert_num_rows_node.h | 6 ++-- 4 files changed, 46 insertions(+), 28 deletions(-) diff --git a/be/src/vec/exec/vassert_num_rows_node.h b/be/src/pipeline/exec/assert_num_rows_operator.h similarity index 50% copy from be/src/vec/exec/vassert_num_rows_node.h copy to be/src/pipeline/exec/assert_num_rows_operator.h index cce963f25f..60bbc7c49c 100644 --- a/be/src/vec/exec/vassert_num_rows_node.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -15,28 +15,32 @@ // specific language governing permissions and limitations // under the License. -#include "exec/exec_node.h" -#include "gen_cpp/PlanNodes_types.h" +#pragma once -namespace doris::vectorized { -class Block; +#include "operator.h" +#include "vec/exec/vassert_num_rows_node.h" -// Node for assert row count -class VAssertNumRowsNode : public ExecNode { -public: - VAssertNumRowsNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); +namespace doris { + +namespace pipeline { - Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { - return Status::NotSupported("Not Implemented VAnalyticEvalNode::get_next."); - } +class AssertNumRowsOperatorBuilder final : public OperatorBuilder<vectorized::VAssertNumRowsNode> { +public: + AssertNumRowsOperatorBuilder(int32_t id, ExecNode* node) + : OperatorBuilder(id, "AssertNumRowsOperatorBuilder", node) {}; - virtual Status open(RuntimeState* state) override; - virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override; + OperatorPtr build_operator() override; +}; -private: - int64_t _desired_num_rows; - const std::string _subquery_string; - TAssertion::type _assertion; +class AssertNumRowsOperator final : public Operator<AssertNumRowsOperatorBuilder> { +public: + AssertNumRowsOperator(OperatorBuilderBase* operator_builder, ExecNode* node) + : Operator(operator_builder, node) {}; }; -} // namespace doris::vectorized +OperatorPtr AssertNumRowsOperatorBuilder::build_operator() { + return std::make_shared<AssertNumRowsOperator>(this, _node); +} + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 68ca3a8d7c..2f497dde41 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -39,7 +39,9 @@ #include "exec/streaming_aggregation_source_operator.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/HeartbeatService_types.h" +#include "pipeline/exec/assert_num_rows_operator.h" #include "pipeline/exec/olap_table_sink_operator.h" +#include "pipeline/exec/operator.h" #include "pipeline/exec/table_function_operator.h" #include "pipeline_task.h" #include "runtime/client_cache.h" @@ -351,6 +353,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur RETURN_IF_ERROR(cur_pipe->add_operator(builder)); break; } + case TPlanNodeType::ASSERT_NUM_ROWS_NODE: { + RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); + OperatorBuilderPtr builder = + std::make_shared<AssertNumRowsOperatorBuilder>(next_operator_builder_id(), node); + RETURN_IF_ERROR(cur_pipe->add_operator(builder)); + break; + } case TPlanNodeType::TABLE_FUNCTION_NODE: { RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); OperatorBuilderPtr builder = diff --git a/be/src/vec/exec/vassert_num_rows_node.cpp b/be/src/vec/exec/vassert_num_rows_node.cpp index b9239a3c53..0285acf56a 100644 --- a/be/src/vec/exec/vassert_num_rows_node.cpp +++ b/be/src/vec/exec/vassert_num_rows_node.cpp @@ -18,8 +18,6 @@ #include "vec/exec/vassert_num_rows_node.h" #include "gen_cpp/PlanNodes_types.h" -#include "gutil/strings/substitute.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "vec/core/block.h" @@ -47,12 +45,7 @@ Status VAssertNumRowsNode::open(RuntimeState* state) { return Status::OK(); } -Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos) { - INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, - "VAssertNumRowsNode::get_next"); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, block, eos), - child(0)->get_next_span(), *eos); +Status VAssertNumRowsNode::pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) { _num_rows_returned += block->rows(); bool assert_res = false; switch (_assertion) { @@ -98,4 +91,14 @@ Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos return Status::OK(); } +Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos) { + INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, + "VAssertNumRowsNode::get_next"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, block, eos), + child(0)->get_next_span(), *eos); + + return pull(state, block, eos); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/vassert_num_rows_node.h b/be/src/vec/exec/vassert_num_rows_node.h index cce963f25f..0f6ffcb9de 100644 --- a/be/src/vec/exec/vassert_num_rows_node.h +++ b/be/src/vec/exec/vassert_num_rows_node.h @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include "common/status.h" #include "exec/exec_node.h" #include "gen_cpp/PlanNodes_types.h" @@ -30,8 +31,9 @@ public: return Status::NotSupported("Not Implemented VAnalyticEvalNode::get_next."); } - virtual Status open(RuntimeState* state) override; - virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override; + Status open(RuntimeState* state) override; + Status get_next(RuntimeState* state, Block* block, bool* eos) override; + Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; private: int64_t _desired_num_rows; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org