This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit dcb66e084fa58a9c6ce797f961dfac2fbc430fc3 Author: Pxl <[email protected]> AuthorDate: Wed Jun 29 20:38:49 2022 +0800 [bugfix]fix core dump on outfile with expr (#10491) remove log --- be/src/vec/runtime/vfile_result_writer.cpp | 34 +++++++++++++++++++----------- be/src/vec/runtime/vfile_result_writer.h | 14 ++++++------ be/src/vec/sink/vresult_file_sink.cpp | 13 ++++++------ be/src/vec/sink/vresult_file_sink.h | 18 ++++++++-------- 4 files changed, 45 insertions(+), 34 deletions(-) diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp index 71a748e565..6d4ecb8db1 100644 --- a/be/src/vec/runtime/vfile_result_writer.cpp +++ b/be/src/vec/runtime/vfile_result_writer.cpp @@ -17,6 +17,7 @@ #include "vec/runtime/vfile_result_writer.h" +#include "common/status.h" #include "exprs/expr_context.h" #include "gutil/strings/numbers.h" #include "gutil/strings/substitute.h" @@ -35,22 +36,23 @@ #include "util/mysql_global.h" #include "util/mysql_row_buffer.h" #include "vec/core/block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" namespace doris::vectorized { const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; using doris::operator<<; -VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts, - const TStorageBackendType::type storage_type, - const TUniqueId fragment_instance_id, - const std::vector<ExprContext*>& output_expr_ctxs, - RuntimeProfile* parent_profile, BufferControlBlock* sinker, - Block* output_block, bool output_object_data, - const RowDescriptor& output_row_descriptor) +VFileResultWriter::VFileResultWriter( + const ResultFileOptions* file_opts, const TStorageBackendType::type storage_type, + const TUniqueId fragment_instance_id, + const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs, + RuntimeProfile* parent_profile, BufferControlBlock* sinker, Block* output_block, + bool output_object_data, const RowDescriptor& output_row_descriptor) : _file_opts(file_opts), _storage_type(storage_type), _fragment_instance_id(fragment_instance_id), - _output_expr_ctxs(output_expr_ctxs), + _output_vexpr_ctxs(output_vexpr_ctxs), _parent_profile(parent_profile), _sinker(sinker), _output_block(output_block), @@ -196,7 +198,16 @@ Status VFileResultWriter::append_block(Block& block) { if (_parquet_writer != nullptr) { return Status::NotSupported("Parquet Writer is not supported yet!"); } else { - RETURN_IF_ERROR(_write_csv_file(block)); + Status status = Status::OK(); + // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec + // failed, just return the error status + auto output_block = VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, + block, status); + auto num_rows = output_block.rows(); + if (UNLIKELY(num_rows == 0)) { + return status; + } + RETURN_IF_ERROR(_write_csv_file(output_block)); } _written_rows += block.rows(); @@ -210,7 +221,7 @@ Status VFileResultWriter::_write_csv_file(const Block& block) { if (col.column->is_null_at(i)) { _plain_text_outstream << NULL_IN_CSV; } else { - switch (_output_expr_ctxs[col_id]->root()->type().type) { + switch (_output_vexpr_ctxs[col_id]->root()->type().type) { case TYPE_BOOLEAN: case TYPE_TINYINT: _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>( @@ -280,8 +291,7 @@ Status VFileResultWriter::_write_csv_file(const Block& block) { reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data) ->value); std::string decimal_str; - int output_scale = _output_expr_ctxs[col_id]->root()->output_scale(); - decimal_str = decimal_val.to_string(output_scale); + decimal_str = decimal_val.to_string(); _plain_text_outstream << decimal_str; break; } diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h index b7fd2cd737..5f0bb7971e 100644 --- a/be/src/vec/runtime/vfile_result_writer.h +++ b/be/src/vec/runtime/vfile_result_writer.h @@ -30,22 +30,22 @@ public: VFileResultWriter(const ResultFileOptions* file_option, const TStorageBackendType::type storage_type, const TUniqueId fragment_instance_id, - const std::vector<ExprContext*>& output_expr_ctxs, + const std::vector<VExprContext*>& _output_vexpr_ctxs, RuntimeProfile* parent_profile, BufferControlBlock* sinker, Block* output_block, bool output_object_data, const RowDescriptor& output_row_descriptor); virtual ~VFileResultWriter() = default; - virtual Status append_block(Block& block) override; - virtual Status append_row_batch(const RowBatch* batch) override { + Status append_block(Block& block) override; + Status append_row_batch(const RowBatch* batch) override { return Status::NotSupported("append_row_batch is not supported in VFileResultWriter!"); }; - virtual Status init(RuntimeState* state) override; - virtual Status close() override; + Status init(RuntimeState* state) override; + Status close() override; // file result writer always return statistic result in one row - virtual int64_t get_written_rows() const override { return 1; } + int64_t get_written_rows() const override { return 1; } private: Status _write_csv_file(const Block& block); @@ -77,7 +77,7 @@ private: const ResultFileOptions* _file_opts; TStorageBackendType::type _storage_type; TUniqueId _fragment_instance_id; - const std::vector<ExprContext*>& _output_expr_ctxs; + const std::vector<VExprContext*>& _output_vexpr_ctxs; // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter. // If the result file format is Parquet, this _file_writer is owned by _parquet_writer. diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index 6d8d994585..b939332e39 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -77,9 +77,10 @@ Status VResultFileSink::init(const TDataSink& tsink) { Status VResultFileSink::prepare_exprs(RuntimeState* state) { // From the thrift expressions create the real exprs. - RETURN_IF_ERROR(Expr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_expr_ctxs)); + RETURN_IF_ERROR( + VExpr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_vexpr_ctxs)); // Prepare the exprs to run. - RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker)); + RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc, _expr_mem_tracker)); return Status::OK(); } @@ -100,7 +101,7 @@ Status VResultFileSink::prepare(RuntimeState* state) { state->fragment_instance_id(), _buf_size, &_sender)); // create writer _writer.reset(new (std::nothrow) VFileResultWriter( - _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs, + _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, _profile, _sender.get(), nullptr, state->return_object_data_as_binary(), _output_row_descriptor)); } else { @@ -115,7 +116,7 @@ Status VResultFileSink::prepare(RuntimeState* state) { // create writer _output_block.reset(new Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1)); _writer.reset(new (std::nothrow) VFileResultWriter( - _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs, + _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, _profile, nullptr, _output_block.get(), state->return_object_data_as_binary(), _output_row_descriptor)); } @@ -127,7 +128,7 @@ Status VResultFileSink::prepare(RuntimeState* state) { } Status VResultFileSink::open(RuntimeState* state) { - return Expr::open(_output_expr_ctxs, state); + return VExpr::open(_output_vexpr_ctxs, state); } Status VResultFileSink::send(RuntimeState* state, RowBatch* batch) { @@ -186,7 +187,7 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) { _output_block->clear(); } - Expr::close(_output_expr_ctxs, state); + VExpr::close(_output_vexpr_ctxs, state); _closed = true; return Status::OK(); diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h index e924883b42..f550fa3585 100644 --- a/be/src/vec/sink/vresult_file_sink.h +++ b/be/src/vec/sink/vresult_file_sink.h @@ -34,18 +34,18 @@ public: const std::vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size, bool send_query_statistics_with_every_batch, const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs); - virtual ~VResultFileSink() = default; - virtual Status init(const TDataSink& thrift_sink) override; - virtual Status prepare(RuntimeState* state) override; - virtual Status open(RuntimeState* state) override; + ~VResultFileSink() override = default; + Status init(const TDataSink& thrift_sink) override; + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; // send data in 'batch' to this backend stream mgr // Blocks until all rows in batch are placed in the buffer - virtual Status send(RuntimeState* state, RowBatch* batch) override; - virtual Status send(RuntimeState* state, Block* block) override; + Status send(RuntimeState* state, RowBatch* batch) override; + Status send(RuntimeState* state, Block* block) override; // Flush all buffered data and close all existing channels to destination // hosts. Further send() calls are illegal after calling close(). - virtual Status close(RuntimeState* state, Status exec_status) override; - virtual RuntimeProfile* profile() override { return _profile; } + Status close(RuntimeState* state, Status exec_status) override; + RuntimeProfile* profile() override { return _profile; } void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override; @@ -57,7 +57,7 @@ private: // Owned by the RuntimeState. const std::vector<TExpr>& _t_output_expr; - std::vector<ExprContext*> _output_expr_ctxs; + std::vector<vectorized::VExprContext*> _output_vexpr_ctxs; RowDescriptor _output_row_descriptor; std::unique_ptr<Block> _output_block = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
