This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1-v20220707 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 270daccfba38a341a0d8f758dcacbe5048a495e2 Author: starocean999 <40539150+starocean...@users.noreply.github.com> AuthorDate: Thu Jul 7 11:20:16 2022 +0800 [hotfix](dev-1.0.1) bottom line solution for vec outer join (#10645) agg and hash join node should produce nullable type column correctly according to the fe planner --- be/src/vec/exec/join/vhash_join_node.cpp | 43 ++++++++++++----- be/src/vec/exec/vaggregation_node.cpp | 54 ++++++++++++++++------ be/src/vec/exec/vaggregation_node.h | 7 +-- be/src/vec/exec/vanalytic_eval_node.cpp | 11 ++++- be/src/vec/functions/function_case.h | 4 +- .../org/apache/doris/analysis/AggregateInfo.java | 12 +++++ .../org/apache/doris/planner/AggregationNode.java | 9 ++-- .../java/org/apache/doris/planner/Planner.java | 13 +++--- gensrc/thrift/PlanNodes.thrift | 4 ++ 9 files changed, 115 insertions(+), 42 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 596bef712b..25be6c80a9 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -189,10 +189,15 @@ struct ProcessHashTableProbe { if constexpr (!is_semi_anti_join || have_other_join_conjunct) { if (_build_blocks.size() == 1) { for (int i = 0; i < column_length; i++) { - auto& column = *_build_blocks[0].get_by_position(i).column; if (output_slot_flags[i]) { + auto column = _build_blocks[0].get_by_position(i).column; + if (mcol[i + column_offset]->is_nullable() xor column->is_nullable()) { + DCHECK(mcol[i + column_offset]->is_nullable() && + !column->is_nullable()); + column = make_nullable(column); + } mcol[i + column_offset]->insert_indices_from( - column, _build_block_rows.data(), _build_block_rows.data() + size); + *column, _build_block_rows.data(), _build_block_rows.data() + size); } else { mcol[i + column_offset]->resize(size); } @@ -207,17 +212,29 @@ struct ProcessHashTableProbe { assert_cast<ColumnNullable*>(mcol[i + column_offset].get()) ->insert_join_null_data(); } else { - auto& column = *_build_blocks[_build_block_offsets[j]] - .get_by_position(i) - .column; - mcol[i + column_offset]->insert_from(column, + auto column = _build_blocks[_build_block_offsets[j]] + .get_by_position(i) + .column; + if (mcol[i + column_offset]->is_nullable() xor + column->is_nullable()) { + DCHECK(mcol[i + column_offset]->is_nullable() && + !column->is_nullable()); + column = make_nullable(column); + } + mcol[i + column_offset]->insert_from(*column, _build_block_rows[j]); } } else { - auto& column = *_build_blocks[_build_block_offsets[j]] - .get_by_position(i) - .column; - mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); + auto column = _build_blocks[_build_block_offsets[j]] + .get_by_position(i) + .column; + if (mcol[i + column_offset]->is_nullable() xor + column->is_nullable()) { + DCHECK(mcol[i + column_offset]->is_nullable() && + !column->is_nullable()); + column = make_nullable(column); + } + mcol[i + column_offset]->insert_from(*column, _build_block_rows[j]); } } } else { @@ -233,7 +250,11 @@ struct ProcessHashTableProbe { int size) { for (int i = 0; i < output_slot_flags.size(); ++i) { if (output_slot_flags[i]) { - auto& column = _probe_block.get_by_position(i).column; + auto column = _probe_block.get_by_position(i).column; + if (mcol[i]->is_nullable() xor column->is_nullable()) { + DCHECK(mcol[i]->is_nullable() && !column->is_nullable()); + column = make_nullable(column); + } column->replicate(&_items_counts[0], size, *mcol[i]); } else { mcol[i]->resize(size); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 29983cf374..4dd89499a1 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -77,6 +77,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE = AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), + _aggregate_evaluators_changed_flags(tnode.agg_node.aggregate_function_changed_flags), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(NULL), _output_tuple_id(tnode.agg_node.output_tuple_id), @@ -225,19 +226,26 @@ Status AggregationNode::prepare(RuntimeState* state) { int j = _probe_expr_ctxs.size(); for (int i = 0; i < j; ++i) { - auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable(); + auto nullable_output = _needs_finalize ? _output_tuple_desc->slots()[i]->is_nullable() : _intermediate_tuple_desc->slots()[i]->is_nullable(); auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable(); if (nullable_output != nullable_input) { DCHECK(nullable_output); - _make_nullable_keys.emplace_back(i); + _make_nullable_output_column_pos.emplace_back(i); } } + int probe_expr_count = _probe_expr_ctxs.size(); for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) { SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(), _mem_pool.get(), intermediate_slot_desc, output_slot_desc, mem_tracker())); + auto nullable_output = _needs_finalize ? output_slot_desc->is_nullable() : intermediate_slot_desc->is_nullable(); + auto nullable_agg_output = _aggregate_evaluators[i]->data_type()->is_nullable(); + if ( nullable_output != nullable_agg_output) { + DCHECK(nullable_output); + _make_nullable_output_column_pos.emplace_back(i + probe_expr_count); + } } // set profile timer to evaluators @@ -389,11 +397,11 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { } // pre stream agg need use _num_row_return to decide whether to do pre stream agg _num_rows_returned += block->rows(); - _make_nullable_output_key(block); + _make_nullable_output_column(block); COUNTER_SET(_rows_returned_counter, _num_rows_returned); } else { RETURN_IF_ERROR(_executor.get_result(state, block, eos)); - _make_nullable_output_key(block); + _make_nullable_output_column(block); // dispose the having clause, should not be execute in prestreaming agg RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns())); reached_limit(block, eos); @@ -497,6 +505,9 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block } for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + if (_aggregate_evaluators_changed_flags[i]) { + write_binary(true, value_buffer_writers[i]); + } _aggregate_evaluators[i]->function()->serialize( _agg_data.without_key + _offsets_of_aggregate_states[i], value_buffer_writers[i]); value_buffer_writers[i].commit(); @@ -576,13 +587,16 @@ void AggregationNode::_close_without_key() { release_tracker(); } -void AggregationNode::_make_nullable_output_key(Block* block) { +void AggregationNode::_make_nullable_output_column(Block* block) { if (block->rows() != 0) { - for (auto cid : _make_nullable_keys) { - block->get_by_position(cid).column = - make_nullable(block->get_by_position(cid).column); - block->get_by_position(cid).type = - make_nullable(block->get_by_position(cid).type); + for (auto cid : _make_nullable_output_column_pos) { + if (!block->get_by_position(cid).column->is_nullable()) { + block->get_by_position(cid).column = + make_nullable(block->get_by_position(cid).column); + } + if (!block->get_by_position(cid).type->is_nullable()) { + block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type); + } } } } @@ -695,7 +709,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i // will serialize value data to string column std::vector<VectorBufferWriter> value_buffer_writers; - bool mem_reuse = out_block->mem_reuse(); + bool mem_reuse = out_block->mem_reuse() && _make_nullable_output_column_pos.empty(); auto serialize_string_type = std::make_shared<DataTypeString>(); MutableColumns value_columns; for (int i = 0; i < _aggregate_evaluators.size(); ++i) { @@ -713,6 +727,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i for (size_t j = 0; j < rows; ++j) { for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + if (_aggregate_evaluators_changed_flags[i]) { + write_binary(true, value_buffer_writers[i]); + } _aggregate_evaluators[i]->function()->serialize( _streaming_pre_places[j] + _offsets_of_aggregate_states[i], value_buffer_writers[i]); @@ -850,14 +867,14 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) { Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos) { - bool mem_reuse = block->mem_reuse(); + bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty(); auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc()); int key_size = _probe_expr_ctxs.size(); MutableColumns key_columns; for (int i = 0; i < key_size; ++i) { if (!mem_reuse) { - key_columns.emplace_back(column_withschema[i].type->create_column()); + key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column()); } else { key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); } @@ -865,7 +882,8 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo MutableColumns value_columns; for (int i = key_size; i < column_withschema.size(); ++i) { if (!mem_reuse) { - value_columns.emplace_back(column_withschema[i].type->create_column()); + value_columns.emplace_back( + _aggregate_evaluators[i - key_size]->data_type()->create_column()); } else { value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); } @@ -932,7 +950,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat MutableColumns value_columns(agg_size); DataTypes value_data_types(agg_size); - bool mem_reuse = block->mem_reuse(); + bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty(); MutableColumns key_columns; for (int i = 0; i < key_size; ++i) { @@ -969,6 +987,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat // serialize values for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + if (_aggregate_evaluators_changed_flags[i]) { + write_binary(true, value_buffer_writers[i]); + } _aggregate_evaluators[i]->function()->serialize( mapped + _offsets_of_aggregate_states[i], value_buffer_writers[i]); value_buffer_writers[i].commit(); @@ -984,6 +1005,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat key_columns[0]->insert_data(nullptr, 0); auto mapped = agg_method.data.get_null_key_data(); for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + if (_aggregate_evaluators_changed_flags[i]) { + write_binary(true, value_buffer_writers[i]); + } _aggregate_evaluators[i]->function()->serialize( mapped + _offsets_of_aggregate_states[i], value_buffer_writers[i]); diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index c6d6c34b1c..758d11fbdb 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -417,12 +417,13 @@ public: private: // group by k1,k2 std::vector<VExprContext*> _probe_expr_ctxs; - // left / full join will change the key nullable make output/input solt + // left / full join will change the output nullable make output/input solt // nullable diff. so we need make nullable of it. - std::vector<size_t> _make_nullable_keys; + std::vector<size_t> _make_nullable_output_column_pos; std::vector<size_t> _probe_key_sz; std::vector<AggFnEvaluator*> _aggregate_evaluators; + std::vector<bool> _aggregate_evaluators_changed_flags; // may be we don't have to know the tuple id TupleId _intermediate_tuple_id; @@ -462,7 +463,7 @@ private: /// the preagg should pass through any rows it can't fit in its tables. bool _should_expand_preagg_hash_tables(); - void _make_nullable_output_key(Block* block); + void _make_nullable_output_column(Block* block); Status _create_agg_status(AggregateDataPtr data); Status _destroy_agg_status(AggregateDataPtr data); diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 65765d15d3..858e330efe 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -544,7 +544,16 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) { } for (size_t i = 0; i < _result_window_columns.size(); ++i) { - block->insert({std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""}); + SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i]; + if (output_slot_desc->is_nullable() xor _agg_functions[i]->data_type()->is_nullable()) { + DCHECK(output_slot_desc->is_nullable() && + !_agg_functions[i]->data_type()->is_nullable()); + block->insert({make_nullable(std::move(_result_window_columns[i])), + make_nullable(_agg_functions[i]->data_type()), ""}); + } else { + block->insert( + {std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""}); + } } _output_block_index++; diff --git a/be/src/vec/functions/function_case.h b/be/src/vec/functions/function_case.h index 47e33f58ff..0b0772583d 100644 --- a/be/src/vec/functions/function_case.h +++ b/be/src/vec/functions/function_case.h @@ -172,9 +172,11 @@ public: .data(); // simd automatically + // we have to use (bool)cond_raw_data[row_idx] to force the output is 0 or 1 + // because in some cases, we might use none-zero values 1 or 2 to indicate the value is null. for (int row_idx = 0; row_idx < rows_count; row_idx++) { then_idx_ptr[row_idx] |= - (!then_idx_ptr[row_idx]) * cond_raw_data[row_idx] * i; + (!then_idx_ptr[row_idx]) * (bool)cond_raw_data[row_idx] * i; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java index 74facd0a55..b599f36909 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java @@ -333,6 +333,18 @@ public final class AggregateInfo extends AggregateInfoBase { return result; } + public ArrayList<Boolean> getMaterializedAggregateExprChangedFlags() { + ArrayList<Boolean> result = Lists.newArrayList(); + for (Integer i : materializedSlots_) { + if (mergeAggInfo_ != null) { + result.add(aggregateExprs_.get(i).isNullable() != mergeAggInfo_.aggregateExprs_.get(i).isNullable()); + } else { + result.add(false); + } + } + return result; + } + public AggregateInfo getMergeAggInfo() { return mergeAggInfo_; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index e7dee2651c..c19058f1cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -268,16 +268,15 @@ public class AggregationNode extends PlanNode { msg.node_type = TPlanNodeType.AGGREGATION_NODE; List<TExpr> aggregateFunctions = Lists.newArrayList(); // only serialize agg exprs that are being materialized - for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) { + for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) { aggregateFunctions.add(e.treeToThrift()); } msg.agg_node = - new TAggregationNode( - aggregateFunctions, - aggInfo.getIntermediateTupleId().asInt(), - aggInfo.getOutputTupleId().asInt(), needsFinalize); + new TAggregationNode(aggregateFunctions, aggInfo.getIntermediateTupleId().asInt(), + aggInfo.getOutputTupleId().asInt(), needsFinalize); msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg); msg.agg_node.setIsUpdateStage(!aggInfo.isMerge()); + msg.agg_node.setAggregateFunctionChangedFlags(aggInfo.getMaterializedAggregateExprChangedFlags()); List<Expr> groupingExprs = aggInfo.getGroupingExprs(); if (groupingExprs != null) { msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index 96a3f4f0eb..fda1f82e1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -172,12 +172,13 @@ public class Planner { singleNodePlan.convertToVectoriezd(); } - if (analyzer.getContext() != null - && analyzer.getContext().getSessionVariable().isEnableProjection() - && statement instanceof SelectStmt) { - ProjectPlanner projectPlanner = new ProjectPlanner(analyzer); - projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan); - } + // disable ProjectPlanner for now because there is some bug to be fixed + // if (analyzer.getContext() != null + // && analyzer.getContext().getSessionVariable().isEnableProjection() + // && statement instanceof SelectStmt) { + // ProjectPlanner projectPlanner = new ProjectPlanner(analyzer); + // projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan); + // } if (statement instanceof InsertStmt) { InsertStmt insertStmt = (InsertStmt) statement; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index dfe90968cb..8ca90ec29c 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -465,6 +465,10 @@ struct TAggregationNode { 5: required bool need_finalize 6: optional bool use_streaming_preaggregation 7: optional bool is_update_stage + + // to support vec outer join, in some case the agg function has different nullable property in serialize and merge phase + // we need pass this info to be to make the agg function serialize and deserialize correctly + 8: optional list<bool> aggregate_function_changed_flags } struct TRepeatNode { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org