This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch vectorized in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 8760e644f4086809f214f0d105f0ae4492024ca6 Author: anneji-dev <85534151+anneji-...@users.noreply.github.com> AuthorDate: Mon Jan 17 14:09:15 2022 +0800 [Vectorized][feature](planner)(executor) Support grouping sets rollup cube (#7601) --- be/src/exec/exec_node.cpp | 8 +- be/src/exec/repeat_node.h | 2 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/exec/vrepeat_node.cpp | 245 +++++++++++++++++++++ be/src/vec/exec/vrepeat_node.h | 56 +++++ be/src/vec/functions/function_grouping.cpp | 25 +++ be/src/vec/functions/function_grouping.h | 90 ++++++++ be/src/vec/functions/simple_function_factory.h | 2 + .../apache/doris/planner/SingleNodePlanner.java | 10 + gensrc/script/doris_builtins_functions.py | 4 +- 10 files changed, 440 insertions(+), 4 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4582f89..97c3259 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -82,6 +82,7 @@ #include "vec/exprs/vexpr.h" #include "vec/exec/vempty_set_node.h" #include "vec/exec/vschema_scan_node.h" +#include "vec/exec/vrepeat_node.h" namespace doris { const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate"; @@ -389,6 +390,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::SCHEMA_SCAN_NODE: case TPlanNodeType::ANALYTIC_EVAL_NODE: case TPlanNodeType::SELECT_NODE: + case TPlanNodeType::REPEAT_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -568,7 +570,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::REPEAT_NODE: - *node = pool->add(new RepeatNode(pool, tnode, descs)); + if (state->enable_vectorized_exec()) { + *node = pool->add(new vectorized::VRepeatNode(pool, tnode, descs)); + } else { + *node = pool->add(new RepeatNode(pool, tnode, descs)); + } return Status::OK(); case TPlanNodeType::ASSERT_NUM_ROWS_NODE: diff --git a/be/src/exec/repeat_node.h b/be/src/exec/repeat_node.h index 01335d2..d9dce75 100644 --- a/be/src/exec/repeat_node.h +++ b/be/src/exec/repeat_node.h @@ -40,7 +40,7 @@ public: protected: virtual void debug_string(int indentation_level, std::stringstream* out) const override; -private: +protected: Status get_repeated_batch(RowBatch* child_row_batch, int repeat_id_idx, RowBatch* row_batch); // Slot id set used to indicate those slots need to set to null. diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 9c4d947..f737391 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -86,6 +86,7 @@ set(VEC_FILES exec/vempty_set_node.cpp exec/vanalytic_eval_node.cpp exec/vassert_num_rows_node.cpp + exec/vrepeat_node.cpp exec/join/vhash_join_node.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp @@ -139,6 +140,7 @@ set(VEC_FILES functions/function_date_or_datetime_computation.cpp functions/function_date_or_datetime_to_string.cpp functions/function_datetime_string_to_string.cpp + functions/function_grouping.cpp olap/vgeneric_iterators.cpp olap/vcollect_iterator.cpp olap/block_reader.cpp diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp new file mode 100644 index 0000000..dd8bb28 --- /dev/null +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vrepeat_node.h" +#include "exprs/expr.h" +#include "gutil/strings/join.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" + +namespace doris::vectorized { +VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : RepeatNode(pool, tnode, descs), _child_block(nullptr), _virtual_tuple_id(tnode.repeat_node.output_tuple_id) {} + +Status VRepeatNode::prepare(RuntimeState* state) { + VLOG_CRITICAL << "VRepeatNode::prepare"; + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(RepeatNode::prepare(state)); + + // get current all output slots + for (const auto& tuple_desc : this->row_desc().tuple_descriptors()) { + for (const auto& slot_desc : tuple_desc->slots()) { + _output_slots.push_back(slot_desc); + } + } + + // get all input slots + for (const auto& child_tuple_desc : child(0)->row_desc().tuple_descriptors()) { + for (const auto& child_slot_desc : child_tuple_desc->slots()) { + _child_slots.push_back(child_slot_desc); + } + } + + _virtual_tuple_desc = state->desc_tbl().get_tuple_descriptor(_virtual_tuple_id); + if (_virtual_tuple_desc == NULL) { + return Status::InternalError("Failed to get virtual tuple descriptor."); + } + + std::stringstream ss; + ss << "The output slots size " << _output_slots.size() + << " is not equal to the sum of child_slots_size " << _child_slots.size() + << ",virtual_slots_size " << _virtual_tuple_desc->slots().size(); + if (_output_slots.size() != (_child_slots.size() + _virtual_tuple_desc->slots().size())) { + return Status::InternalError(ss.str()); + } + + _child_block.reset(new Block()); + + return Status::OK(); +} + +Status VRepeatNode::open(RuntimeState* state) { + VLOG_CRITICAL << "VRepeatNode::open"; + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(RepeatNode::open(state)); + return Status::OK(); +} + +Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Block* output_block) { + VLOG_CRITICAL << "VRepeatNode::get_repeated_block"; + DCHECK(child_block != nullptr); + DCHECK_EQ(output_block->rows(), 0); + + size_t child_column_size = child_block->columns(); + size_t column_size = _output_slots.size(); + bool mem_reuse = output_block->mem_reuse(); + DCHECK_EQ(child_column_size, _child_slots.size()); + DCHECK_LT(child_column_size, column_size); + std::vector<vectorized::MutableColumnPtr> columns(column_size); + for (size_t i = 0; i < column_size; i++) { + if (mem_reuse) { + columns[i] = std::move(*output_block->get_by_position(i).column).mutate(); + } else { + columns[i] = _output_slots[i]->get_empty_mutable_column(); + } + } + + /* Fill all slots according to child, for example:select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2)); + * insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1); + * slot_id_set_list=[[0],[1]],repeat_id_idx=0, + * child_block 1,2,1 | 1,3,1 | 2,1,1 | 3,1,1 + * output_block 1,null,1,1 | 1,null,1,1 | 2,nul,1,1 | 3,null,1,1 + */ + size_t cur_col = 0; + for (size_t i = 0; i < child_column_size; i++) { + const ColumnWithTypeAndName& src_column = child_block->get_by_position(i); + + DCHECK_EQ(_child_slots[i]->type().type, _output_slots[cur_col]->type().type); + DCHECK_EQ(_child_slots[i]->col_name(), _output_slots[cur_col]->col_name()); + + std::set<SlotId>& repeat_ids = _slot_id_set_list[repeat_id_idx]; + bool is_repeat_slot = _all_slot_ids.find(_output_slots[cur_col]->id()) != _all_slot_ids.end(); + bool is_set_null_slot = repeat_ids.find(_output_slots[cur_col]->id()) == repeat_ids.end(); + + if (is_repeat_slot) { + DCHECK(_output_slots[cur_col]->is_nullable()); + auto* nullable_column = reinterpret_cast<ColumnNullable *>(columns[cur_col].get()); + auto& null_map = nullable_column->get_null_map_data(); + auto* column_ptr = columns[cur_col].get(); + + // set slot null not in repeat_ids + if (is_set_null_slot) { + nullable_column->resize(src_column.column->size()); + for (size_t j = 0; j < src_column.column->size(); ++j) { + nullable_column->insert_data(nullptr, 0); + } + } else { + if (!src_column.type->is_nullable()) { + for (size_t j = 0; j < src_column.column->size(); ++j) { + null_map.push_back(0); + } + column_ptr = &nullable_column->get_nested_column(); + } + column_ptr->insert_range_from(*src_column.column, 0, src_column.column->size()); + } + } else { + columns[cur_col]->insert_range_from(*src_column.column, 0, src_column.column->size()); + } + cur_col++; + } + + // Fill grouping ID to tuple + for (auto slot_idx = 0; slot_idx < _grouping_list.size(); slot_idx++) { + DCHECK_LT(slot_idx, _virtual_tuple_desc->slots().size()); + const SlotDescriptor* _virtual_slot_desc = _virtual_tuple_desc->slots()[slot_idx]; + DCHECK_EQ(_virtual_slot_desc->type().type, _output_slots[cur_col]->type().type); + DCHECK_EQ(_virtual_slot_desc->col_name(), _output_slots[cur_col]->col_name()); + int64_t val = _grouping_list[slot_idx][repeat_id_idx]; + auto* column_ptr = columns[cur_col].get(); + if (_output_slots[cur_col]->is_nullable()) { + auto* nullable_column = reinterpret_cast<ColumnNullable *>(columns[cur_col].get()); + auto& null_map = nullable_column->get_null_map_data(); + column_ptr = &nullable_column->get_nested_column(); + + for (size_t i = 0; i < child_block->rows(); ++i) { + null_map.push_back(0); + } + } + + auto* col = assert_cast<ColumnVector<Int64> *>(column_ptr); + for (size_t i = 0; i < child_block->rows(); ++i) { + col->insert_value(val); + } + cur_col ++; + } + + DCHECK_EQ(cur_col, column_size); + + if (!columns.empty() && !columns[0]->empty()) { + auto n_columns = 0; + if (!mem_reuse) { + for (const auto slot_desc : _output_slots) { + output_block->insert( + ColumnWithTypeAndName(std::move(columns[n_columns++]), slot_desc->get_data_type_ptr(), slot_desc->col_name())); + } + } else { + columns.clear(); + } + } + return Status::OK(); +} + +Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { + VLOG_CRITICAL << "VRepeatNode::get_next"; + SCOPED_TIMER(_runtime_profile->total_time_counter()); + + if (state == NULL || block == NULL || eos == NULL) + return Status::InternalError("input is NULL pointer"); + + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); + RETURN_IF_CANCELLED(state); + DCHECK(_repeat_id_idx >= 0); + for (const std::vector<int64_t>& v : _grouping_list) { + DCHECK(_repeat_id_idx <= (int)v.size()); + } + DCHECK(block->rows() == 0); + + // current child block has finished its repeat, get child's next block + if (_child_block->rows() == 0) { + if (_child_eos) { + *eos = true; + return Status::OK(); + } + + RETURN_IF_ERROR(child(0)->get_next(state, _child_block.get(), &_child_eos)); + + if (_child_block->rows() == 0) { + *eos = true; + return Status::OK(); + } + } + + RETURN_IF_ERROR(get_repeated_block(_child_block.get(), _repeat_id_idx, block)); + + _repeat_id_idx++; + + int size = _repeat_id_list.size(); + if (_repeat_id_idx >= size) { + release_block_memory(*_child_block.get()); + _repeat_id_idx = 0; + } + + _num_rows_returned += block->rows(); + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + VLOG_ROW << "VRepeatNode output rows: " << block->rows(); + return Status::OK(); +} + +Status VRepeatNode::close(RuntimeState* state) { + VLOG_CRITICAL << "VRepeatNode::close"; + if (is_closed()) { + return Status::OK(); + } + release_block_memory(*_child_block.get()); + RETURN_IF_ERROR(child(0)->close(state)); + return ExecNode::close(state); +} + +void VRepeatNode::debug_string(int indentation_level, std::stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << "VRepeatNode("; + *out << "repeat pattern: [" << JoinElements(_repeat_id_list, ",") << "]\n"; + *out << "add " << _grouping_list.size() << " columns. \n"; + *out << "added column values: "; + for (const std::vector<int64_t>& v : _grouping_list) { + *out << "[" << JoinElements(v, ",") << "] "; + } + *out << "\n"; + ExecNode::debug_string(indentation_level, out); + *out << ")"; +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h new file mode 100644 index 0000000..cccf7ad --- /dev/null +++ b/be/src/vec/exec/vrepeat_node.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/repeat_node.h" + +namespace doris { + +class ObjectPool; +class TPlanNode; +class DescriptorTbl; +class RuntimeState; +class Status; + +namespace vectorized { +class VRepeatNode : public RepeatNode { +public: + VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~VRepeatNode() override = default; + + virtual Status prepare(RuntimeState* state) override; + virtual Status open(RuntimeState* state) override; + virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override; + virtual Status close(RuntimeState* state) override; + +protected: + virtual void debug_string(int indentation_level, std::stringstream* out) const override; + +private: + Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* output_block); + + std::unique_ptr<Block> _child_block; + std::vector<SlotDescriptor*> _child_slots; + std::vector<SlotDescriptor*> _output_slots; + + // _virtual_tuple_id id used for GROUPING_ID(). + TupleId _virtual_tuple_id; + const TupleDescriptor* _virtual_tuple_desc; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/functions/function_grouping.cpp b/be/src/vec/functions/function_grouping.cpp new file mode 100644 index 0000000..763dec2 --- /dev/null +++ b/be/src/vec/functions/function_grouping.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "function_grouping.h" + +namespace doris::vectorized { +void register_function_grouping(SimpleFunctionFactory& factory) { + factory.register_function<FunctionGrouping>(); + factory.register_function<FunctionGroupingId>(); +} +} diff --git a/be/src/vec/functions/function_grouping.h b/be/src/vec/functions/function_grouping.h new file mode 100644 index 0000000..6bdaa7e --- /dev/null +++ b/be/src/vec/functions/function_grouping.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_FUNCTION_GROUPING_H +#define DORIS_FUNCTION_GROUPING_H + +#include "vec/functions/simple_function_factory.h" +#include "vec/columns/column_nullable.h" +#include "vec/functions/function_helpers.h" +#include "vec/utils/util.hpp" +#include "vec/data_types/get_least_supertype.h" + +namespace doris::vectorized { + +class FunctionGroupingBase : public IFunction { +public: + size_t get_number_of_arguments() const override { return 1; } + + bool use_default_implementation_for_constants() const override { return false; } + + bool use_default_implementation_for_nulls() const override { return false; } + + DataTypePtr get_return_type_impl(const ColumnsWithTypeAndName& arguments) const override { + return std::make_shared<DataTypeInt64>(); + } +}; + +class FunctionGrouping : public FunctionGroupingBase { +public: + static constexpr auto name = "grouping"; + + static FunctionPtr create() { return std::make_shared<FunctionGrouping>(); } + + String get_name() const override { return name; } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t input_rows_count) override { + const ColumnWithTypeAndName& src_column = block.get_by_position(arguments[0]); + const ColumnWithTypeAndName& rel_column = block.get_by_position(result); + if (!src_column.column) + return Status::InternalError("Illegal column " + src_column.column->get_name() + " of first argument of function " + name); + + DCHECK(src_column.type->is_nullable() == true); + MutableColumnPtr res_column = rel_column.type->create_column(); + auto* src_nullable_column = reinterpret_cast<ColumnNullable *>(const_cast<IColumn *>(src_column.column.get())); + res_column->insert_range_from(*src_nullable_column->get_nested_column_ptr().get(), 0, src_column.column->size()); + block.get_by_position(result).column = std::move(res_column); + return Status::OK(); + } +}; + +class FunctionGroupingId : public FunctionGroupingBase { +public: + static constexpr auto name = "grouping_id"; + + static FunctionPtr create() { return std::make_shared<FunctionGroupingId>(); } + + String get_name() const override { return name; } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t input_rows_count) override { + const ColumnWithTypeAndName& src_column = block.get_by_position(arguments[0]); + const ColumnWithTypeAndName& rel_column = block.get_by_position(result); + if (!src_column.column) + return Status::InternalError("Illegal column " + src_column.column->get_name() + " of first argument of function " + name); + + DCHECK(src_column.type->is_nullable() == true); + MutableColumnPtr res_column = rel_column.type->create_column(); + auto* src_nullable_column = reinterpret_cast<ColumnNullable *>(const_cast<IColumn *>(src_column.column.get())); + res_column->insert_range_from(*src_nullable_column->get_nested_column_ptr().get(), 0, src_column.column->size()); + block.get_by_position(result).column = std::move(res_column); + return Status::OK(); + } +}; +} +#endif //DORIS_FUNCTION_GROUPING_H diff --git a/be/src/vec/functions/simple_function_factory.h b/be/src/vec/functions/simple_function_factory.h index 0718930..45420cb 100644 --- a/be/src/vec/functions/simple_function_factory.h +++ b/be/src/vec/functions/simple_function_factory.h @@ -66,6 +66,7 @@ void register_function_like(SimpleFunctionFactory& factory); void register_function_regexp(SimpleFunctionFactory& factory); void register_function_random(SimpleFunctionFactory& factory); void register_function_coalesce(SimpleFunctionFactory& factory); ++void register_function_grouping(SimpleFunctionFactory& factory); class SimpleFunctionFactory { using Creator = std::function<FunctionBuilderPtr()>; @@ -181,6 +182,7 @@ public: register_function_regexp(instance); register_function_random(instance); register_function_coalesce(instance); + register_function_grouping(instance); }); return instance; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index f62f606..e76253a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1070,6 +1070,16 @@ public class SingleNodePlanner { && groupingInfo != null); root = new RepeatNode(ctx_.getNextNodeId(), root, groupingInfo, groupByClause); root.init(analyzer); + // set agg outtuple nullable + AggregateInfo aggInfo = selectStmt.getAggInfo(); + TupleId aggOutTupleId = aggInfo.getOutputTupleId(); + TupleDescriptor aggOutTupleDescriptor = analyzer.getDescTbl().getTupleDesc(aggOutTupleId); + int aggregateExprStartIndex = groupByClause.getGroupingExprs().size(); + for (int i = 0; i < aggregateExprStartIndex; ++i) { + SlotDescriptor slot = aggOutTupleDescriptor.getSlots().get(i); + if (!slot.getIsNullable()) + slot.setIsNullable(true); + } return root; } diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index 8d3b6d9..ae7dc11 100755 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -1397,10 +1397,10 @@ visible_functions = [ # grouping sets functions [['grouping_id'], 'BIGINT', ['BIGINT'], '_ZN5doris21GroupingSetsFunctions11grouping_idEPN9doris_udf15FunctionContextERKNS1_9BigIntValE', - '', '', '', 'ALWAYS_NOT_NULLABLE'], + '', '', 'vec', 'ALWAYS_NOT_NULLABLE'], [['grouping'], 'BIGINT', ['BIGINT'], '_ZN5doris21GroupingSetsFunctions8groupingEPN9doris_udf15FunctionContextERKNS1_9BigIntValE', - '' ,'', '', 'ALWAYS_NOT_NULLABLE'], + '' ,'', 'vec', 'ALWAYS_NOT_NULLABLE'], ] # Except the following functions, other function will directly return --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org