This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 873b128fde [feature](pipeline) add inersect/except operators (#14868) 873b128fde is described below commit 873b128fde04e45257dc73dd192475c5c6e37109 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Fri Dec 9 14:13:48 2022 +0800 [feature](pipeline) add inersect/except operators (#14868) --- be/src/exec/exec_node.cpp | 3 +- be/src/exec/exec_node.h | 2 + be/src/pipeline/CMakeLists.txt | 6 +- be/src/pipeline/exec/operator.h | 4 +- be/src/pipeline/exec/set_probe_sink_operator.cpp | 63 ++++ be/src/pipeline/exec/set_probe_sink_operator.h | 67 ++++ be/src/pipeline/exec/set_sink_operator.cpp | 43 +++ be/src/pipeline/exec/set_sink_operator.h | 62 ++++ be/src/pipeline/exec/set_source_operator.cpp | 44 +++ be/src/pipeline/exec/set_source_operator.h | 57 +++ be/src/pipeline/pipeline_fragment_context.cpp | 35 ++ be/src/pipeline/pipeline_fragment_context.h | 8 + be/src/pipeline/pipeline_task.cpp | 1 - be/src/vec/CMakeLists.txt | 2 - be/src/vec/exec/vexcept_node.cpp | 114 ------ be/src/vec/exec/vexcept_node.h | 40 -- be/src/vec/exec/vintersect_node.cpp | 115 ------ be/src/vec/exec/vintersect_node.h | 50 --- be/src/vec/exec/vset_operation_node.cpp | 445 +++++++++++++++++++---- be/src/vec/exec/vset_operation_node.h | 243 ++----------- 20 files changed, 793 insertions(+), 611 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index cfbc9a50ca..5849b455cb 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -72,13 +72,12 @@ #include "vec/exec/vbroker_scan_node.h" #include "vec/exec/vdata_gen_scan_node.h" #include "vec/exec/vempty_set_node.h" -#include "vec/exec/vexcept_node.h" #include "vec/exec/vexchange_node.h" -#include "vec/exec/vintersect_node.h" #include "vec/exec/vmysql_scan_node.h" #include "vec/exec/vrepeat_node.h" #include "vec/exec/vschema_scan_node.h" #include "vec/exec/vselect_node.h" +#include "vec/exec/vset_operation_node.h" #include "vec/exec/vsort_node.h" #include "vec/exec/vtable_function_node.h" #include "vec/exec/vunion_node.h" diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index eaf367f6b9..0da7338c80 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -242,6 +242,8 @@ public: ExecNode* child(int i) { return _children[i]; } + size_t children_count() const { return _children.size(); } + protected: friend class DataSink; diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt index 38b01ddd20..659b357870 100644 --- a/be/src/pipeline/CMakeLists.txt +++ b/be/src/pipeline/CMakeLists.txt @@ -45,8 +45,10 @@ set(PIPELINE_FILES exec/sort_sink_operator.cpp exec/repeat_operator.cpp exec/table_function_operator.cpp - ) + exec/set_sink_operator.cpp + exec/set_source_operator.cpp + exec/set_probe_sink_operator.cpp) add_library(Pipeline STATIC ${PIPELINE_FILES} - ) \ No newline at end of file + ) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index f1e29f0547..a17e8dd1f4 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -271,9 +271,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (!UNLIKELY(in_block)) { - DCHECK(source_state == SourceState::FINISHED) - << "block is null, eos should invoke in finalize."; + if (UNLIKELY(!in_block || in_block->rows() == 0)) { return Status::OK(); } return _sink->send(state, in_block, source_state == SourceState::FINISHED); diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp new file mode 100644 index 0000000000..2f51edd12a --- /dev/null +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "set_probe_sink_operator.h" + +#include "vec/exec/vset_operation_node.h" + +namespace doris::pipeline { + +template <bool is_intersect> +SetProbeSinkOperatorBuilder<is_intersect>::SetProbeSinkOperatorBuilder(int32_t id, int child_id, + ExecNode* set_node) + : OperatorBuilder<vectorized::VSetOperationNode<is_intersect>>(id, builder_name, set_node), + _child_id(child_id) {} + +template <bool is_intersect> +OperatorPtr SetProbeSinkOperatorBuilder<is_intersect>::build_operator() { + return std::make_shared<SetProbeSinkOperator<is_intersect>>(this, _child_id, this->_node); +} + +template <bool is_intersect> +SetProbeSinkOperator<is_intersect>::SetProbeSinkOperator(OperatorBuilderBase* operator_builder, + int child_id, ExecNode* set_node) + : Operator<SetProbeSinkOperatorBuilder<is_intersect>>(operator_builder, set_node), + _child_id(child_id) {} + +template <bool is_intersect> +Status SetProbeSinkOperator<is_intersect>::sink(RuntimeState* state, vectorized::Block* block, + SourceState source_state) { + return this->_node->sink_probe(state, _child_id, block, source_state == SourceState::FINISHED); +} + +template <bool is_intersect> +Status SetProbeSinkOperator<is_intersect>::finalize(RuntimeState* state) { + return this->_node->finalize_probe(state, _child_id); +} + +template <bool is_intersect> +bool SetProbeSinkOperator<is_intersect>::can_write() { + DCHECK_GT(_child_id, 0); + return this->_node->is_child_finished(_child_id - 1); +} + +template class SetProbeSinkOperatorBuilder<true>; +template class SetProbeSinkOperatorBuilder<false>; +template class SetProbeSinkOperator<true>; +template class SetProbeSinkOperator<false>; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h new file mode 100644 index 0000000000..c4dc263803 --- /dev/null +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <utility> + +#include "operator.h" + +namespace doris { + +namespace vectorized { +template <bool is_intersect> +class VSetOperationNode; +} + +namespace pipeline { + +template <bool is_intersect> +class SetProbeSinkOperatorBuilder final + : public OperatorBuilder<vectorized::VSetOperationNode<is_intersect>> { +private: + constexpr static auto builder_name = + is_intersect ? "IntersectProbeSinkOperatorBuilder" : "ExceptProbeSinkOperatorBuilder"; + +public: + SetProbeSinkOperatorBuilder(int32_t id, int child_id, ExecNode* set_node); + bool is_sink() const override { return true; } + + OperatorPtr build_operator() override; + +private: + int _child_id; +}; + +template <bool is_intersect> +class SetProbeSinkOperator : public Operator<SetProbeSinkOperatorBuilder<is_intersect>> { +public: + SetProbeSinkOperator(OperatorBuilderBase* operator_builder, int child_id, ExecNode* set_node); + + bool can_write() override; + + Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override; + Status finalize(RuntimeState* state) override; + Status open(RuntimeState* /*state*/) override { return Status::OK(); } + Status close(RuntimeState* /*state*/) override { return Status::OK(); } + +private: + int _child_id; +}; + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp new file mode 100644 index 0000000000..aaa85c31f1 --- /dev/null +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "set_sink_operator.h" + +#include "vec/exec/vset_operation_node.h" + +namespace doris::pipeline { + +template <bool is_intersect> +SetSinkOperatorBuilder<is_intersect>::SetSinkOperatorBuilder(int32_t id, ExecNode* set_node) + : OperatorBuilder<vectorized::VSetOperationNode<is_intersect>>(id, builder_name, set_node) { +} + +template <bool is_intersect> +OperatorPtr SetSinkOperatorBuilder<is_intersect>::build_operator() { + return std::make_shared<SetSinkOperator<is_intersect>>(this, this->_node); +} + +template <bool is_intersect> +SetSinkOperator<is_intersect>::SetSinkOperator( + OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node) + : Operator<SetSinkOperatorBuilder<is_intersect>>(builder, set_node) {} + +template class SetSinkOperatorBuilder<true>; +template class SetSinkOperatorBuilder<false>; +template class SetSinkOperator<true>; +template class SetSinkOperator<false>; +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h new file mode 100644 index 0000000000..eb6e5087fb --- /dev/null +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <utility> + +#include "operator.h" + +namespace doris { + +namespace vectorized { +template <bool is_intersect> +class VSetOperationNode; +} + +namespace pipeline { + +template <bool is_intersect> +class SetSinkOperatorBuilder final + : public OperatorBuilder<vectorized::VSetOperationNode<is_intersect>> { +private: + constexpr static auto builder_name = + is_intersect ? "IntersectSinkOperatorBuilder" : "ExceptSinkOperatorBuilder"; + +public: + SetSinkOperatorBuilder(int32_t id, ExecNode* set_node); + bool is_sink() const override { return true; } + + OperatorPtr build_operator() override; +}; + +template <bool is_intersect> +class SetSinkOperator : public Operator<SetSinkOperatorBuilder<is_intersect>> { +public: + SetSinkOperator(OperatorBuilderBase* operator_builder, + vectorized::VSetOperationNode<is_intersect>* set_node); + + bool can_write() override { return true; } + + Status close(RuntimeState* /*state*/) override { return Status::OK(); }; + +private: + vectorized::VSetOperationNode<is_intersect>* _set_node; +}; + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp new file mode 100644 index 0000000000..bb14936e83 --- /dev/null +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "set_source_operator.h" + +#include "vec/exec/vset_operation_node.h" + +namespace doris::pipeline { + +template <bool is_intersect> +SetSourceOperatorBuilder<is_intersect>::SetSourceOperatorBuilder(int32_t id, ExecNode* set_node) + : OperatorBuilder<vectorized::VSetOperationNode<is_intersect>>(id, builder_name, set_node) { +} + +template <bool is_intersect> +OperatorPtr SetSourceOperatorBuilder<is_intersect>::build_operator() { + return std::make_shared<SetSourceOperator<is_intersect>>(this, this->_node); +} + +template <bool is_intersect> +SetSourceOperator<is_intersect>::SetSourceOperator( + OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node) + : Operator<SetSourceOperatorBuilder<is_intersect>>(builder, set_node) {} + +template class SetSourceOperatorBuilder<true>; +template class SetSourceOperatorBuilder<false>; +template class SetSourceOperator<true>; +template class SetSourceOperator<false>; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h new file mode 100644 index 0000000000..c870d15fa8 --- /dev/null +++ b/be/src/pipeline/exec/set_source_operator.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <utility> + +#include "operator.h" + +namespace doris { + +namespace vectorized { +template <bool is_intersect> +class VSetOperationNode; +} + +namespace pipeline { + +template <bool is_intersect> +class SetSourceOperatorBuilder + : public OperatorBuilder<vectorized::VSetOperationNode<is_intersect>> { +private: + constexpr static auto builder_name = + is_intersect ? "IntersectSourceOperatorBuilder" : "ExceptSourceOperatorBuilder"; + +public: + SetSourceOperatorBuilder(int32_t id, ExecNode* set_node); + bool is_source() const override { return true; } + + OperatorPtr build_operator() override; +}; + +template <bool is_intersect> +class SetSourceOperator : public Operator<SetSourceOperatorBuilder<is_intersect>> { +public: + SetSourceOperator(OperatorBuilderBase* builder, + vectorized::VSetOperationNode<is_intersect>* set_node); + + Status open(RuntimeState* /*state*/) override { return Status::OK(); }; +}; + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 8c5cc7b7bb..f2deb1375a 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -33,6 +33,9 @@ #include "exec/result_sink_operator.h" #include "exec/scan_node.h" #include "exec/scan_operator.h" +#include "exec/set_probe_sink_operator.h" +#include "exec/set_sink_operator.h" +#include "exec/set_source_operator.h" #include "exec/sort_sink_operator.h" #include "exec/sort_source_operator.h" #include "exec/streaming_aggregation_sink_operator.h" @@ -56,6 +59,7 @@ #include "vec/exec/vaggregation_node.h" #include "vec/exec/vexchange_node.h" #include "vec/exec/vrepeat_node.h" +#include "vec/exec/vset_operation_node.h" #include "vec/exec/vsort_node.h" #include "vec/sink/vresult_sink.h" @@ -382,6 +386,14 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur cur_pipe->add_dependency(new_pipe); break; } + case TPlanNodeType::INTERSECT_NODE: { + RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(node, cur_pipe)); + break; + } + case TPlanNodeType::EXCEPT_NODE: { + RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(node, cur_pipe)); + break; + } default: return Status::InternalError("Unsupported exec type in pipeline: {}", print_plan_node_type(node_type)); @@ -389,6 +401,29 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur return Status::OK(); } +template <bool is_intersect> +Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode* node, + PipelinePtr cur_pipe) { + auto build_pipeline = add_pipeline(); + RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline)); + OperatorBuilderPtr sink_builder = std::make_shared<SetSinkOperatorBuilder<is_intersect>>( + next_operator_builder_id(), node); + RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder)); + + for (int child_id = 1; child_id < node->children_count(); ++child_id) { + auto probe_pipeline = add_pipeline(); + RETURN_IF_ERROR(_build_pipelines(node->child(child_id), probe_pipeline)); + OperatorBuilderPtr probe_sink_builder = + std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>( + next_operator_builder_id(), child_id, node); + RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder)); + } + + OperatorBuilderPtr source_builder = std::make_shared<SetSourceOperatorBuilder<is_intersect>>( + next_operator_builder_id(), node); + return cur_pipe->add_operator(source_builder); +} + Status PipelineFragmentContext::submit() { if (_submitted) { return Status::InternalError("submitted"); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index dc88feebf3..fd1d50ebff 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -24,6 +24,11 @@ namespace doris { class ExecNode; class DataSink; +namespace vectorized { +template <bool is_intersect> +class VSetOperationNode; +} + namespace pipeline { class PipelineTask; @@ -121,6 +126,9 @@ private: Status _create_sink(const TDataSink& t_data_sink); Status _build_pipelines(ExecNode*, PipelinePtr); Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams& request); + + template <bool is_intersect> + Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr); }; } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index fbe1d6b819..6e867d5959 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -146,7 +146,6 @@ Status PipelineTask::execute(bool* eos) { break; } } - *eos = false; } return Status::OK(); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 8dfd11ade4..ca5c9c17f5 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -94,8 +94,6 @@ set(VEC_FILES exec/vexchange_node.cpp exec/vset_operation_node.cpp exec/vunion_node.cpp - exec/vintersect_node.cpp - exec/vexcept_node.cpp exec/vselect_node.cpp exec/vmysql_scan_node.cpp exec/vschema_scan_node.cpp diff --git a/be/src/vec/exec/vexcept_node.cpp b/be/src/vec/exec/vexcept_node.cpp deleted file mode 100644 index 8cf391f72f..0000000000 --- a/be/src/vec/exec/vexcept_node.cpp +++ /dev/null @@ -1,114 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/vexcept_node.h" - -#include "gen_cpp/PlanNodes_types.h" -#include "runtime/runtime_state.h" -#include "util/runtime_profile.h" -#include "vec/core/block.h" -#include "vec/exec/vset_operation_node.h" -#include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" -namespace doris { -namespace vectorized { - -VExceptNode::VExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : VSetOperationNode(pool, tnode, descs) {} - -Status VExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(VSetOperationNode::init(tnode, state)); - DCHECK(tnode.__isset.except_node); - return Status::OK(); -} - -Status VExceptNode::prepare(RuntimeState* state) { - RETURN_IF_ERROR(VSetOperationNode::prepare(state)); - return Status::OK(); -} - -Status VExceptNode::open(RuntimeState* state) { - START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExceptNode::open"); - RETURN_IF_ERROR(VSetOperationNode::open(state)); - bool eos = false; - Status st = Status::OK(); - for (int i = 1; i < _children.size(); ++i) { - if (i > 1) { - refresh_hash_table<false>(); - } - - RETURN_IF_ERROR(child(i)->open(state)); - eos = false; - int probe_expr_ctxs_sz = _child_expr_lists[i].size(); - _probe_columns.resize(probe_expr_ctxs_sz); - - while (!eos) { - RETURN_IF_ERROR(process_probe_block(state, i, &eos)); - if (_probe_rows == 0) continue; - - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t<decltype(arg)>; - if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - HashTableProbe<HashTableCtxType, false> process_hashtable_ctx( - this, state->batch_size(), _probe_rows); - st = process_hashtable_ctx.mark_data_in_hashtable(arg); - - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - }, - *_hash_table_variants); - } - } - return st; -} - -Status VExceptNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { - INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExceptNode::get_next"); - SCOPED_TIMER(_probe_timer); - Status st; - create_mutable_cols(output_block); - - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t<decltype(arg)>; - if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - HashTableProbe<HashTableCtxType, false> process_hashtable_ctx( - this, state->batch_size(), _probe_rows); - st = process_hashtable_ctx.get_data_in_hashtable(arg, _mutable_cols, - output_block, eos); - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - }, - *_hash_table_variants); - - RETURN_IF_ERROR( - VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); - reached_limit(output_block, eos); - - return st; -} - -Status VExceptNode::close(RuntimeState* state) { - START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExceptNode::close"); - return VSetOperationNode::close(state); -} - -} // namespace vectorized -} // namespace doris diff --git a/be/src/vec/exec/vexcept_node.h b/be/src/vec/exec/vexcept_node.h deleted file mode 100644 index 7bbed571e7..0000000000 --- a/be/src/vec/exec/vexcept_node.h +++ /dev/null @@ -1,40 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "vec/exec/vset_operation_node.h" - -namespace doris { -namespace vectorized { - -class VExceptNode : public VSetOperationNode { -public: - VExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); - virtual Status prepare(RuntimeState* state); - virtual Status open(RuntimeState* state); - using VSetOperationNode::get_next; - virtual Status get_next(RuntimeState* state, vectorized::Block* output_block, bool* eos); - virtual Status close(RuntimeState* state); - -private: - template <class HashTableContext, bool is_intersected> - friend struct HashTableProbe; -}; -} // namespace vectorized -} // namespace doris diff --git a/be/src/vec/exec/vintersect_node.cpp b/be/src/vec/exec/vintersect_node.cpp deleted file mode 100644 index b232708533..0000000000 --- a/be/src/vec/exec/vintersect_node.cpp +++ /dev/null @@ -1,115 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/vintersect_node.h" - -#include "gen_cpp/PlanNodes_types.h" -#include "runtime/runtime_state.h" -#include "util/runtime_profile.h" -#include "vec/core/block.h" -#include "vec/exec/vset_operation_node.h" -#include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" -namespace doris { -namespace vectorized { - -VIntersectNode::VIntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : VSetOperationNode(pool, tnode, descs) {} - -Status VIntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(VSetOperationNode::init(tnode, state)); - DCHECK(tnode.__isset.intersect_node); - return Status::OK(); -} - -Status VIntersectNode::prepare(RuntimeState* state) { - RETURN_IF_ERROR(VSetOperationNode::prepare(state)); - return Status::OK(); -} - -Status VIntersectNode::open(RuntimeState* state) { - START_AND_SCOPE_SPAN(state->get_tracer(), span, "VIntersectNode::open"); - RETURN_IF_ERROR(VSetOperationNode::open(state)); - bool eos = false; - Status st = Status::OK(); - - for (int i = 1; i < _children.size(); ++i) { - if (i > 1) { - refresh_hash_table<true>(); - } - - _valid_element_in_hash_tbl = 0; - RETURN_IF_ERROR(child(i)->open(state)); - eos = false; - _probe_columns.resize(_child_expr_lists[i].size()); - - while (!eos) { - RETURN_IF_ERROR(process_probe_block(state, i, &eos)); - if (_probe_rows == 0) continue; - - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t<decltype(arg)>; - if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - HashTableProbe<HashTableCtxType, true> process_hashtable_ctx( - this, state->batch_size(), _probe_rows); - st = process_hashtable_ctx.mark_data_in_hashtable(arg); - - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - }, - *_hash_table_variants); - } - } - return st; -} - -Status VIntersectNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { - INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VIntersectNode::get_next"); - SCOPED_TIMER(_probe_timer); - create_mutable_cols(output_block); - Status st; - - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t<decltype(arg)>; - if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - HashTableProbe<HashTableCtxType, true> process_hashtable_ctx( - this, state->batch_size(), _probe_rows); - st = process_hashtable_ctx.get_data_in_hashtable(arg, _mutable_cols, - output_block, eos); - - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - }, - *_hash_table_variants); - - RETURN_IF_ERROR( - VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); - reached_limit(output_block, eos); - - return st; -} - -Status VIntersectNode::close(RuntimeState* state) { - START_AND_SCOPE_SPAN(state->get_tracer(), span, "VIntersectNode::close"); - return VSetOperationNode::close(state); -} -} // namespace vectorized -} // namespace doris diff --git a/be/src/vec/exec/vintersect_node.h b/be/src/vec/exec/vintersect_node.h deleted file mode 100644 index 1316fcd761..0000000000 --- a/be/src/vec/exec/vintersect_node.h +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "exec/exec_node.h" -#include "vec/common/columns_hashing.h" -#include "vec/common/hash_table/hash_table.h" -#include "vec/core/materialize_block.h" -#include "vec/exec/join/join_op.h" -#include "vec/exec/join/vacquire_list.hpp" -#include "vec/exec/join/vhash_join_node.h" -#include "vec/exec/vset_operation_node.h" -#include "vec/functions/function.h" -#include "vec/utils/util.hpp" - -namespace doris { -namespace vectorized { - -class VExprContext; -class VIntersectNode : public VSetOperationNode { -public: - VIntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); - virtual Status prepare(RuntimeState* state); - virtual Status open(RuntimeState* state); - using VSetOperationNode::get_next; - virtual Status get_next(RuntimeState* state, vectorized::Block* output_block, bool* eos); - virtual Status close(RuntimeState* state); - -private: - template <class HashTableContext, bool is_intersected> - friend struct HashTableProbe; -}; -} // namespace vectorized -} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index d3e9c436d6..800b9aae6c 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -23,13 +23,12 @@ namespace doris { namespace vectorized { //build hash table for operation node, intersect/except node -template <class HashTableContext> +template <class HashTableContext, bool is_intersect> struct HashTableBuild { - HashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, - VSetOperationNode* operation_node, uint8_t offset) + HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs, + VSetOperationNode<is_intersect>* operation_node, uint8_t offset) : _rows(rows), _offset(offset), - _acquired_block(acquired_block), _build_raw_ptrs(build_raw_ptrs), _operation_node(operation_node) {} @@ -60,7 +59,6 @@ struct HashTableBuild { if (emplace_result.is_inserted()) { //only inserted once as the same key, others skip new (&emplace_result.get_mapped()) Mapped({k, _offset}); - _operation_node->_valid_element_in_hash_tbl++; } } return Status::OK(); @@ -69,35 +67,95 @@ struct HashTableBuild { private: const int _rows; const uint8_t _offset; - Block& _acquired_block; ColumnRawPtrs& _build_raw_ptrs; - VSetOperationNode* _operation_node; + VSetOperationNode<is_intersect>* _operation_node; }; -VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) +template <class HashTableContext, bool is_intersected> +struct HashTableProbe { + HashTableProbe(VSetOperationNode<is_intersected>* operation_node, int probe_rows) + : _operation_node(operation_node), + _probe_rows(probe_rows), + _probe_raw_ptrs(operation_node->_probe_columns), + _arena(new Arena) {} + + Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) { + using KeyGetter = typename HashTableContext::State; + using Mapped = typename HashTableContext::Mapped; + + KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz, nullptr); + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) { + if (_probe_keys.size() < _probe_rows) { + _probe_keys.resize(_probe_rows); + } + size_t keys_size = _probe_raw_ptrs.size(); + for (size_t i = 0; i < _probe_rows; ++i) { + _probe_keys[i] = + serialize_keys_to_pool_contiguous(i, keys_size, _probe_raw_ptrs, *_arena); + } + key_getter.set_serialized_keys(_probe_keys.data()); + } + + if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) { + for (int probe_index = 0; probe_index < _probe_rows; probe_index++) { + auto find_result = + key_getter.find_key(hash_table_ctx.hash_table, probe_index, *_arena); + if (find_result.is_found()) { //if found, marked visited + auto it = find_result.get_mapped().begin(); + if (!(it->visited)) { + it->visited = true; + if constexpr (is_intersected) { //intersected + _operation_node->_valid_element_in_hash_tbl++; + } else { + _operation_node->_valid_element_in_hash_tbl--; //except + } + } + } + } + } else { + LOG(FATAL) << "Invalid RowRefListType!"; + } + return Status::OK(); + } + +private: + VSetOperationNode<is_intersected>* _operation_node; + const size_t _probe_rows; + ColumnRawPtrs& _probe_raw_ptrs; + std::unique_ptr<Arena> _arena; + std::vector<StringRef> _probe_keys; +}; + +template <bool is_intersect> +VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _valid_element_in_hash_tbl(0), _mem_used(0), - _probe_index(-1), - _probe_rows(0) { + _build_block_index(0), + _build_finished(false) { _hash_table_variants = std::make_unique<HashTableVariants>(); _arena = std::make_unique<Arena>(); } -Status VSetOperationNode::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::close"); +template <bool is_intersect> +void VSetOperationNode<is_intersect>::release_resource(RuntimeState* state) { for (auto& exprs : _child_expr_lists) { VExpr::close(exprs, state); } release_mem(); +} +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode<is_intersect>::close"); return ExecNode::close(state); } -Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) { +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); std::vector<std::vector<::doris::TExpr>> result_texpr_lists; @@ -119,20 +177,71 @@ Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status VSetOperationNode::open(RuntimeState* state) { - START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::open"); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::alloc_resource(RuntimeState* state) { // open result expr lists. for (const std::vector<VExprContext*>& exprs : _child_expr_lists) { RETURN_IF_ERROR(VExpr::open(exprs, state)); } - RETURN_IF_ERROR(hash_table_build(state)); + _probe_finished_children_index.assign(_child_expr_lists.size(), false); + _probe_columns.resize(_child_expr_lists[1].size()); return Status::OK(); } -Status VSetOperationNode::prepare(RuntimeState* state) { +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::open(RuntimeState* state) { + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode<is_intersect>::open"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::open(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); + + // TODO: build the hash table in a thread to open other children asynchronously. + RETURN_IF_ERROR(hash_table_build(state)); + bool eos = false; + Status st = Status::OK(); + for (int i = 1; i < _children.size(); ++i) { + RETURN_IF_ERROR(child(i)->open(state)); + eos = false; + int probe_expr_ctxs_sz = _child_expr_lists[i].size(); + _probe_columns.resize(probe_expr_ctxs_sz); + + if constexpr (is_intersect) { + _valid_element_in_hash_tbl = 0; + } else { + std::visit( + [&](auto&& arg) { + using HashTableCtxType = std::decay_t<decltype(arg)>; + if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { + _valid_element_in_hash_tbl = arg.hash_table.size(); + } + }, + *_hash_table_variants); + } + + while (!eos) { + release_block_memory(_probe_block, i); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(i)->get_next_after_projects(state, &_probe_block, &eos), + child(i)->get_next_span(), eos); + + RETURN_IF_ERROR(sink_probe(state, i, &_probe_block, eos)); + } + finalize_probe(state, i); + } + return st; +} + +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::get_next(RuntimeState* state, Block* output_block, + bool* eos) { + INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExceptNode::get_next"); + SCOPED_TIMER(_probe_timer); + return pull(state, output_block, eos); +} + +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); @@ -153,7 +262,8 @@ Status VSetOperationNode::prepare(RuntimeState* state) { return Status::OK(); } -void VSetOperationNode::hash_table_init() { +template <bool is_intersect> +void VSetOperationNode<is_intersect>::hash_table_init() { if (_child_expr_lists[0].size() == 1 && (!_build_not_ignore_null[0])) { // Single column optimization switch (_child_expr_lists[0][0]->root()->result_type()) { @@ -244,14 +354,56 @@ void VSetOperationNode::hash_table_init() { } } +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::sink(RuntimeState*, Block* block, bool eos) { + constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; + + if (block->rows() != 0) { + _mem_used += block->allocated_bytes(); + _mutable_block.merge(*block); + } + + if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) { + _build_blocks.emplace_back(_mutable_block.to_block()); + RETURN_IF_ERROR(process_build_block(_build_blocks[_build_block_index], _build_block_index)); + _mutable_block.clear(); + ++_build_block_index; + + if (eos) { + _build_finished = true; + } + } + return Status::OK(); +} + +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::pull(RuntimeState* state, Block* output_block, bool* eos) { + create_mutable_cols(output_block); + auto st = std::visit( + [&](auto&& arg) -> Status { + using HashTableCtxType = std::decay_t<decltype(arg)>; + if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { + return get_data_in_hashtable<HashTableCtxType>(arg, output_block, + state->batch_size(), eos); + } else { + LOG(FATAL) << "FATAL: uninited hash table"; + } + }, + *_hash_table_variants); + RETURN_IF_ERROR(st); + RETURN_IF_ERROR( + VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); + reached_limit(output_block, eos); + return Status::OK(); +} + //build a hash table from child(0) -Status VSetOperationNode::hash_table_build(RuntimeState* state) { +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(0)->open(state)); Block block; MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors()); - uint8_t index = 0; - int64_t last_mem_used = 0; bool eos = false; while (!eos) { block.clear_column_data(); @@ -259,34 +411,17 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, &eos), child(0)->get_next_span(), eos); - - size_t allocated_bytes = block.allocated_bytes(); - _mem_used += allocated_bytes; - - if (block.rows() != 0) { - mutable_block.merge(block); - } - - // make one block for each 4 gigabytes - constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; - if (_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE) { - _build_blocks.emplace_back(mutable_block.to_block()); - // TODO:: Rethink may we should do the proess after we recevie all build blocks ? - // which is better. - RETURN_IF_ERROR(process_build_block(_build_blocks[index], index)); - mutable_block = MutableBlock(); - ++index; - last_mem_used = _mem_used; + if (eos) { + child(0)->close(state); } + sink(state, &block, eos); } - _build_blocks.emplace_back(mutable_block.to_block()); - child(0)->close(state); - RETURN_IF_ERROR(process_build_block(_build_blocks[index], index)); return Status::OK(); } -Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) { +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_t offset) { size_t rows = block.rows(); if (rows == 0) { return Status::OK(); @@ -300,8 +435,8 @@ Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) { [&](auto&& arg) { using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - HashTableBuild<HashTableCtxType> hash_table_build_process(rows, block, raw_ptrs, - this, offset); + HashTableBuild<HashTableCtxType, is_intersect> hash_table_build_process( + rows, raw_ptrs, this, offset); hash_table_build_process(arg); } else { LOG(FATAL) << "FATAL: uninited hash table"; @@ -312,28 +447,71 @@ Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) { return Status::OK(); } -Status VSetOperationNode::process_probe_block(RuntimeState* state, int child_id, bool* eos) { - if (!_probe_column_inserted_id.empty()) { - for (int j = 0; j < _probe_column_inserted_id.size(); ++j) { - auto column_to_erase = _probe_column_inserted_id[j]; - _probe_block.erase(column_to_erase - j); - } - _probe_column_inserted_id.clear(); +template <bool is_intersect> +void VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& value, + int& block_size) { + auto it = value.begin(); + for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); ++idx) { + auto& column = *_build_blocks[it->block_offset].get_by_position(idx->first).column; + _mutable_cols[idx->second]->insert_from(column, it->row_num); + } + block_size++; +} + +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* /*state*/, int child_id, + Block* block, bool eos) { + CHECK(_build_finished) << "cannot sink probe data before build finished"; + if (child_id > 1) { + CHECK(_probe_finished_children_index[child_id - 1]) + << fmt::format("child with id: {} should be probed first", child_id); + } + auto probe_rows = block->rows(); + + if (probe_rows == 0) { + return Status::OK(); } - release_block_memory(_probe_block, child_id); - _probe_index = 0; - _probe_rows = 0; - - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR_AND_CHECK_SPAN( - child(child_id)->get_next_after_projects(state, &_probe_block, eos), - child(child_id)->get_next_span(), *eos); - _probe_rows = _probe_block.rows(); - RETURN_IF_ERROR(extract_probe_column(_probe_block, _probe_columns, child_id)); + + RETURN_IF_ERROR(extract_probe_column(*block, _probe_columns, child_id)); + + return std::visit( + [&](auto&& arg) -> Status { + using HashTableCtxType = std::decay_t<decltype(arg)>; + if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { + HashTableProbe<HashTableCtxType, is_intersect> process_hashtable_ctx( + this, probe_rows); + return process_hashtable_ctx.mark_data_in_hashtable(arg); + } else { + LOG(FATAL) << "FATAL: uninited hash table"; + } + }, + *_hash_table_variants); +} + +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::finalize_probe(RuntimeState* /*state*/, int child_id) { + if (child_id != (_children.size() - 1)) { + refresh_hash_table(); + _probe_columns.resize(_child_expr_lists[child_id + 1].size()); + } else { + _can_read = true; + } + _probe_finished_children_index[child_id] = true; return Status::OK(); } -Status VSetOperationNode::extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs) { +template <bool is_intersect> +bool VSetOperationNode<is_intersect>::is_child_finished(int child_id) const { + if (child_id == 0) { + return _build_finished; + } + + return _probe_finished_children_index[child_id]; +} + +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::extract_build_column(Block& block, + ColumnRawPtrs& raw_ptrs) { for (size_t i = 0; i < _child_expr_lists[0].size(); ++i) { int result_col_id = -1; RETURN_IF_ERROR(_child_expr_lists[0][i]->execute(&block, &result_col_id)); @@ -358,9 +536,10 @@ Status VSetOperationNode::extract_build_column(Block& block, ColumnRawPtrs& raw_ return Status::OK(); } -Status VSetOperationNode::extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, - int child_id) { - if (_probe_rows == 0) { +template <bool is_intersect> +Status VSetOperationNode<is_intersect>::extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, + int child_id) { + if (block.rows() == 0) { return Status::OK(); } @@ -395,7 +574,8 @@ Status VSetOperationNode::extract_probe_column(Block& block, ColumnRawPtrs& raw_ return Status::OK(); } -void VSetOperationNode::create_mutable_cols(Block* output_block) { +template <bool is_intersect> +void VSetOperationNode<is_intersect>::create_mutable_cols(Block* output_block) { _mutable_cols.resize(_left_table_data_types.size()); bool mem_reuse = output_block->mem_reuse(); @@ -408,7 +588,9 @@ void VSetOperationNode::create_mutable_cols(Block* output_block) { } } -void VSetOperationNode::debug_string(int indentation_level, std::stringstream* out) const { +template <bool is_intersect> +void VSetOperationNode<is_intersect>::debug_string(int indentation_level, + std::stringstream* out) const { *out << string(indentation_level * 2, ' '); *out << " _child_expr_lists=["; for (int i = 0; i < _child_expr_lists.size(); ++i) { @@ -419,7 +601,8 @@ void VSetOperationNode::debug_string(int indentation_level, std::stringstream* o *out << ")" << std::endl; } -void VSetOperationNode::release_mem() { +template <bool is_intersect> +void VSetOperationNode<is_intersect>::release_mem() { _hash_table_variants = nullptr; _arena = nullptr; @@ -429,5 +612,113 @@ void VSetOperationNode::release_mem() { _probe_block.clear(); } +template <bool is_intersect> +void VSetOperationNode<is_intersect>::refresh_hash_table() { + std::visit( + [&](auto&& arg) { + using HashTableCtxType = std::decay_t<decltype(arg)>; + if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { + if constexpr (std::is_same_v<typename HashTableCtxType::Mapped, + RowRefListWithFlags>) { + HashTableCtxType tmp_hash_table; + bool is_need_shrink = + arg.hash_table.should_be_shrink(_valid_element_in_hash_tbl); + if (is_need_shrink) { + tmp_hash_table.hash_table.init_buf_size( + _valid_element_in_hash_tbl / arg.hash_table.get_factor() + 1); + } + + arg.init_once(); + auto& iter = arg.iter; + auto iter_end = arg.hash_table.end(); + while (iter != iter_end) { + auto& mapped = iter->get_second(); + auto it = mapped.begin(); + + if constexpr (is_intersect) { //intersected + if (it->visited) { + it->visited = false; + if (is_need_shrink) { + tmp_hash_table.hash_table.insert(iter->get_value()); + } + ++iter; + } else { + if (!is_need_shrink) { + arg.hash_table.delete_zero_key(iter->get_first()); + // the ++iter would check if the current key is zero. if it does, the iterator will be moved to the container's head. + // so we do ++iter before set_zero to make the iterator move to next valid key correctly. + auto iter_prev = iter; + ++iter; + iter_prev->set_zero(); + } else { + ++iter; + } + } + } else { //except + if (!it->visited && is_need_shrink) { + tmp_hash_table.hash_table.insert(iter->get_value()); + } + ++iter; + } + } + + arg.inited = false; + if (is_need_shrink) { + arg.hash_table = std::move(tmp_hash_table.hash_table); + } + } else { + LOG(FATAL) << "FATAL: Invalid RowRefList"; + } + } else { + LOG(FATAL) << "FATAL: uninited hash table"; + } + }, + *_hash_table_variants); +} + +template <bool is_intersected> +template <typename HashTableContext> +Status VSetOperationNode<is_intersected>::get_data_in_hashtable(HashTableContext& hash_table_ctx, + Block* output_block, + const int batch_size, bool* eos) { + hash_table_ctx.init_once(); + int left_col_len = _left_table_data_types.size(); + auto& iter = hash_table_ctx.iter; + auto block_size = 0; + + if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) { + for (; iter != hash_table_ctx.hash_table.end() && block_size < batch_size; ++iter) { + auto& value = iter->get_second(); + auto it = value.begin(); + if constexpr (is_intersected) { + if (it->visited) { //intersected: have done probe, so visited values it's the result + add_result_columns(value, block_size); + } + } else { + if (!it->visited) { //except: haven't visited values it's the needed result + add_result_columns(value, block_size); + } + } + } + } else { + LOG(FATAL) << "Invalid RowRefListType!"; + } + + *eos = iter == hash_table_ctx.hash_table.end(); + if (!output_block->mem_reuse()) { + for (int i = 0; i < left_col_len; ++i) { + output_block->insert(ColumnWithTypeAndName(std::move(_mutable_cols[i]), + _left_table_data_types[i], "")); + } + } else { + _mutable_cols.clear(); + } + + return Status::OK(); +} + +template class VSetOperationNode<true>; +template class VSetOperationNode<false>; + } // namespace vectorized -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index b93464ba87..1e8b5e1ebe 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -31,20 +31,35 @@ namespace doris { namespace vectorized { -class VSetOperationNode : public ExecNode { +template <bool is_intersect> +class VSetOperationNode final : public ExecNode { public: VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); - virtual Status prepare(RuntimeState* state); - virtual Status open(RuntimeState* state); - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { - return Status::NotSupported("Not Implemented get RowBatch in vecorized execution."); + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + Status get_next(RuntimeState* /*state*/, RowBatch* /*row_batch*/, bool* /*eos*/) override { + return Status::NotSupported("Not implemented get RowBatch in vectorized execution."); } - virtual Status close(RuntimeState* state); - virtual void debug_string(int indentation_level, std::stringstream* out) const; -protected: + Status get_next(RuntimeState* state, Block* output_block, bool* eos) override; + + Status close(RuntimeState* state) override; + void debug_string(int indentation_level, std::stringstream* out) const override; + + Status alloc_resource(RuntimeState* state) override; + void release_resource(RuntimeState* state) override; + + Status sink(RuntimeState* state, Block* block, bool eos) override; + Status pull(RuntimeState* state, Block* output_block, bool* eos) override; + + Status sink_probe(RuntimeState* state, int child_id, Block* block, bool eos); + Status finalize_probe(RuntimeState* state, int child_id); + + bool is_child_finished(int child_id) const; + +private: //Todo: In build process of hashtable, It's same as join node. //It's time to abstract out the same methods and provide them directly to others; void hash_table_init(); @@ -52,13 +67,17 @@ protected: Status process_build_block(Block& block, uint8_t offset); Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs); Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int child_id); - template <bool keep_matched> void refresh_hash_table(); - Status process_probe_block(RuntimeState* state, int child_id, bool* eos); + + template <typename HashTableContext> + Status get_data_in_hashtable(HashTableContext& hash_table_ctx, Block* output_block, + const int batch_size, bool* eos); + + void add_result_columns(RowRefListWithFlags& value, int& block_size); + void create_mutable_cols(Block* output_block); void release_mem(); -protected: std::unique_ptr<HashTableVariants> _hash_table_variants; std::vector<size_t> _probe_key_sz; @@ -85,207 +104,21 @@ protected: Block _probe_block; ColumnRawPtrs _probe_columns; std::vector<MutableColumnPtr> _mutable_cols; - int _probe_index; - size_t _probe_rows; + int _build_block_index; + bool _build_finished; + std::vector<bool> _probe_finished_children_index; + MutableBlock _mutable_block; RuntimeProfile::Counter* _build_timer; // time to build hash table RuntimeProfile::Counter* _probe_timer; // time to probe - template <class HashTableContext> + template <class HashTableContext, bool is_intersected> friend struct HashTableBuild; template <class HashTableContext, bool is_intersected> friend struct HashTableProbe; }; -template <bool keep_matched> -void VSetOperationNode::refresh_hash_table() { - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t<decltype(arg)>; - if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - if constexpr (std::is_same_v<typename HashTableCtxType::Mapped, - RowRefListWithFlags>) { - HashTableCtxType tmp_hash_table; - bool is_need_shrink = - arg.hash_table.should_be_shrink(_valid_element_in_hash_tbl); - if (is_need_shrink) { - tmp_hash_table.hash_table.init_buf_size( - _valid_element_in_hash_tbl / arg.hash_table.get_factor() + 1); - } - - arg.init_once(); - auto& iter = arg.iter; - auto iter_end = arg.hash_table.end(); - while (iter != iter_end) { - auto& mapped = iter->get_second(); - auto it = mapped.begin(); - - if constexpr (keep_matched) { //intersected - if (it->visited) { - it->visited = false; - if (is_need_shrink) { - tmp_hash_table.hash_table.insert(iter->get_value()); - } - ++iter; - } else { - if (!is_need_shrink) { - arg.hash_table.delete_zero_key(iter->get_first()); - // the ++iter would check if the current key is zero. if it does, the iterator will be moved to the container's head. - // so we do ++iter before set_zero to make the iterator move to next valid key correctly. - auto iter_prev = iter; - ++iter; - iter_prev->set_zero(); - } else { - ++iter; - } - } - } else { //except - if (!it->visited && is_need_shrink) { - tmp_hash_table.hash_table.insert(iter->get_value()); - } - ++iter; - } - } - - arg.inited = false; - if (is_need_shrink) { - arg.hash_table = std::move(tmp_hash_table.hash_table); - } - } else { - LOG(FATAL) << "FATAL: Invalid RowRefList"; - } - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - } - }, - *_hash_table_variants); -} - -template <class HashTableContext, bool is_intersected> -struct HashTableProbe { - HashTableProbe(VSetOperationNode* operation_node, int batch_size, int probe_rows) - : _operation_node(operation_node), - _left_table_data_types(operation_node->_left_table_data_types), - _batch_size(batch_size), - _probe_rows(probe_rows), - _build_blocks(operation_node->_build_blocks), - _probe_block(operation_node->_probe_block), - _probe_index(operation_node->_probe_index), - _num_rows_returned(operation_node->_num_rows_returned), - _probe_raw_ptrs(operation_node->_probe_columns), - _rows_returned_counter(operation_node->_rows_returned_counter), - _build_col_idx(operation_node->_build_col_idx), - _mutable_cols(operation_node->_mutable_cols) {} - - Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) { - using KeyGetter = typename HashTableContext::State; - using Mapped = typename HashTableContext::Mapped; - - KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz, nullptr); - - if (_probe_index == 0) { - _arena.reset(new Arena()); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) { - if (_probe_keys.size() < _probe_rows) { - _probe_keys.resize(_probe_rows); - } - size_t keys_size = _probe_raw_ptrs.size(); - for (size_t i = 0; i < _probe_rows; ++i) { - _probe_keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, - _probe_raw_ptrs, *_arena); - } - } - } - - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) { - key_getter.set_serialized_keys(_probe_keys.data()); - } - - if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) { - for (; _probe_index < _probe_rows;) { - auto find_result = - key_getter.find_key(hash_table_ctx.hash_table, _probe_index, *_arena); - if (find_result.is_found()) { //if found, marked visited - auto it = find_result.get_mapped().begin(); - if (!(it->visited)) { - it->visited = true; - if constexpr (is_intersected) //intersected - _operation_node->_valid_element_in_hash_tbl++; - else - _operation_node->_valid_element_in_hash_tbl--; //except - } - } - _probe_index++; - } - } else { - LOG(FATAL) << "Invalid RowRefListType!"; - } - return Status::OK(); - } - - void add_result_columns(RowRefListWithFlags& value, int& block_size) { - auto it = value.begin(); - for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); ++idx) { - auto& column = *_build_blocks[it->block_offset].get_by_position(idx->first).column; - _mutable_cols[idx->second]->insert_from(column, it->row_num); - } - block_size++; - } - - Status get_data_in_hashtable(HashTableContext& hash_table_ctx, - std::vector<MutableColumnPtr>& mutable_cols, Block* output_block, - bool* eos) { - hash_table_ctx.init_once(); - int left_col_len = _left_table_data_types.size(); - auto& iter = hash_table_ctx.iter; - auto block_size = 0; - - if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) { - for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) { - auto& value = iter->get_second(); - auto it = value.begin(); - if constexpr (is_intersected) { - if (it->visited) { //intersected: have done probe, so visited values it's the result - add_result_columns(value, block_size); - } - } else { - if (!it->visited) { //except: haven't visited values it's the needed result - add_result_columns(value, block_size); - } - } - } - } else { - LOG(FATAL) << "Invalid RowRefListType!"; - } - - *eos = iter == hash_table_ctx.hash_table.end(); - if (!output_block->mem_reuse()) { - for (int i = 0; i < left_col_len; ++i) { - output_block->insert(ColumnWithTypeAndName(std::move(_mutable_cols[i]), - _left_table_data_types[i], "")); - } - } else { - _mutable_cols.clear(); - } - - return Status::OK(); - } - -private: - VSetOperationNode* _operation_node; - const DataTypes& _left_table_data_types; - const int _batch_size; - const size_t _probe_rows; - const std::vector<Block>& _build_blocks; - const Block& _probe_block; - int& _probe_index; - int64_t& _num_rows_returned; - ColumnRawPtrs& _probe_raw_ptrs; - std::unique_ptr<Arena> _arena; - std::vector<StringRef> _probe_keys; - RuntimeProfile::Counter* _rows_returned_counter; - std::unordered_map<int, int>& _build_col_idx; - std::vector<MutableColumnPtr>& _mutable_cols; -}; +using VIntersectNode = VSetOperationNode<true>; +using VExceptNode = VSetOperationNode<false>; } // namespace vectorized } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org