This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 84bef605e02 [refactor](datalake) return the error status instead of static_cast<void> (#34873) 84bef605e02 is described below commit 84bef605e02a57cf9e5a285be9c67ddde8a7ab37 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Wed May 22 14:08:42 2024 +0800 [refactor](datalake) return the error status instead of static_cast<void> (#34873) Follow-up to #34797. `static_cast<void>` silently discards error statuses, some of which should make the query finish with an error, so replace `static_cast<void>` with `RETURN_IF_ERROR`. ### Remaining Work The following three scenarios need to be handled separately and cannot be simply replaced: 1. The outer function returns void; 2. Status-returning functions are called inside constructors or destructors; 3. Status-returning functions are called on a best-effort basis, where the error status should be ignored. --- be/src/io/fs/buffered_reader.cpp | 4 +-- be/src/io/fs/buffered_reader.h | 1 - be/src/olap/wal/wal_reader.cpp | 7 ++--- be/src/vec/exec/format/avro/avro_jni_reader.cpp | 8 ++--- be/src/vec/exec/format/avro/avro_jni_reader.h | 11 ++----- be/src/vec/exec/format/jni_reader.cpp | 2 +- be/src/vec/exec/format/jni_reader.h | 35 ++++++++++++++++++--- be/src/vec/exec/format/orc/vorc_reader.cpp | 36 ++++++++++++++-------- be/src/vec/exec/format/orc/vorc_reader.h | 29 ++++++++++++----- .../exec/format/parquet/vparquet_group_reader.cpp | 2 +- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 10 +++--- be/src/vec/exec/format/table/hudi_jni_reader.cpp | 8 ++--- be/src/vec/exec/format/table/hudi_jni_reader.h | 9 ++---- .../exec/format/table/max_compute_jni_reader.cpp | 8 ++--- .../vec/exec/format/table/max_compute_jni_reader.h | 9 ++---- be/src/vec/exec/format/table/paimon_jni_reader.cpp | 2 +- be/src/vec/exec/format/table/paimon_jni_reader.h | 10 ++---- .../format/table/transactional_hive_reader.cpp | 2 +- .../format/table/trino_connector_jni_reader.cpp | 2 +- 
.../exec/format/table/trino_connector_jni_reader.h | 9 ++---- be/src/vec/exec/format/wal/wal_reader.cpp | 6 ---- be/src/vec/exec/format/wal/wal_reader.h | 9 +++++- be/src/vec/exec/jni_connector.cpp | 4 --- be/src/vec/exec/jni_connector.h | 3 +- be/src/vec/exec/join/vjoin_node_base.cpp | 2 +- be/src/vec/exec/scan/new_es_scanner.cpp | 2 +- be/src/vec/exec/scan/scanner_scheduler.cpp | 15 +++++---- be/src/vec/exec/scan/scanner_scheduler.h | 24 ++++++++++++--- 28 files changed, 146 insertions(+), 123 deletions(-) diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 206845e90ce..664997088d9 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -413,8 +413,8 @@ void PrefetchBuffer::reset_offset(size_t offset) { } else { _exceed = false; } - static_cast<void>(ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( - [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); })); + _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( + [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }); } // only this function would run concurrently in another thread diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 0195f5d8308..70c8445db23 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -173,7 +173,6 @@ public: for (char* box : _boxes) { delete[] box; } - static_cast<void>(close()); } Status close() override { diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp index fa96f0c5a0b..6e6a530f8db 100644 --- a/be/src/olap/wal/wal_reader.cpp +++ b/be/src/olap/wal/wal_reader.cpp @@ -50,11 +50,8 @@ Status WalReader::init() { } Status WalReader::finalize() { - if (file_reader != nullptr) { - auto st = file_reader->close(); - if (!st.ok()) { - LOG(WARNING) << "fail to close wal " << _file_name << " st= " << st.to_string(); - } + if (file_reader) { 
+ return file_reader->close(); } return Status::OK(); } diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index ec08c58d18a..03135aa5c94 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -29,16 +29,12 @@ AvroJNIReader::AvroJNIReader(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params, const std::vector<SlotDescriptor*>& file_slot_descs, const TFileRangeDesc& range) - : _file_slot_descs(file_slot_descs), - _state(state), - _profile(profile), - _params(params), - _range(range) {} + : JniReader(file_slot_descs, state, profile), _params(params), _range(range) {} AvroJNIReader::AvroJNIReader(RuntimeProfile* profile, const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs) - : _file_slot_descs(file_slot_descs), _profile(profile), _params(params), _range(range) {} + : JniReader(file_slot_descs, nullptr, profile), _params(params), _range(range) {} AvroJNIReader::~AvroJNIReader() = default; diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h b/be/src/vec/exec/format/avro/avro_jni_reader.h index 64dac0aba4f..82388f32915 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.h +++ b/be/src/vec/exec/format/avro/avro_jni_reader.h @@ -28,8 +28,7 @@ #include "common/status.h" #include "exec/olap_common.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -48,7 +47,7 @@ namespace doris::vectorized { /** * Read avro-format file */ -class AvroJNIReader : public GenericReader { +class AvroJNIReader : public JniReader { ENABLE_FACTORY_CREATOR(AvroJNIReader); public: @@ -83,16 +82,10 @@ public: TypeDescriptor convert_to_doris_type(const rapidjson::Value& column_schema); - TypeDescriptor convert_complex_type(const rapidjson::Document::ConstObject 
child_schema); - private: - const std::vector<SlotDescriptor*>& _file_slot_descs; - RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; const TFileScanRangeParams _params; const TFileRangeDesc _range; std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr; - std::unique_ptr<JniConnector> _jni_connector; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/jni_reader.cpp b/be/src/vec/exec/format/jni_reader.cpp index 625c79ffff2..563f6cbea51 100644 --- a/be/src/vec/exec/format/jni_reader.cpp +++ b/be/src/vec/exec/format/jni_reader.cpp @@ -37,7 +37,7 @@ namespace doris::vectorized { MockJniReader::MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, RuntimeProfile* profile) - : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) { + : JniReader(file_slot_descs, state, profile) { std::ostringstream required_fields; std::ostringstream columns_types; std::vector<std::string> column_names; diff --git a/be/src/vec/exec/format/jni_reader.h b/be/src/vec/exec/format/jni_reader.h index d3a0f0da4c0..714bdb96b19 100644 --- a/be/src/vec/exec/format/jni_reader.h +++ b/be/src/vec/exec/format/jni_reader.h @@ -42,13 +42,35 @@ struct TypeDescriptor; namespace doris::vectorized { +class JniReader : public GenericReader { +public: + JniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, + RuntimeProfile* profile) + : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {}; + + ~JniReader() override = default; + + Status close() override { + if (_jni_connector) { + return _jni_connector->close(); + } + return Status::OK(); + } + +protected: + const std::vector<SlotDescriptor*>& _file_slot_descs; + RuntimeState* _state = nullptr; + RuntimeProfile* _profile = nullptr; + std::unique_ptr<JniConnector> _jni_connector; +}; + /** * The demo usage of JniReader, showing how to read data from java scanner. 
* The java side is also a mock reader that provide values for each type. * This class will only be retained during the functional testing phase to verify that * the communication and data exchange with the jvm are correct. */ -class MockJniReader : public GenericReader { +class MockJniReader : public JniReader { public: MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, RuntimeProfile* profile); @@ -63,6 +85,13 @@ public: Status init_reader( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); + Status close() override { + if (_jni_connector) { + return _jni_connector->close(); + } + return Status::OK(); + } + protected: void _collect_profile_before_close() override { if (_jni_connector != nullptr) { @@ -71,11 +100,7 @@ protected: } private: - const std::vector<SlotDescriptor*>& _file_slot_descs; - RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; - std::unique_ptr<JniConnector> _jni_connector; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 035af267d5b..27f4b808e02 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -865,7 +865,7 @@ Status OrcReader::set_fill_columns( _batch = _row_reader->createRowBatch(_batch_size); auto& selected_type = _row_reader->getSelectedType(); int idx = 0; - static_cast<void>(_init_select_types(selected_type, idx)); + RETURN_IF_ERROR(_init_select_types(selected_type, idx)); _remaining_rows = _row_reader->getNumberOfRows(); @@ -909,7 +909,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) { const orc::Type* sub_type = type.getSubtype(i); _col_orc_type.push_back(sub_type); if (_is_acid && sub_type->getKind() == orc::TypeKind::STRUCT) { - static_cast<void>(_init_select_types(*sub_type, idx)); + 
RETURN_IF_ERROR(_init_select_types(*sub_type, idx)); } } return Status::OK(); @@ -1530,6 +1530,17 @@ std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof)); + if (_orc_filter) { + RETURN_IF_ERROR(_orc_filter->get_status()); + } + if (_string_dict_filter) { + RETURN_IF_ERROR(_string_dict_filter->get_status()); + } + return Status::OK(); +} + +Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof) { if (_io_ctx && _io_ctx->should_stop) { *eof = true; *read_rows = 0; @@ -1605,7 +1616,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { _fill_missing_columns(block, _batch->numElements, _lazy_read_ctx.missing_columns)); if (block->rows() == 0) { - static_cast<void>(_convert_dict_cols_to_string_cols(block, nullptr)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr)); *eof = true; *read_rows = 0; return Status::OK(); @@ -1617,14 +1628,14 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { Block::filter_block_internal(block, columns_to_filter, *_filter)); } if (!_not_single_slot_filter_conjuncts.empty()) { - static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); RETURN_IF_CATCH_EXCEPTION( RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, column_to_keep))); } else { Block::erase_useless_column(block, column_to_keep); - static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); } *read_rows = block->rows(); } else { @@ -1697,7 +1708,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { _fill_missing_columns(block, 
_batch->numElements, _lazy_read_ctx.missing_columns)); if (block->rows() == 0) { - static_cast<void>(_convert_dict_cols_to_string_cols(block, nullptr)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr)); *eof = true; *read_rows = 0; return Status::OK(); @@ -1738,8 +1749,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { std::move(*block->get_by_position(col).column).assume_mutable()->clear(); } Block::erase_useless_column(block, column_to_keep); - static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec)); - return Status::OK(); + return _convert_dict_cols_to_string_cols(block, &batch_vec); } _execute_filter_position_delete_rowids(result_filter); { @@ -1748,14 +1758,14 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { Block::filter_block_internal(block, columns_to_filter, result_filter)); } if (!_not_single_slot_filter_conjuncts.empty()) { - static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); RETURN_IF_CATCH_EXCEPTION( RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, column_to_keep))); } else { Block::erase_useless_column(block, column_to_keep); - static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); } } else { if (_delete_rows_filter_ptr) { @@ -1771,7 +1781,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { Block::filter_block_internal(block, columns_to_filter, (*filter))); } Block::erase_useless_column(block, column_to_keep); - static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); } *read_rows = block->rows(); } @@ -1912,7 +1922,7 @@ Status 
OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s block->get_by_name(col.first).column->assume_mutable()->clear(); } Block::erase_useless_column(block, origin_column_num); - static_cast<void>(_convert_dict_cols_to_string_cols(block, nullptr)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr)); } uint16_t new_size = 0; @@ -2148,7 +2158,7 @@ Status OrcReader::on_string_dicts_loaded( } // 4. Rewrite conjuncts. - static_cast<void>(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); + RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); ++it; } return Status::OK(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index ec08d30e185..c790d78123f 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -165,6 +165,8 @@ public: Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + Status get_next_block_impl(Block* block, size_t* read_rows, bool* eof); + void _fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result, orc::ColumnVectorBatch* batch, int idx); @@ -228,15 +230,19 @@ private: class ORCFilterImpl : public orc::ORCFilter { public: - ORCFilterImpl(OrcReader* orcReader) : orcReader(orcReader) {} + ORCFilterImpl(OrcReader* orcReader) : _orcReader(orcReader) {} ~ORCFilterImpl() override = default; void filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) const override { - static_cast<void>(orcReader->filter(data, sel, size, arg)); + if (_status.ok()) { + _status = _orcReader->filter(data, sel, size, arg); + } } + Status get_status() { return _status; } private: - OrcReader* orcReader = nullptr; + mutable Status _status = Status::OK(); + OrcReader* _orcReader = nullptr; }; class StringDictFilterImpl : public orc::StringDictFilter { @@ -247,17 +253,24 @@ private: virtual void fillDictFilterColumnNames( 
std::unique_ptr<orc::StripeInformation> current_strip_information, std::list<std::string>& column_names) const override { - static_cast<void>(_orc_reader->fill_dict_filter_column_names( - std::move(current_strip_information), column_names)); + if (_status.ok()) { + _status = _orc_reader->fill_dict_filter_column_names( + std::move(current_strip_information), column_names); + } } virtual void onStringDictsLoaded( std::unordered_map<std::string, orc::StringDictionary*>& column_name_to_dict_map, bool* is_stripe_filtered) const override { - static_cast<void>(_orc_reader->on_string_dicts_loaded(column_name_to_dict_map, - is_stripe_filtered)); + if (_status.ok()) { + _status = _orc_reader->on_string_dicts_loaded(column_name_to_dict_map, + is_stripe_filtered); + } } + Status get_status() { return _status; } + private: + mutable Status _status = Status::OK(); OrcReader* _orc_reader = nullptr; }; @@ -597,7 +610,7 @@ private: // std::pair<col_name, slot_id> std::vector<std::pair<std::string, int>> _dict_filter_cols; std::shared_ptr<ObjectPool> _obj_pool; - std::unique_ptr<orc::StringDictFilter> _string_dict_filter; + std::unique_ptr<StringDictFilterImpl> _string_dict_filter; bool _dict_cols_has_converted = false; bool _has_complex_type = false; std::vector<orc::TypeKind>* _unsupported_pushdown_types; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 335207070dd..90b82c52e07 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -883,7 +883,7 @@ Status RowGroupReader::_rewrite_dict_predicates() { } // 4. Rewrite conjuncts. 
- static_cast<void>(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); + RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); ++it; } return Status::OK(); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 0a75b43747a..c3199b5de66 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -781,8 +781,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, auto& conjuncts = conjunct_iter->second; std::vector<int> skipped_page_range; const FieldSchema* col_schema = schema_desc.get_column(read_col); - static_cast<void>(page_index.collect_skipped_page_range( - &column_index, conjuncts, col_schema, skipped_page_range, *_ctz)); + RETURN_IF_ERROR(page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema, + skipped_page_range, *_ctz)); if (skipped_page_range.empty()) { continue; } @@ -790,8 +790,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, RETURN_IF_ERROR(page_index.parse_offset_index(chunk, off_index_buff.data(), &offset_index)); for (int page_id : skipped_page_range) { RowRange skipped_row_range; - static_cast<void>(page_index.create_skipped_row_range(offset_index, row_group.num_rows, - page_id, &skipped_row_range)); + RETURN_IF_ERROR(page_index.create_skipped_row_range(offset_index, row_group.num_rows, + page_id, &skipped_row_range)); // use the union row range skipped_row_ranges.emplace_back(skipped_row_range); } @@ -833,7 +833,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& row_group, bool* filter_group) { - static_cast<void>(_process_column_stat_filter(row_group.columns, filter_group)); + RETURN_IF_ERROR(_process_column_stat_filter(row_group.columns, filter_group)); 
_init_chunk_dicts(); RETURN_IF_ERROR(_process_dict_filter(filter_group)); _init_bloom_filter(); diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp index 1cba7da7c65..cffa2ce9ac4 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp +++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp @@ -43,11 +43,9 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, const THudiFileDesc& hudi_params, const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, RuntimeProfile* profile) - : _scan_params(scan_params), - _hudi_params(hudi_params), - _file_slot_descs(file_slot_descs), - _state(state), - _profile(profile) { + : JniReader(file_slot_descs, state, profile), + _scan_params(scan_params), + _hudi_params(hudi_params) { std::vector<std::string> required_fields; for (auto& desc : _file_slot_descs) { required_fields.emplace_back(desc->col_name()); diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h index c0438e93289..e9bb55a69a7 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.h +++ b/be/src/vec/exec/format/table/hudi_jni_reader.h @@ -27,8 +27,7 @@ #include "common/status.h" #include "exec/olap_common.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -42,7 +41,7 @@ struct TypeDescriptor; namespace doris::vectorized { -class HudiJniReader : public GenericReader { +class HudiJniReader : public JniReader { ENABLE_FACTORY_CREATOR(HudiJniReader); public: @@ -66,11 +65,7 @@ public: private: const TFileScanRangeParams& _scan_params; const THudiFileDesc& _hudi_params; - const std::vector<SlotDescriptor*>& _file_slot_descs; - RuntimeState* _state; - RuntimeProfile* _profile; std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; - std::unique_ptr<JniConnector> _jni_connector; }; 
} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp index f7dd9c9846f..d520bd9b295 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp @@ -42,11 +42,9 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des const std::vector<SlotDescriptor*>& file_slot_descs, const TFileRangeDesc& range, RuntimeState* state, RuntimeProfile* profile) - : _max_compute_params(max_compute_params), - _file_slot_descs(file_slot_descs), - _range(range), - _state(state), - _profile(profile) { + : JniReader(file_slot_descs, state, profile), + _max_compute_params(max_compute_params), + _range(range) { _table_desc = mc_desc; std::ostringstream required_fields; std::ostringstream columns_types; diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.h b/be/src/vec/exec/format/table/max_compute_jni_reader.h index e027678148f..9bfef59432d 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.h +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.h @@ -28,8 +28,7 @@ #include "common/status.h" #include "exec/olap_common.h" #include "runtime/descriptors.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -49,7 +48,7 @@ namespace doris::vectorized { * This class will only be retained during the functional testing phase to verify that * the communication and data exchange with the jvm are correct. 
*/ -class MaxComputeJniReader : public GenericReader { +class MaxComputeJniReader : public JniReader { ENABLE_FACTORY_CREATOR(MaxComputeJniReader); public: @@ -71,12 +70,8 @@ public: private: const MaxComputeTableDescriptor* _table_desc = nullptr; const TMaxComputeFileDesc& _max_compute_params; - const std::vector<SlotDescriptor*>& _file_slot_descs; const TFileRangeDesc& _range; - RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr; - std::unique_ptr<JniConnector> _jni_connector = nullptr; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index 06d24466104..ef690c15b68 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -40,7 +40,7 @@ const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix. 
PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, RuntimeProfile* profile, const TFileRangeDesc& range) - : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) { + : JniReader(file_slot_descs, state, profile) { std::vector<std::string> column_names; std::vector<std::string> column_types; for (auto& desc : _file_slot_descs) { diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h index 162c6ff2cdb..6b6a6907270 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.h +++ b/be/src/vec/exec/format/table/paimon_jni_reader.h @@ -26,9 +26,7 @@ #include "common/status.h" #include "exec/olap_common.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/format/table/table_format_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -48,7 +46,7 @@ namespace doris::vectorized { * This class will only be retained during the functional testing phase to verify that * the communication and data exchange with the jvm are correct. 
*/ -class PaimonJniReader : public GenericReader { +class PaimonJniReader : public JniReader { ENABLE_FACTORY_CREATOR(PaimonJniReader); public: @@ -67,11 +65,7 @@ public: std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); private: - const std::vector<SlotDescriptor*>& _file_slot_descs; - RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; - std::unique_ptr<JniConnector> _jni_connector; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index 85bfbed0713..a5756e687e9 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -135,7 +135,7 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range) { std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> partition_columns; std::unordered_map<std::string, VExprContextSPtr> missing_columns; - static_cast<void>(delete_reader.set_fill_columns(partition_columns, missing_columns)); + RETURN_IF_ERROR(delete_reader.set_fill_columns(partition_columns, missing_columns)); bool eof = false; while (!eof) { diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp index 93e122ae0de..c9b10e716ca 100644 --- a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp +++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp @@ -42,7 +42,7 @@ const std::string TrinoConnectorJniReader::TRINO_CONNECTOR_OPTION_PREFIX = TrinoConnectorJniReader::TrinoConnectorJniReader( const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, RuntimeProfile* profile, const TFileRangeDesc& range) - : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) { + : 
JniReader(file_slot_descs, state, profile) { std::vector<std::string> column_names; for (const auto& desc : _file_slot_descs) { std::string field = desc->col_name(); diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.h b/be/src/vec/exec/format/table/trino_connector_jni_reader.h index 43b2d4fbf9c..de0cf21a881 100644 --- a/be/src/vec/exec/format/table/trino_connector_jni_reader.h +++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.h @@ -27,8 +27,7 @@ #include "common/status.h" #include "exec/olap_common.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -42,7 +41,7 @@ struct TypeDescriptor; namespace doris::vectorized { -class TrinoConnectorJniReader : public GenericReader { +class TrinoConnectorJniReader : public JniReader { ENABLE_FACTORY_CREATOR(TrinoConnectorJniReader); public: @@ -63,9 +62,5 @@ public: private: Status _set_spi_plugins_dir(); - const std::vector<SlotDescriptor*>& _file_slot_descs; - RuntimeState* _state; - RuntimeProfile* _profile; - std::unique_ptr<JniConnector> _jni_connector; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index 5010f1912ab..a9e15f6cac5 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -30,12 +30,6 @@ WalReader::WalReader(RuntimeState* state) : _state(state) { _wal_id = state->wal_id(); } -WalReader::~WalReader() { - if (_wal_reader.get() != nullptr) { - static_cast<void>(_wal_reader->finalize()); - } -} - Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) { _tuple_descriptor = tuple_descriptor; RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path)); diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h index 09311496c16..5834d74efea 100644 --- 
a/be/src/vec/exec/format/wal/wal_reader.h +++ b/be/src/vec/exec/format/wal/wal_reader.h @@ -26,12 +26,19 @@ struct ScannerCounter; class WalReader : public GenericReader { public: WalReader(RuntimeState* state); - ~WalReader() override; + ~WalReader() override = default; Status init_reader(const TupleDescriptor* tuple_descriptor); Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; + Status close() override { + if (_wal_reader) { + return _wal_reader->finalize(); + } + return Status::OK(); + } + private: RuntimeState* _state = nullptr; int64_t _wal_id; diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 4b8eb20f227..3df8044f66a 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -65,10 +65,6 @@ namespace doris::vectorized { M(TypeIndex::DateTime, ColumnVector<Int64>, Int64) \ M(TypeIndex::DateTimeV2, ColumnVector<UInt64>, UInt64) -JniConnector::~JniConnector() { - static_cast<void>(close()); -} - Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { _state = state; _profile = profile; diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index 22e33f01053..52a3fb2e778 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -208,8 +208,7 @@ public: _is_table_schema = true; } - /// Should release jni resources if other functions are failed. 
- ~JniConnector(); + ~JniConnector() override = default; /** * Open java scanner, and get the following scanner methods by jni: diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index e7b7a6b96b9..4abf1e239de 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -280,7 +280,7 @@ Status VJoinNodeBase::open(RuntimeState* state) { std::promise<Status> thread_status; try { - static_cast<void>(state->exec_env()->join_node_thread_pool()->submit_func( + RETURN_IF_ERROR(state->exec_env()->join_node_thread_pool()->submit_func( [this, state, thread_status_p = &thread_status] { this->_probe_side_open_thread(state, thread_status_p); })); diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp index afc2412f2b7..a1c3488fa3a 100644 --- a/be/src/vec/exec/scan/new_es_scanner.cpp +++ b/be/src/vec/exec/scan/new_es_scanner.cpp @@ -232,7 +232,7 @@ Status NewEsScanner::close(RuntimeState* state) { } if (_es_reader != nullptr) { - static_cast<void>(_es_reader->close()); + RETURN_IF_ERROR(_es_reader->close()); } RETURN_IF_ERROR(VScanner::close(state)); diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index eba62dcf19a..58ecc4883b1 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -103,11 +103,11 @@ Status ScannerScheduler::init(ExecEnv* env) { _remote_thread_pool_max_size, remote_scan_pool_queue_size, "RemoteScanThreadPool"); // 3. 
limited scan thread pool - static_cast<void>(ThreadPoolBuilder("LimitedScanThreadPool") - .set_min_threads(config::doris_scanner_thread_pool_thread_num) - .set_max_threads(config::doris_scanner_thread_pool_thread_num) - .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) - .build(&_limited_scan_thread_pool)); + RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool") + .set_min_threads(config::doris_scanner_thread_pool_thread_num) + .set_max_threads(config::doris_scanner_thread_pool_thread_num) + .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) + .build(&_limited_scan_thread_pool)); _register_metrics(); _is_init = true; return Status::OK(); @@ -246,7 +246,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, scanner->set_opened(); } - static_cast<void>(scanner->try_append_late_arrival_runtime_filter()); + Status rf_status = scanner->try_append_late_arrival_runtime_filter(); + if (!rf_status.ok()) { + LOG(WARNING) << "Failed to append late arrival runtime filter: " << rf_status.to_string(); + } size_t raw_bytes_threshold = config::doris_scanner_row_bytes; size_t raw_bytes_read = 0; diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index b3d02860f9a..f194afe4bb0 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -147,11 +147,27 @@ public: return; } if (new_max_thread_num >= cur_max_thread_num) { - static_cast<void>(_scan_thread_pool->set_max_threads(new_max_thread_num)); - static_cast<void>(_scan_thread_pool->set_min_threads(new_min_thread_num)); + Status st_max = _scan_thread_pool->set_max_threads(new_max_thread_num); + if (!st_max.ok()) { + LOG(WARNING) << "Failed to set max threads for scan thread pool: " + << st_max.to_string(); + } + Status st_min = _scan_thread_pool->set_min_threads(new_min_thread_num); + if (!st_min.ok()) { + LOG(WARNING) << "Failed to set min threads for scan thread pool: " + << 
st_min.to_string(); + } } else { - static_cast<void>(_scan_thread_pool->set_min_threads(new_min_thread_num)); - static_cast<void>(_scan_thread_pool->set_max_threads(new_max_thread_num)); + Status st_min = _scan_thread_pool->set_min_threads(new_min_thread_num); + if (!st_min.ok()) { + LOG(WARNING) << "Failed to set min threads for scan thread pool: " + << st_min.to_string(); + } + Status st_max = _scan_thread_pool->set_max_threads(new_max_thread_num); + if (!st_max.ok()) { + LOG(WARNING) << "Failed to set max threads for scan thread pool: " + << st_max.to_string(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org