This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch opt-explode-fastpath-593ec84d in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6db42a2f48425ac73771d1176667c1253044e6db Author: eldenmoon <[email protected]> AuthorDate: Thu Jan 29 15:45:13 2026 +0800 [opt](table-function) Optimize explode with block fast path - Add a block fast path for explode-like table functions to expand contiguous nested ranges without per-row get_value() - Fall back to the original row-wise path when the nested range is non-contiguous or unsupported - Add/extend unit tests to cover fast path and fallback cases Test: ./run-be-ut.sh --run --filter='TableFunctionOperatorTest.*:UnnestTest.*' --- be/src/pipeline/exec/table_function_operator.cpp | 188 +++++- be/src/pipeline/exec/table_function_operator.h | 8 + be/src/vec/columns/column_variant.cpp | 4 + be/src/vec/exec/scan/olap_scanner.cpp | 5 +- be/src/vec/exprs/table_function/table_function.h | 15 + be/src/vec/exprs/table_function/vexplode.cpp | 19 + be/src/vec/exprs/table_function/vexplode.h | 4 + be/src/vec/exprs/table_function/vexplode_v2.cpp | 23 + be/src/vec/exprs/table_function/vexplode_v2.h | 4 + .../vec/functions/array/function_array_utils.cpp | 13 +- .../operator/table_function_operator_test.cpp | 702 ++++++++++++++++++++- .../sql_functions/table_function/explode.groovy | 12 +- 12 files changed, 978 insertions(+), 19 deletions(-) diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index 63e8bded2c6..076a7fbbfb5 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -17,12 +17,16 @@ #include "table_function_operator.h" +#include <cstring> #include <limits> #include <memory> +#include "common/cast_set.h" #include "pipeline/exec/operator.h" #include "util/simd/bits.h" #include "vec/common/custom_allocator.h" +#include "vec/columns/column_nullable.h" +#include "vec/common/assert_cast.h" #include "vec/core/block.h" #include "vec/core/column_numbers.h" #include "vec/exprs/table_function/table_function_factory.h" @@ -76,6 +80,7 @@ Status TableFunctionLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(fn->open()); } _cur_child_offset = -1; + _block_fast_path_prepared = false; return Status::OK(); } @@ -160,6 +165,154 @@ bool TableFunctionLocalState::_is_inner_and_empty() { return false; } +bool TableFunctionLocalState::_can_use_block_fast_path() const { + auto& p = _parent->cast<TableFunctionOperatorX>(); + // Fast path is only valid when: + // - only one table function exists + // - there is an active child row to expand + // - the child block is non-empty + // - the table function can expose nested/offsets via prepare_block_fast_path() + return p._fn_num == 1 && _cur_child_offset != -1 && _child_block->rows() > 0 && + _fns[0]->support_block_fast_path(); +} + +Status TableFunctionLocalState::_get_expanded_block_block_fast_path( + RuntimeState* state, std::vector<vectorized::MutableColumnPtr>& columns) { + auto& p = _parent->cast<TableFunctionOperatorX>(); + // Fast path for explode-like table functions: + // Instead of calling TableFunction::get_value() row-by-row, copy the nested column range + // directly when the nested values in this output batch are contiguous in memory. + if (!_block_fast_path_prepared) { + RETURN_IF_ERROR( + _fns[0]->prepare_block_fast_path(_child_block.get(), state, &_block_fast_path_ctx)); + _block_fast_path_prepared = true; + _block_fast_path_row = _cur_child_offset; + _block_fast_path_in_row_offset = 0; + } + + const auto remaining_capacity = + state->batch_size() - cast_set<int>(columns[p._child_slots.size()]->size()); + if (remaining_capacity <= 0) { + return Status::OK(); + } + + if (_block_fast_path_ctx.offsets_ptr == nullptr || + _block_fast_path_ctx.nested_col.get() == nullptr) { + return Status::InternalError("block fast path context is invalid"); + } + + const auto& offsets = *_block_fast_path_ctx.offsets_ptr; + const auto child_rows = cast_set<int64_t>(offsets.size()); + if (child_rows != cast_set<int64_t>(_child_block->rows())) { + return Status::InternalError("block fast path offsets size mismatch"); + } + + std::vector<uint32_t> row_ids; + row_ids.reserve(remaining_capacity); + uint64_t first_nested_idx = 0; + uint64_t expected_next_nested_idx = 0; + + int64_t child_row = _block_fast_path_row; + uint64_t in_row_offset = _block_fast_path_in_row_offset; + int produced_rows = 0; + + while (produced_rows < remaining_capacity && child_row < child_rows) { + if (_block_fast_path_ctx.array_nullmap_data && + _block_fast_path_ctx.array_nullmap_data[child_row]) { + // NULL array row: skip it here. Slow path will handle output semantics if needed. + child_row++; + in_row_offset = 0; + continue; + } + + const uint64_t prev_off = child_row == 0 ? 0 : offsets[child_row - 1]; + const uint64_t cur_off = offsets[child_row]; + const uint64_t nested_len = cur_off - prev_off; + + if (in_row_offset >= nested_len) { + child_row++; + in_row_offset = 0; + continue; + } + + const uint64_t remaining_in_row = nested_len - in_row_offset; + const int take_count = std::min<int>(remaining_capacity - produced_rows, + cast_set<int>(remaining_in_row)); + const uint64_t nested_start = prev_off + in_row_offset; + + DCHECK_LE(nested_start + take_count, cur_off); + DCHECK_LE(nested_start + take_count, _block_fast_path_ctx.nested_col->size()); + + if (produced_rows == 0) { + first_nested_idx = nested_start; + expected_next_nested_idx = nested_start; + } else if (nested_start != expected_next_nested_idx) { + // This fast path requires a single contiguous nested range to avoid per-row scatter. + return Status::NotSupported("block fast path requires contiguous nested range"); + } + + // Map each produced output row back to its source child row for copying non-table-function + // columns via insert_indices_from(). + for (int j = 0; j < take_count; ++j) { + row_ids.push_back(cast_set<uint32_t>(child_row)); + } + + produced_rows += take_count; + expected_next_nested_idx += take_count; + in_row_offset += take_count; + if (in_row_offset >= nested_len) { + child_row++; + in_row_offset = 0; + } + } + + if (produced_rows > 0) { + for (auto index : p._output_slot_indexs) { + auto src_column = _child_block->get_by_position(index).column; + columns[index]->insert_indices_from(*src_column, row_ids.data(), + row_ids.data() + produced_rows); + } + + auto& out_col = columns[p._child_slots.size()]; + if (out_col->is_nullable()) { + auto* out_nullable = assert_cast<vectorized::ColumnNullable*>(out_col.get()); + out_nullable->get_nested_column_ptr()->insert_range_from(*_block_fast_path_ctx.nested_col, + first_nested_idx, produced_rows); + auto* nullmap_column = assert_cast<vectorized::ColumnUInt8*>( + out_nullable->get_null_map_column_ptr().get()); + auto& nullmap_data = nullmap_column->get_data(); + const size_t old_size = nullmap_data.size(); + nullmap_data.resize(old_size + produced_rows); + if (_block_fast_path_ctx.nested_nullmap_data != nullptr) { + memcpy(nullmap_data.data() + old_size, + _block_fast_path_ctx.nested_nullmap_data + first_nested_idx, + produced_rows * sizeof(vectorized::UInt8)); + } else { + memset(nullmap_data.data() + old_size, 0, + produced_rows * sizeof(vectorized::UInt8)); + } + } else { + out_col->insert_range_from(*_block_fast_path_ctx.nested_col, first_nested_idx, + produced_rows); + } + } + + _block_fast_path_row = child_row; + _block_fast_path_in_row_offset = in_row_offset; + _cur_child_offset = child_row >= child_rows ? -1 : child_row; + + if (child_row >= child_rows) { + for (vectorized::TableFunction* fn : _fns) { + fn->process_close(); + } + _child_block->clear_column_data( + _parent->cast<TableFunctionOperatorX>()._child->row_desc().num_materialized_slots()); + _block_fast_path_prepared = false; + } + + return Status::OK(); +} + Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, vectorized::Block* output_block, bool* eos) { SCOPED_TIMER(_process_rows_timer); @@ -177,23 +330,44 @@ Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, _fns[i]->set_nullable(); } } - while (columns[p._child_slots.size()]->size() < state->batch_size()) { - RETURN_IF_CANCELLED(state); - if (_child_block->rows() == 0) { - break; + // Try the block fast path first. If the table function reports NOT_IMPLEMENTED (e.g. non- + // contiguous nested range), fall back to the original row-wise path. + bool fallback_to_slow_path = false; + if (_can_use_block_fast_path()) { + RETURN_IF_CANCELLED(state); + auto st = _get_expanded_block_block_fast_path(state, columns); + if (!st.ok()) { + if (st.is<ErrorCode::NOT_IMPLEMENTED_ERROR>()) { + fallback_to_slow_path = true; + _block_fast_path_prepared = false; + } else { + return st; + } } + } else { + fallback_to_slow_path = true; + } + if (fallback_to_slow_path) { bool skip_child_row = false; while (columns[p._child_slots.size()]->size() < state->batch_size()) { + RETURN_IF_CANCELLED(state); + + if (_child_block->rows() == 0) { + break; + } + int idx = _find_last_fn_eos_idx(); if (idx == 0 || skip_child_row) { _copy_output_slots(columns, p); // all table functions' results are exhausted, process next child row. process_next_child_row(); + skip_child_row = false; if (_cur_child_offset == -1) { break; } + continue; } else if (idx < p._fn_num && idx != -1) { // some of table functions' results are exhausted. if (!_roll_table_functions(idx)) { @@ -208,13 +382,8 @@ Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, } DCHECK_LE(1, p._fn_num); - // It may take multiple iterations of this while loop to process a child row if - // any table function produces a large number of rows. auto repeat_times = _fns[p._fn_num - 1]->get_value( columns[p._child_slots.size() + p._fn_num - 1], - //// It has already been checked that - // columns[p._child_slots.size()]->size() < state->batch_size(), - // so columns[p._child_slots.size()]->size() will not exceed the range of int. state->batch_size() - (int)columns[p._child_slots.size()]->size()); _current_row_insert_times += repeat_times; for (int i = 0; i < p._fn_num - 1; i++) { @@ -463,6 +632,7 @@ void TableFunctionLocalState::process_next_child_row() { .num_materialized_slots()); } _cur_child_offset = -1; + _block_fast_path_prepared = false; return; } diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index 56f9f116ec8..e73b73dfa0d 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -65,6 +65,9 @@ private: // >0: some of fns are eos int _find_last_fn_eos_idx() const; bool _is_inner_and_empty(); + bool _can_use_block_fast_path() const; + Status _get_expanded_block_block_fast_path(RuntimeState* state, + std::vector<vectorized::MutableColumnPtr>& columns); Status _get_expanded_block_for_outer_conjuncts(RuntimeState* state, vectorized::Block* output_block, bool* eos); @@ -80,6 +83,11 @@ private: bool _child_eos = false; DorisVector<bool> _child_rows_has_output; + bool _block_fast_path_prepared = false; + vectorized::TableFunction::BlockFastPathContext _block_fast_path_ctx; + int64_t _block_fast_path_row = 0; + uint64_t _block_fast_path_in_row_offset = 0; + RuntimeProfile::Counter* _init_function_timer = nullptr; RuntimeProfile::Counter* _process_rows_timer = nullptr; RuntimeProfile::Counter* _filter_timer = nullptr; diff --git a/be/src/vec/columns/column_variant.cpp b/be/src/vec/columns/column_variant.cpp index 64bbf57b750..1fb34e685df 100644 --- a/be/src/vec/columns/column_variant.cpp +++ b/be/src/vec/columns/column_variant.cpp @@ -2130,6 +2130,10 @@ void ColumnVariant::create_root(const DataTypePtr& type, MutableColumnPtr&& colu if (num_rows == 0) { num_rows = column->size(); } + DCHECK_EQ(type->is_nullable(), column->is_nullable()) + << "type nullable=" << type->is_nullable() + << " column nullable=" << column->is_nullable() + << " type=" << type->get_name(); add_sub_column({}, std::move(column), type); if (serialized_sparse_column->empty()) { serialized_sparse_column->resize(num_rows); diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index 334838586c8..748415c41c6 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -591,14 +591,15 @@ Status OlapScanner::_init_return_columns() { } const auto& column = tablet_schema->column(index); + int32_t unique_id = column.unique_id() > 0 ? column.unique_id() : column.parent_unique_id(); if (!slot->all_access_paths().empty()) { _tablet_reader_params.all_access_paths.insert( - {column.unique_id(), slot->all_access_paths()}); + {unique_id, slot->all_access_paths()}); } if (!slot->predicate_access_paths().empty()) { _tablet_reader_params.predicate_access_paths.insert( - {column.unique_id(), slot->predicate_access_paths()}); + {unique_id, slot->predicate_access_paths()}); } if ((slot->type()->get_primitive_type() == PrimitiveType::TYPE_STRUCT || diff --git a/be/src/vec/exprs/table_function/table_function.h b/be/src/vec/exprs/table_function/table_function.h index ef36ca881d7..e400dcb311b 100644 --- a/be/src/vec/exprs/table_function/table_function.h +++ b/be/src/vec/exprs/table_function/table_function.h @@ -20,8 +20,10 @@ #include <fmt/core.h> #include <cstddef> +#include <cstdint> #include "common/status.h" +#include "vec/columns/column.h" #include "vec/core/block.h" #include "vec/exprs/vexpr_context.h" @@ -34,6 +36,13 @@ class TableFunction { public: virtual ~TableFunction() = default; + struct BlockFastPathContext { + const UInt8* array_nullmap_data = nullptr; + const IColumn::Offsets64* offsets_ptr = nullptr; + ColumnPtr nested_col = nullptr; + const UInt8* nested_nullmap_data = nullptr; + }; + virtual Status prepare() { return Status::OK(); } virtual Status open() { return Status::OK(); } @@ -58,6 +67,12 @@ public: virtual void get_same_many_values(MutableColumnPtr& column, int length = 0) = 0; virtual int get_value(MutableColumnPtr& column, int max_step) = 0; + virtual bool support_block_fast_path() const { return false; } + virtual Status prepare_block_fast_path(Block* /*block*/, RuntimeState* /*state*/, + BlockFastPathContext* /*ctx*/) { + return Status::NotSupported("table function {} doesn't support block fast path", _fn_name); + } + virtual Status close() { return Status::OK(); } virtual void forward(int step = 1) { diff --git a/be/src/vec/exprs/table_function/vexplode.cpp b/be/src/vec/exprs/table_function/vexplode.cpp index 8b090a87177..eeb8287113e 100644 --- a/be/src/vec/exprs/table_function/vexplode.cpp +++ b/be/src/vec/exprs/table_function/vexplode.cpp @@ -92,6 +92,25 @@ Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) { return Status::OK(); } +bool VExplodeTableFunction::support_block_fast_path() const { + return !_is_outer; +} + +Status VExplodeTableFunction::prepare_block_fast_path(Block* /*block*/, RuntimeState* /*state*/, + BlockFastPathContext* ctx) { + if (!support_block_fast_path()) { + return Status::NotSupported("vexplode doesn't support block fast path in current mode"); + } + if (_detail.offsets_ptr == nullptr || _detail.nested_col.get() == nullptr) { + return Status::InternalError("vexplode block fast path not initialized"); + } + ctx->array_nullmap_data = _detail.array_nullmap_data; + ctx->offsets_ptr = _detail.offsets_ptr; + ctx->nested_col = _detail.nested_col; + ctx->nested_nullmap_data = _detail.nested_nullmap_data; + return Status::OK(); +} + void VExplodeTableFunction::process_row(size_t row_idx) { DCHECK(row_idx < _array_column->size()); TableFunction::process_row(row_idx); diff --git a/be/src/vec/exprs/table_function/vexplode.h b/be/src/vec/exprs/table_function/vexplode.h index 7b53926ae2c..24a36f0dfb0 100644 --- a/be/src/vec/exprs/table_function/vexplode.h +++ b/be/src/vec/exprs/table_function/vexplode.h @@ -46,6 +46,10 @@ public: void get_same_many_values(MutableColumnPtr& column, int length) override; int get_value(MutableColumnPtr& column, int max_step) override; + bool support_block_fast_path() const override; + Status prepare_block_fast_path(Block* block, RuntimeState* state, + BlockFastPathContext* ctx) override; + private: Status _process_init_variant(Block* block, int value_column_idx); ColumnPtr _array_column; diff --git a/be/src/vec/exprs/table_function/vexplode_v2.cpp b/be/src/vec/exprs/table_function/vexplode_v2.cpp index dbeef4f438b..7cb96514528 100644 --- a/be/src/vec/exprs/table_function/vexplode_v2.cpp +++ b/be/src/vec/exprs/table_function/vexplode_v2.cpp @@ -107,6 +107,29 @@ Status VExplodeV2TableFunction::process_init(Block* block, RuntimeState* state) return Status::OK(); } +bool VExplodeV2TableFunction::support_block_fast_path() const { + return !_is_outer && !_generate_row_index; +} + +Status VExplodeV2TableFunction::prepare_block_fast_path(Block* /*block*/, RuntimeState* /*state*/, + BlockFastPathContext* ctx) { + if (!support_block_fast_path()) { + return Status::NotSupported("vexplode doesn't support block fast path in current mode"); + } + if (_multi_detail.size() != 1) { + return Status::NotSupported("vexplode block fast path only supports single parameter"); + } + const auto& detail = _multi_detail[0]; + if (detail.offsets_ptr == nullptr || detail.nested_col.get() == nullptr) { + return Status::InternalError("vexplode block fast path not initialized"); + } + ctx->array_nullmap_data = detail.array_nullmap_data; + ctx->offsets_ptr = detail.offsets_ptr; + ctx->nested_col = detail.nested_col; + ctx->nested_nullmap_data = detail.nested_nullmap_data; + return Status::OK(); +} + void VExplodeV2TableFunction::process_row(size_t row_idx) { TableFunction::process_row(row_idx); diff --git a/be/src/vec/exprs/table_function/vexplode_v2.h b/be/src/vec/exprs/table_function/vexplode_v2.h index f4b6bddbf42..141bc393189 100644 --- a/be/src/vec/exprs/table_function/vexplode_v2.h +++ b/be/src/vec/exprs/table_function/vexplode_v2.h @@ -46,6 +46,10 @@ public: void get_same_many_values(MutableColumnPtr& column, int length) override; int get_value(MutableColumnPtr& column, int max_step) override; + bool support_block_fast_path() const override; + Status prepare_block_fast_path(Block* block, RuntimeState* state, + BlockFastPathContext* ctx) override; + void set_generate_row_index(bool generate_row_index) { _generate_row_index = generate_row_index; } diff --git a/be/src/vec/functions/array/function_array_utils.cpp b/be/src/vec/functions/array/function_array_utils.cpp index 7620d7179c8..c7636c8952d 100644 --- a/be/src/vec/functions/array/function_array_utils.cpp +++ b/be/src/vec/functions/array/function_array_utils.cpp @@ -58,7 +58,18 @@ bool extract_column_array_info(const IColumn& src, ColumnArrayExecutionData& dat data.nested_type->get_primitive_type() != PrimitiveType::TYPE_VARIANT) { // set variant root column/type to from column/type auto variant = ColumnVariant::create(true /*always nullable*/); - variant->create_root(data.nested_type, make_nullable(data.nested_col)->assume_mutable()); + auto root_col = data.nested_col->assume_mutable(); + if (data.nested_type->is_nullable() && !root_col->is_nullable()) { + auto null_map = ColumnUInt8::create(); + auto& null_map_data = null_map->get_data(); + null_map_data.resize(root_col->size(), 0); + if (data.nested_nullmap_data != nullptr) { + std::copy(data.nested_nullmap_data, data.nested_nullmap_data + root_col->size(), + null_map_data.begin()); + } + root_col = ColumnNullable::create(std::move(root_col), std::move(null_map)); + } + variant->create_root(data.nested_type, std::move(root_col)); data.nested_col = variant->get_ptr(); } return true; diff --git a/be/test/pipeline/operator/table_function_operator_test.cpp b/be/test/pipeline/operator/table_function_operator_test.cpp index bd4e4fd67ed..97f11027f1e 100644 --- a/be/test/pipeline/operator/table_function_operator_test.cpp +++ b/be/test/pipeline/operator/table_function_operator_test.cpp @@ -32,7 +32,12 @@ #include "testutil/mock/mock_literal_expr.h" #include "testutil/mock/mock_operators.h" #include "testutil/mock/mock_slot_ref.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_nullable.h" #include "vec/core/block.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/functions/array/function_array_utils.h" namespace doris::pipeline { using namespace vectorized; @@ -91,6 +96,89 @@ public: } }; +class MockFastExplodeTableFunction : public TableFunction { +public: + explicit MockFastExplodeTableFunction(bool* get_value_called) + : _get_value_called(get_value_called) {} + + Status process_init(Block* block, RuntimeState* /*state*/) override { + // Expose array offsets / nested column for TableFunctionLocalState block fast path. + vectorized::ColumnArrayExecutionData data; + auto array_col = block->get_by_position(1).column->convert_to_full_column_if_const(); + if (!vectorized::extract_column_array_info(*array_col, data)) { + return Status::InternalError("invalid array column"); + } + _detail = data; + return Status::OK(); + } + + void process_row(size_t row_idx) override { + TableFunction::process_row(row_idx); + if (!_detail.array_nullmap_data || !_detail.array_nullmap_data[row_idx]) { + _array_offset = row_idx == 0 ? 0 : (*_detail.offsets_ptr)[row_idx - 1]; + _cur_size = (*_detail.offsets_ptr)[row_idx] - _array_offset; + } + } + + void process_close() override { + _detail.reset(); + _array_offset = 0; + } + + void get_same_many_values(MutableColumnPtr& column, int length) override { + column->insert_many_defaults(length); + } + + int get_value(MutableColumnPtr& column, int max_step) override { + if (_get_value_called) { + // Slow path indicator: fast path tests expect this to remain false. + *_get_value_called = true; + } + max_step = std::min(max_step, cast_set<int>(_cur_size - _cur_offset)); + const size_t pos = _array_offset + _cur_offset; + if (current_empty()) { + column->insert_default(); + max_step = 1; + } else if (column->is_nullable()) { + auto* nullable_column = assert_cast<ColumnNullable*>(column.get()); + nullable_column->get_nested_column_ptr()->insert_range_from(*_detail.nested_col, pos, + max_step); + auto* nullmap_column = assert_cast<ColumnUInt8*>( + nullable_column->get_null_map_column_ptr().get()); + const size_t old_size = nullmap_column->size(); + nullmap_column->resize(old_size + max_step); + if (_detail.nested_nullmap_data != nullptr) { + memcpy(nullmap_column->get_data().data() + old_size, + _detail.nested_nullmap_data + pos, max_step * sizeof(UInt8)); + } else { + memset(nullmap_column->get_data().data() + old_size, 0, + max_step * sizeof(UInt8)); + } + } else { + column->insert_range_from(*_detail.nested_col, pos, max_step); + } + forward(max_step); + return max_step; + } + + bool support_block_fast_path() const override { return true; } + + Status prepare_block_fast_path(Block* /*block*/, RuntimeState* /*state*/, + BlockFastPathContext* ctx) override { + // NOTE: process_init() must be called before this to fill `_detail`. + ctx->array_nullmap_data = _detail.array_nullmap_data; + ctx->offsets_ptr = _detail.offsets_ptr; + ctx->nested_col = _detail.nested_col; + ctx->nested_nullmap_data = _detail.nested_nullmap_data; + return Status::OK(); + } + +private: + bool* _get_value_called = nullptr; + vectorized::ColumnArrayExecutionData _detail; + size_t _array_offset = 0; +}; + struct MockTableFunctionLocalState : TableFunctionLocalState { MockTableFunctionLocalState(RuntimeState* state, OperatorXBase* parent) : TableFunctionLocalState(state, parent) {} @@ -225,6 +313,618 @@ TEST_F(TableFunctionOperatorTest, single_fn_test2) { } } +TEST_F(TableFunctionOperatorTest, block_fast_path_explode) { + bool get_value_called = false; + { + op->_vfn_ctxs = + MockSlotRef::create_mock_contexts(DataTypes {std::make_shared<DataTypeInt32>()}); + auto fn = std::make_shared<MockFastExplodeTableFunction>(&get_value_called); + fns.push_back(fn); + op->_fns.push_back(fn.get()); + op->_output_slot_ids = {true, false, true}; + + child_op->_mock_row_desc.reset(new MockRowDescriptor { + {std::make_shared<vectorized::DataTypeInt32>(), + std::make_shared<vectorized::DataTypeArray>( + std::make_shared<vectorized::DataTypeInt32>())}, + &pool}); + op->_mock_row_descriptor.reset(new MockRowDescriptor { + {std::make_shared<vectorized::DataTypeInt32>(), + std::make_shared<vectorized::DataTypeArray>( + std::make_shared<vectorized::DataTypeInt32>()), + std::make_shared<vectorized::DataTypeInt32>()}, + &pool}); + + op->_fn_num = 1; + EXPECT_TRUE(op->prepare(state.get())); + + local_state_uptr = std::make_unique<MockTableFunctionLocalState>(state.get(), op.get()); + local_state = local_state_uptr.get(); + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = nullptr, + .shared_state_map = {}, + .task_idx = 0}; + EXPECT_TRUE(local_state->init(state.get(), info)); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(op->operator_id(), std::move(local_state_uptr)); + EXPECT_TRUE(local_state->open(state.get())); + } + + { + auto id_col = ColumnHelper::create_column<DataTypeInt32>({10, 20, 30, 40}); + auto nested = ColumnInt32::create(); + nested->insert_value(1); + nested->insert_value(2); + nested->insert_value(3); + nested->insert_value(4); + auto offsets = ColumnArray::ColumnOffsets::create(); + offsets->insert_value(1); + offsets->insert_value(3); + offsets->insert_value(3); + offsets->insert_value(4); + auto arr_col = ColumnArray::create(std::move(nested), std::move(offsets)); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = std::make_shared<DataTypeArray>(int_type); + *local_state->_child_block = Block({ColumnWithTypeAndName(id_col, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(arr_col)), + arr_type, "arr")}); + auto st = op->push(state.get(), local_state->_child_block.get(), true); + EXPECT_TRUE(st) << st.msg(); + } + + { + Block block; + bool eos = false; + auto st = op->pull(state.get(), &block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_FALSE(get_value_called); + + auto expected_id = ColumnHelper::create_column<DataTypeInt32>({10, 20, 20, 40}); + + auto expected_nested = ColumnInt32::create(); + expected_nested->insert_value(1); + expected_nested->insert_value(2); + expected_nested->insert_value(3); + expected_nested->insert_value(2); + expected_nested->insert_value(3); + expected_nested->insert_value(4); + auto expected_offsets = ColumnArray::ColumnOffsets::create(); + expected_offsets->insert_value(1); + expected_offsets->insert_value(3); + expected_offsets->insert_value(5); + expected_offsets->insert_value(6); + auto expected_arr = ColumnArray::create(std::move(expected_nested), std::move(expected_offsets)); + + auto expected_out = ColumnHelper::create_column<DataTypeInt32>({1, 2, 3, 4}); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = std::make_shared<DataTypeArray>(int_type); + Block expected({ColumnWithTypeAndName(expected_id, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(expected_arr)), arr_type, "arr"), + ColumnWithTypeAndName(expected_out, int_type, "x")}); + EXPECT_TRUE(ColumnHelper::block_equal(block, expected)); + } +} + +TEST_F(TableFunctionOperatorTest, block_fast_path_explode_batch_truncate) { + state->batsh_size = 2; + bool get_value_called = false; + { + op->_vfn_ctxs = + MockSlotRef::create_mock_contexts(DataTypes {std::make_shared<DataTypeInt32>()}); + auto fn = std::make_shared<MockFastExplodeTableFunction>(&get_value_called); + fns.push_back(fn); + op->_fns.push_back(fn.get()); + op->_output_slot_ids = {true, true, true}; + + child_op->_mock_row_desc.reset(new MockRowDescriptor { + {std::make_shared<vectorized::DataTypeInt32>(), + std::make_shared<vectorized::DataTypeArray>( + std::make_shared<vectorized::DataTypeInt32>())}, + &pool}); + op->_mock_row_descriptor.reset(new MockRowDescriptor { + {std::make_shared<vectorized::DataTypeInt32>(), + std::make_shared<vectorized::DataTypeArray>( + std::make_shared<vectorized::DataTypeInt32>()), + std::make_shared<vectorized::DataTypeInt32>()}, + &pool}); + + op->_fn_num = 1; + EXPECT_TRUE(op->prepare(state.get())); + + local_state_uptr = std::make_unique<MockTableFunctionLocalState>(state.get(), op.get()); + local_state = local_state_uptr.get(); + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = nullptr, + .shared_state_map = {}, + .task_idx = 0}; + EXPECT_TRUE(local_state->init(state.get(), info)); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(op->operator_id(), std::move(local_state_uptr)); + EXPECT_TRUE(local_state->open(state.get())); + } + + { + auto id_col = ColumnHelper::create_column<DataTypeInt32>({10, 20, 30, 40}); + auto nested = ColumnInt32::create(); + nested->insert_value(1); + nested->insert_value(2); + nested->insert_value(3); + nested->insert_value(4); + auto offsets = ColumnArray::ColumnOffsets::create(); + offsets->insert_value(1); + offsets->insert_value(3); + offsets->insert_value(3); + offsets->insert_value(4); + auto arr_col = ColumnArray::create(std::move(nested), std::move(offsets)); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = std::make_shared<DataTypeArray>(int_type); + *local_state->_child_block = Block({ColumnWithTypeAndName(id_col, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(arr_col)), + arr_type, "arr")}); + auto st = op->push(state.get(), local_state->_child_block.get(), true); + EXPECT_TRUE(st) << st.msg(); + } + + { + Block block; + bool eos = false; + auto st = op->pull(state.get(), &block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_FALSE(get_value_called); + + auto expected_id = ColumnHelper::create_column<DataTypeInt32>({10, 20}); + auto expected_nested = ColumnInt32::create(); + expected_nested->insert_value(1); + expected_nested->insert_value(2); + expected_nested->insert_value(3); + auto expected_offsets = ColumnArray::ColumnOffsets::create(); + expected_offsets->insert_value(1); + expected_offsets->insert_value(3); + auto expected_arr = ColumnArray::create(std::move(expected_nested), std::move(expected_offsets)); + auto expected_out = ColumnHelper::create_column<DataTypeInt32>({1, 2}); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = std::make_shared<DataTypeArray>(int_type); + Block expected({ColumnWithTypeAndName(expected_id, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(expected_arr)), arr_type, "arr"), + ColumnWithTypeAndName(expected_out, int_type, "x")}); + EXPECT_TRUE(ColumnHelper::block_equal(block, expected)); + } + + { + Block block; + bool eos = false; + auto st = op->pull(state.get(), &block, &eos); + EXPECT_TRUE(st) << st.msg(); + + auto expected_id = ColumnHelper::create_column<DataTypeInt32>({20, 40}); + auto expected_nested = ColumnInt32::create(); + expected_nested->insert_value(2); + expected_nested->insert_value(3); + expected_nested->insert_value(4); + auto expected_offsets = ColumnArray::ColumnOffsets::create(); + expected_offsets->insert_value(2); + expected_offsets->insert_value(3); + auto expected_arr = ColumnArray::create(std::move(expected_nested), std::move(expected_offsets)); + auto expected_out = ColumnHelper::create_column<DataTypeInt32>({3, 4}); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = std::make_shared<DataTypeArray>(int_type); + Block expected({ColumnWithTypeAndName(expected_id, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(expected_arr)), arr_type, "arr"), + ColumnWithTypeAndName(expected_out, int_type, "x")}); + EXPECT_TRUE(ColumnHelper::block_equal(block, expected)); + } +} + +TEST_F(TableFunctionOperatorTest, block_fast_path_explode_nullable_array_skip) { + bool get_value_called = false; + { + op->_vfn_ctxs = + MockSlotRef::create_mock_contexts(DataTypes {std::make_shared<DataTypeInt32>()}); + auto fn = std::make_shared<MockFastExplodeTableFunction>(&get_value_called); + fns.push_back(fn); + op->_fns.push_back(fn.get()); + op->_output_slot_ids = {true, true, true}; + + child_op->_mock_row_desc.reset(new MockRowDescriptor { + {std::make_shared<vectorized::DataTypeInt32>(), + std::make_shared<vectorized::DataTypeArray>( + std::make_shared<vectorized::DataTypeInt32>())}, + &pool}); + op->_mock_row_descriptor.reset(new MockRowDescriptor { + {std::make_shared<vectorized::DataTypeInt32>(), + std::make_shared<vectorized::DataTypeArray>( + std::make_shared<vectorized::DataTypeInt32>()), + std::make_shared<vectorized::DataTypeInt32>()}, + &pool}); + + op->_fn_num = 1; + EXPECT_TRUE(op->prepare(state.get())); + + local_state_uptr = std::make_unique<MockTableFunctionLocalState>(state.get(), op.get()); + local_state = local_state_uptr.get(); + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = nullptr, + .shared_state_map = {}, + .task_idx = 0}; + EXPECT_TRUE(local_state->init(state.get(), info)); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(op->operator_id(), std::move(local_state_uptr)); + EXPECT_TRUE(local_state->open(state.get())); + } + + { + auto id_col = ColumnHelper::create_column<DataTypeInt32>({10, 20, 30}); + auto nested = ColumnInt32::create(); + nested->insert_value(1); + nested->insert_value(2); + auto offsets = ColumnArray::ColumnOffsets::create(); + offsets->insert_value(1); + offsets->insert_value(1); + offsets->insert_value(2); + auto arr_col = ColumnArray::create(std::move(nested), std::move(offsets)); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = std::make_shared<DataTypeArray>(int_type); + *local_state->_child_block = Block({ColumnWithTypeAndName(id_col, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(arr_col)), + arr_type, "arr")}); + auto st = op->push(state.get(), local_state->_child_block.get(), true); + EXPECT_TRUE(st) << st.msg(); + } + + { + Block block; + bool eos = false; + auto st = op->pull(state.get(), &block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_FALSE(get_value_called); + + auto expected_id = ColumnHelper::create_column<DataTypeInt32>({10, 30}); + + auto expected_nested = ColumnInt32::create(); + expected_nested->insert_value(1); + expected_nested->insert_value(2); + auto expected_offsets = ColumnArray::ColumnOffsets::create(); + expected_offsets->insert_value(1); + expected_offsets->insert_value(2); + auto expected_arr = ColumnArray::create(std::move(expected_nested), std::move(expected_offsets)); + + auto expected_out = ColumnHelper::create_column<DataTypeInt32>({1, 2}); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = std::make_shared<DataTypeArray>(int_type); + Block expected({ColumnWithTypeAndName(expected_id, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(expected_arr)), arr_type, "arr"), + ColumnWithTypeAndName(expected_out, int_type, "x")}); + EXPECT_TRUE(ColumnHelper::block_equal(block, expected)); + } +} + +TEST_F(TableFunctionOperatorTest, block_fast_path_explode_nullable_array_null_row) { + bool get_value_called = false; + { + op->_vfn_ctxs = + MockSlotRef::create_mock_contexts(DataTypes {std::make_shared<DataTypeInt32>()}); + auto fn = std::make_shared<MockFastExplodeTableFunction>(&get_value_called); + fns.push_back(fn); + op->_fns.push_back(fn.get()); + op->_output_slot_ids = {true, true, true}; + + auto int_type = std::make_shared<vectorized::DataTypeInt32>(); + auto child_arr_type = std::make_shared<vectorized::DataTypeNullable>( + std::make_shared<vectorized::DataTypeArray>(int_type)); + child_op->_mock_row_desc.reset(new MockRowDescriptor { + {int_type, child_arr_type}, + &pool}); + op->_mock_row_descriptor.reset( + new MockRowDescriptor {{int_type, child_arr_type, int_type}, &pool}); + + op->_fn_num = 1; + EXPECT_TRUE(op->prepare(state.get())); + + local_state_uptr = std::make_unique<MockTableFunctionLocalState>(state.get(), op.get()); + local_state = local_state_uptr.get(); + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = nullptr, + .shared_state_map = {}, + .task_idx = 0}; + EXPECT_TRUE(local_state->init(state.get(), info)); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(op->operator_id(), std::move(local_state_uptr)); + EXPECT_TRUE(local_state->open(state.get())); + } + + { + auto id_col = ColumnHelper::create_column<DataTypeInt32>({10, 20, 30}); + auto nested = ColumnInt32::create(); + nested->insert_value(1); + nested->insert_value(2); + auto offsets = ColumnArray::ColumnOffsets::create(); + offsets->insert_value(1); + offsets->insert_value(1); + offsets->insert_value(2); + auto arr_data = ColumnArray::create(std::move(nested), std::move(offsets)); + auto null_map = ColumnUInt8::create(); + null_map->insert_value(0); + null_map->insert_value(1); + null_map->insert_value(0); + auto arr_col = ColumnNullable::create(std::move(arr_data), std::move(null_map)); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(int_type)); + *local_state->_child_block = Block({ColumnWithTypeAndName(id_col, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(arr_col)), + arr_type, "arr")}); + auto st = op->push(state.get(), local_state->_child_block.get(), true); + EXPECT_TRUE(st) << st.msg(); + } + + { + Block block; + bool eos = false; + auto st = op->pull(state.get(), &block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_FALSE(get_value_called); + + auto expected_id = ColumnHelper::create_column<DataTypeInt32>({10, 30}); + auto expected_nested = ColumnInt32::create(); + expected_nested->insert_value(1); + expected_nested->insert_value(2); + auto expected_offsets = ColumnArray::ColumnOffsets::create(); + expected_offsets->insert_value(1); + expected_offsets->insert_value(2); + auto expected_arr_data = + ColumnArray::create(std::move(expected_nested), std::move(expected_offsets)); + auto expected_arr_null = ColumnUInt8::create(); + expected_arr_null->insert_value(0); + expected_arr_null->insert_value(0); + auto expected_arr = ColumnNullable::create(std::move(expected_arr_data), + std::move(expected_arr_null)); + auto expected_out = ColumnHelper::create_column<DataTypeInt32>({1, 2}); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(int_type)); + Block expected({ColumnWithTypeAndName(expected_id, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(expected_arr)), arr_type, "arr"), + ColumnWithTypeAndName(expected_out, int_type, "x")}); + EXPECT_TRUE(ColumnHelper::block_equal(block, expected)); + } +} + +TEST_F(TableFunctionOperatorTest, block_fast_path_explode_nullable_array_misaligned_fallback) { + bool get_value_called = false; + { + op->_vfn_ctxs = + MockSlotRef::create_mock_contexts(DataTypes {std::make_shared<DataTypeInt32>()}); + auto fn = std::make_shared<MockFastExplodeTableFunction>(&get_value_called); + fns.push_back(fn); + op->_fns.push_back(fn.get()); + op->_output_slot_ids = {true, true, true}; + + auto int_type = std::make_shared<vectorized::DataTypeInt32>(); + auto child_arr_type = std::make_shared<vectorized::DataTypeNullable>( + std::make_shared<vectorized::DataTypeArray>(int_type)); + child_op->_mock_row_desc.reset(new MockRowDescriptor { + {int_type, child_arr_type}, + &pool}); + op->_mock_row_descriptor.reset( + new MockRowDescriptor {{int_type, child_arr_type, int_type}, &pool}); + + op->_fn_num = 1; + EXPECT_TRUE(op->prepare(state.get())); + + local_state_uptr = std::make_unique<MockTableFunctionLocalState>(state.get(), op.get()); + local_state = local_state_uptr.get(); + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = nullptr, + .shared_state_map = {}, + .task_idx = 0}; + EXPECT_TRUE(local_state->init(state.get(), info)); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(op->operator_id(), std::move(local_state_uptr)); + EXPECT_TRUE(local_state->open(state.get())); + } + + { + auto id_col = ColumnHelper::create_column<DataTypeInt32>({10, 20, 30}); + auto nested = ColumnInt32::create(); + nested->insert_value(1); + nested->insert_value(9); + nested->insert_value(2); + auto offsets = ColumnArray::ColumnOffsets::create(); + offsets->insert_value(1); + offsets->insert_value(2); + offsets->insert_value(3); + auto arr_data = ColumnArray::create(std::move(nested), std::move(offsets)); + auto null_map = ColumnUInt8::create(); + null_map->insert_value(0); + null_map->insert_value(1); + null_map->insert_value(0); + auto arr_col = ColumnNullable::create(std::move(arr_data), std::move(null_map)); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(int_type)); + *local_state->_child_block = Block({ColumnWithTypeAndName(id_col, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(arr_col)), + arr_type, "arr")}); + auto st = op->push(state.get(), local_state->_child_block.get(), true); + EXPECT_TRUE(st) << st.msg(); + } + + { + Block block; + bool eos = false; + auto st = op->pull(state.get(), &block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_TRUE(get_value_called); + + auto expected_id = ColumnHelper::create_column<DataTypeInt32>({10, 30}); + auto expected_nested = ColumnInt32::create(); + expected_nested->insert_value(1); + expected_nested->insert_value(2); + auto expected_offsets = ColumnArray::ColumnOffsets::create(); + expected_offsets->insert_value(1); + expected_offsets->insert_value(2); + auto expected_arr_data = + ColumnArray::create(std::move(expected_nested), std::move(expected_offsets)); + auto expected_arr_null = ColumnUInt8::create(); + expected_arr_null->insert_value(0); + expected_arr_null->insert_value(0); + auto expected_arr = ColumnNullable::create(std::move(expected_arr_data), + std::move(expected_arr_null)); + auto expected_out = ColumnHelper::create_column<DataTypeInt32>({1, 2}); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto arr_type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(int_type)); + Block expected({ColumnWithTypeAndName(expected_id, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(expected_arr)), arr_type, "arr"), + ColumnWithTypeAndName(expected_out, int_type, "x")}); + EXPECT_TRUE(ColumnHelper::block_equal(block, expected)); + } +} + +TEST_F(TableFunctionOperatorTest, block_fast_path_explode_nullable_elements) { + bool get_value_called = false; + { + op->_vfn_ctxs = + MockSlotRef::create_mock_contexts(DataTypes {std::make_shared<DataTypeInt32>()}); + auto fn = std::make_shared<MockFastExplodeTableFunction>(&get_value_called); + fns.push_back(fn); + op->_fns.push_back(fn.get()); + op->_output_slot_ids = {true, true, true}; + + child_op->_mock_row_desc.reset(new MockRowDescriptor { + {std::make_shared<vectorized::DataTypeInt32>(), + std::make_shared<vectorized::DataTypeArray>( + std::make_shared<vectorized::DataTypeNullable>( + std::make_shared<vectorized::DataTypeInt32>()))}, + &pool}); + op->_mock_row_descriptor.reset(new MockRowDescriptor { + {std::make_shared<vectorized::DataTypeInt32>(), + std::make_shared<vectorized::DataTypeArray>( + std::make_shared<vectorized::DataTypeNullable>( + std::make_shared<vectorized::DataTypeInt32>())), + std::make_shared<vectorized::DataTypeNullable>( + std::make_shared<vectorized::DataTypeInt32>())}, + &pool}); + + op->_fn_num = 1; + EXPECT_TRUE(op->prepare(state.get())); + + local_state_uptr = std::make_unique<MockTableFunctionLocalState>(state.get(), op.get()); + local_state = local_state_uptr.get(); + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = nullptr, + .shared_state_map = {}, + .task_idx = 0}; + EXPECT_TRUE(local_state->init(state.get(), info)); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(op->operator_id(), std::move(local_state_uptr)); + EXPECT_TRUE(local_state->open(state.get())); + } + + { + auto id_col = ColumnHelper::create_column<DataTypeInt32>({10}); + + auto nested_data = ColumnInt32::create(); + nested_data->insert_value(1); + nested_data->insert_value(0); + nested_data->insert_value(2); + auto nested_null = ColumnUInt8::create(); + nested_null->insert_value(0); + nested_null->insert_value(1); + nested_null->insert_value(0); + auto nested = ColumnNullable::create(std::move(nested_data), std::move(nested_null)); + + auto offsets = ColumnArray::ColumnOffsets::create(); + offsets->insert_value(3); + auto arr_col = ColumnArray::create(std::move(nested), std::move(offsets)); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto nested_type = std::make_shared<DataTypeNullable>(int_type); + auto arr_type = std::make_shared<DataTypeArray>(nested_type); + *local_state->_child_block = Block({ColumnWithTypeAndName(id_col, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(arr_col)), + arr_type, "arr")}); + auto st = op->push(state.get(), local_state->_child_block.get(), true); + EXPECT_TRUE(st) << st.msg(); + } + + { + Block block; + bool eos = false; + auto st = op->pull(state.get(), &block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_FALSE(get_value_called); + + auto expected_id = ColumnHelper::create_column<DataTypeInt32>({10, 10, 10}); + + auto expected_nested_data = ColumnInt32::create(); + expected_nested_data->insert_value(1); + expected_nested_data->insert_value(0); + expected_nested_data->insert_value(2); + expected_nested_data->insert_value(1); + expected_nested_data->insert_value(0); + expected_nested_data->insert_value(2); + expected_nested_data->insert_value(1); + expected_nested_data->insert_value(0); + expected_nested_data->insert_value(2); + auto expected_nested_null = ColumnUInt8::create(); + expected_nested_null->insert_value(0); + expected_nested_null->insert_value(1); + expected_nested_null->insert_value(0); + expected_nested_null->insert_value(0); + expected_nested_null->insert_value(1); + expected_nested_null->insert_value(0); + expected_nested_null->insert_value(0); + expected_nested_null->insert_value(1); + expected_nested_null->insert_value(0); + auto expected_nested = + ColumnNullable::create(std::move(expected_nested_data), std::move(expected_nested_null)); + auto expected_offsets = ColumnArray::ColumnOffsets::create(); + expected_offsets->insert_value(3); + expected_offsets->insert_value(6); + expected_offsets->insert_value(9); + auto expected_arr = + ColumnArray::create(std::move(expected_nested), std::move(expected_offsets)); + + auto expected_out_data = ColumnInt32::create(); + expected_out_data->insert_value(1); + expected_out_data->insert_value(0); + expected_out_data->insert_value(2); + auto expected_out_null = ColumnUInt8::create(); + expected_out_null->insert_value(0); + expected_out_null->insert_value(1); + expected_out_null->insert_value(0); + auto expected_out = + ColumnNullable::create(std::move(expected_out_data), std::move(expected_out_null)); + + auto int_type = std::make_shared<DataTypeInt32>(); + auto nested_type = std::make_shared<DataTypeNullable>(int_type); + auto arr_type = std::make_shared<DataTypeArray>(nested_type); + auto out_type = std::make_shared<DataTypeNullable>(int_type); + Block expected({ColumnWithTypeAndName(expected_id, int_type, "id"), + ColumnWithTypeAndName(ColumnPtr(std::move(expected_arr)), arr_type, "arr"), + ColumnWithTypeAndName(ColumnPtr(std::move(expected_out)), out_type, "x")}); + EXPECT_TRUE(ColumnHelper::block_equal(block, expected)); + } +} + TEST_F(TableFunctionOperatorTest, single_two_test) { { op->_vfn_ctxs = MockSlotRef::create_mock_contexts( @@ -931,4 +1631,4 @@ TEST_F(UnnestTest, outer) { EXPECT_TRUE(ColumnHelper::block_equal(output_block, expected_output_block)); } } -} // namespace doris::pipeline \ No newline at end of file +} // namespace doris::pipeline diff --git a/regression-test/suites/query_p0/sql_functions/table_function/explode.groovy b/regression-test/suites/query_p0/sql_functions/table_function/explode.groovy index 51b8dcf4ec6..68e7da13b0b 100644 --- a/regression-test/suites/query_p0/sql_functions/table_function/explode.groovy +++ b/regression-test/suites/query_p0/sql_functions/table_function/explode.groovy @@ -181,12 +181,12 @@ suite("explode") { qt_test16 "select id,e1,e2,e3 from array_test as a lateral view explode_outer(a.array_string,a.array_int,a.array_int) tmp1 as e1,e2,e3;" qt_test17 "select id,e1,e2,e11,e12 from array_test as a lateral view explode_outer(a.array_int,a.array_string) tmp1 as e1,e2 lateral view explode_outer(a.array_int,a.array_string) tmp2 as e11,e12;" - qt_test18 "select id,e1 from array_test as a lateral view explode_variant_array(a.v_string['a']) tmp1 as e1;" - qt_test19 "select id,e1 from array_test as a lateral view explode_variant_array(a.v_int['a']) tmp1 as e1;" - qt_test20 "select id,e1,e2 from array_test as a lateral view explode_variant_array(a.v_int['a'],a.v_string['a']) tmp1 as e1,e2;" - qt_test21 "select id,e1,e2 from array_test as a lateral view explode_variant_array(a.v_string['a'],a.v_int['a']) tmp1 as e1,e2;" - qt_test22 "select id,e1,e2,e3 from array_test as a lateral view explode_variant_array(a.v_string['a'],a.v_int['a'],a.v_int['a']) tmp1 as e1,e2,e3;" - qt_test23 "select id,e1,e2,e11,e12 from array_test as a lateral view explode_variant_array(a.v_int['a'],a.v_string['a']) tmp1 as e1,e2 lateral view explode_variant_array(a.v_int['a'],a.v_string['a']) tmp2 as e11,e12;" + qt_test18 "select id,e1 from array_test as a lateral view explode(a.v_string['a']) tmp1 as e1;" + qt_test19 "select id,e1 from array_test as a lateral view explode(a.v_int['a']) tmp1 as e1;" + qt_test20 "select id,e1,e2 from array_test as a lateral view explode(a.v_int['a'],a.v_string['a']) tmp1 as e1,e2;" + qt_test21 "select id,e1,e2 from array_test as a lateral view explode(a.v_string['a'],a.v_int['a']) tmp1 as e1,e2;" + qt_test22 "select id,e1,e2,e3 from array_test as a lateral view explode(a.v_string['a'],a.v_int['a'],a.v_int['a']) tmp1 as e1,e2,e3;" + qt_test23 "select id,e1,e2,e11,e12 from array_test as a lateral view explode(a.v_int['a'],a.v_string['a']) tmp1 as e1,e2 lateral view explode(a.v_int['a'],a.v_string['a']) tmp2 as e11,e12;" sql "DROP TABLE IF EXISTS array_test2;" sql """ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
