This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 84bef605e02 [refactor](datalake) return the error status instead of 
static_cast<void> (#34873)
84bef605e02 is described below

commit 84bef605e02a57cf9e5a285be9c67ddde8a7ab37
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Wed May 22 14:08:42 2024 +0800

    [refactor](datalake) return the error status instead of static_cast<void> 
(#34873)
    
    Followup #34797
    `static_cast<void>` silently discarded error statuses. Some of those errors
    should make the query finish with an error status, so replace
    `static_cast<void>` with `RETURN_IF_ERROR`.
    
    ### Remaining Work
    The following three scenarios need to be handled separately and cannot be
    fixed by a simple replacement:
    1. The outer function returns void;
    2. A status-returning function is called inside a constructor or destructor;
    3. A status-returning function is called on a best-effort basis, so its
    error status should be ignored.
---
 be/src/io/fs/buffered_reader.cpp                   |  4 +--
 be/src/io/fs/buffered_reader.h                     |  1 -
 be/src/olap/wal/wal_reader.cpp                     |  7 ++---
 be/src/vec/exec/format/avro/avro_jni_reader.cpp    |  8 ++---
 be/src/vec/exec/format/avro/avro_jni_reader.h      | 11 ++-----
 be/src/vec/exec/format/jni_reader.cpp              |  2 +-
 be/src/vec/exec/format/jni_reader.h                | 35 ++++++++++++++++++---
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 36 ++++++++++++++--------
 be/src/vec/exec/format/orc/vorc_reader.h           | 29 ++++++++++++-----
 .../exec/format/parquet/vparquet_group_reader.cpp  |  2 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 10 +++---
 be/src/vec/exec/format/table/hudi_jni_reader.cpp   |  8 ++---
 be/src/vec/exec/format/table/hudi_jni_reader.h     |  9 ++----
 .../exec/format/table/max_compute_jni_reader.cpp   |  8 ++---
 .../vec/exec/format/table/max_compute_jni_reader.h |  9 ++----
 be/src/vec/exec/format/table/paimon_jni_reader.cpp |  2 +-
 be/src/vec/exec/format/table/paimon_jni_reader.h   | 10 ++----
 .../format/table/transactional_hive_reader.cpp     |  2 +-
 .../format/table/trino_connector_jni_reader.cpp    |  2 +-
 .../exec/format/table/trino_connector_jni_reader.h |  9 ++----
 be/src/vec/exec/format/wal/wal_reader.cpp          |  6 ----
 be/src/vec/exec/format/wal/wal_reader.h            |  9 +++++-
 be/src/vec/exec/jni_connector.cpp                  |  4 ---
 be/src/vec/exec/jni_connector.h                    |  3 +-
 be/src/vec/exec/join/vjoin_node_base.cpp           |  2 +-
 be/src/vec/exec/scan/new_es_scanner.cpp            |  2 +-
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 15 +++++----
 be/src/vec/exec/scan/scanner_scheduler.h           | 24 ++++++++++++---
 28 files changed, 146 insertions(+), 123 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 206845e90ce..664997088d9 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -413,8 +413,8 @@ void PrefetchBuffer::reset_offset(size_t offset) {
     } else {
         _exceed = false;
     }
-    
static_cast<void>(ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
-            [buffer_ptr = shared_from_this()]() { 
buffer_ptr->prefetch_buffer(); }));
+    _prefetch_status = 
ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
+            [buffer_ptr = shared_from_this()]() { 
buffer_ptr->prefetch_buffer(); });
 }
 
 // only this function would run concurrently in another thread
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 0195f5d8308..70c8445db23 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -173,7 +173,6 @@ public:
         for (char* box : _boxes) {
             delete[] box;
         }
-        static_cast<void>(close());
     }
 
     Status close() override {
diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp
index fa96f0c5a0b..6e6a530f8db 100644
--- a/be/src/olap/wal/wal_reader.cpp
+++ b/be/src/olap/wal/wal_reader.cpp
@@ -50,11 +50,8 @@ Status WalReader::init() {
 }
 
 Status WalReader::finalize() {
-    if (file_reader != nullptr) {
-        auto st = file_reader->close();
-        if (!st.ok()) {
-            LOG(WARNING) << "fail to close wal " << _file_name << " st= " << 
st.to_string();
-        }
+    if (file_reader) {
+        return file_reader->close();
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp 
b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
index ec08c58d18a..03135aa5c94 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -29,16 +29,12 @@ AvroJNIReader::AvroJNIReader(RuntimeState* state, 
RuntimeProfile* profile,
                              const TFileScanRangeParams& params,
                              const std::vector<SlotDescriptor*>& 
file_slot_descs,
                              const TFileRangeDesc& range)
-        : _file_slot_descs(file_slot_descs),
-          _state(state),
-          _profile(profile),
-          _params(params),
-          _range(range) {}
+        : JniReader(file_slot_descs, state, profile), _params(params), 
_range(range) {}
 
 AvroJNIReader::AvroJNIReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
                              const TFileRangeDesc& range,
                              const std::vector<SlotDescriptor*>& 
file_slot_descs)
-        : _file_slot_descs(file_slot_descs), _profile(profile), 
_params(params), _range(range) {}
+        : JniReader(file_slot_descs, nullptr, profile), _params(params), 
_range(range) {}
 
 AvroJNIReader::~AvroJNIReader() = default;
 
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h 
b/be/src/vec/exec/format/avro/avro_jni_reader.h
index 64dac0aba4f..82388f32915 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.h
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.h
@@ -28,8 +28,7 @@
 
 #include "common/status.h"
 #include "exec/olap_common.h"
-#include "vec/exec/format/generic_reader.h"
-#include "vec/exec/jni_connector.h"
+#include "vec/exec/format/jni_reader.h"
 
 namespace doris {
 class RuntimeProfile;
@@ -48,7 +47,7 @@ namespace doris::vectorized {
 /**
  * Read avro-format file
  */
-class AvroJNIReader : public GenericReader {
+class AvroJNIReader : public JniReader {
     ENABLE_FACTORY_CREATOR(AvroJNIReader);
 
 public:
@@ -83,16 +82,10 @@ public:
 
     TypeDescriptor convert_to_doris_type(const rapidjson::Value& 
column_schema);
 
-    TypeDescriptor convert_complex_type(const rapidjson::Document::ConstObject 
child_schema);
-
 private:
-    const std::vector<SlotDescriptor*>& _file_slot_descs;
-    RuntimeState* _state = nullptr;
-    RuntimeProfile* _profile = nullptr;
     const TFileScanRangeParams _params;
     const TFileRangeDesc _range;
     std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range = nullptr;
-    std::unique_ptr<JniConnector> _jni_connector;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/jni_reader.cpp 
b/be/src/vec/exec/format/jni_reader.cpp
index 625c79ffff2..563f6cbea51 100644
--- a/be/src/vec/exec/format/jni_reader.cpp
+++ b/be/src/vec/exec/format/jni_reader.cpp
@@ -37,7 +37,7 @@ namespace doris::vectorized {
 
 MockJniReader::MockJniReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
                              RuntimeState* state, RuntimeProfile* profile)
-        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
+        : JniReader(file_slot_descs, state, profile) {
     std::ostringstream required_fields;
     std::ostringstream columns_types;
     std::vector<std::string> column_names;
diff --git a/be/src/vec/exec/format/jni_reader.h 
b/be/src/vec/exec/format/jni_reader.h
index d3a0f0da4c0..714bdb96b19 100644
--- a/be/src/vec/exec/format/jni_reader.h
+++ b/be/src/vec/exec/format/jni_reader.h
@@ -42,13 +42,35 @@ struct TypeDescriptor;
 
 namespace doris::vectorized {
 
+class JniReader : public GenericReader {
+public:
+    JniReader(const std::vector<SlotDescriptor*>& file_slot_descs, 
RuntimeState* state,
+              RuntimeProfile* profile)
+            : _file_slot_descs(file_slot_descs), _state(state), 
_profile(profile) {};
+
+    ~JniReader() override = default;
+
+    Status close() override {
+        if (_jni_connector) {
+            return _jni_connector->close();
+        }
+        return Status::OK();
+    }
+
+protected:
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+    RuntimeState* _state = nullptr;
+    RuntimeProfile* _profile = nullptr;
+    std::unique_ptr<JniConnector> _jni_connector;
+};
+
 /**
  * The demo usage of JniReader, showing how to read data from java scanner.
  * The java side is also a mock reader that provide values for each type.
  * This class will only be retained during the functional testing phase to 
verify that
  * the communication and data exchange with the jvm are correct.
  */
-class MockJniReader : public GenericReader {
+class MockJniReader : public JniReader {
 public:
     MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, 
RuntimeState* state,
                   RuntimeProfile* profile);
@@ -63,6 +85,13 @@ public:
     Status init_reader(
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
+    Status close() override {
+        if (_jni_connector) {
+            return _jni_connector->close();
+        }
+        return Status::OK();
+    }
+
 protected:
     void _collect_profile_before_close() override {
         if (_jni_connector != nullptr) {
@@ -71,11 +100,7 @@ protected:
     }
 
 private:
-    const std::vector<SlotDescriptor*>& _file_slot_descs;
-    RuntimeState* _state = nullptr;
-    RuntimeProfile* _profile = nullptr;
     std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range;
-    std::unique_ptr<JniConnector> _jni_connector;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 035af267d5b..27f4b808e02 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -865,7 +865,7 @@ Status OrcReader::set_fill_columns(
         _batch = _row_reader->createRowBatch(_batch_size);
         auto& selected_type = _row_reader->getSelectedType();
         int idx = 0;
-        static_cast<void>(_init_select_types(selected_type, idx));
+        RETURN_IF_ERROR(_init_select_types(selected_type, idx));
 
         _remaining_rows = _row_reader->getNumberOfRows();
 
@@ -909,7 +909,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, 
int idx) {
         const orc::Type* sub_type = type.getSubtype(i);
         _col_orc_type.push_back(sub_type);
         if (_is_acid && sub_type->getKind() == orc::TypeKind::STRUCT) {
-            static_cast<void>(_init_select_types(*sub_type, idx));
+            RETURN_IF_ERROR(_init_select_types(*sub_type, idx));
         }
     }
     return Status::OK();
@@ -1530,6 +1530,17 @@ std::string OrcReader::get_field_name_lower_case(const 
orc::Type* orc_type, int
 }
 
 Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof));
+    if (_orc_filter) {
+        RETURN_IF_ERROR(_orc_filter->get_status());
+    }
+    if (_string_dict_filter) {
+        RETURN_IF_ERROR(_string_dict_filter->get_status());
+    }
+    return Status::OK();
+}
+
+Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* 
eof) {
     if (_io_ctx && _io_ctx->should_stop) {
         *eof = true;
         *read_rows = 0;
@@ -1605,7 +1616,7 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                 _fill_missing_columns(block, _batch->numElements, 
_lazy_read_ctx.missing_columns));
 
         if (block->rows() == 0) {
-            static_cast<void>(_convert_dict_cols_to_string_cols(block, 
nullptr));
+            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
             *eof = true;
             *read_rows = 0;
             return Status::OK();
@@ -1617,14 +1628,14 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                     Block::filter_block_internal(block, columns_to_filter, 
*_filter));
         }
         if (!_not_single_slot_filter_conjuncts.empty()) {
-            static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
+            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
             RETURN_IF_CATCH_EXCEPTION(
                     
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
                             _not_single_slot_filter_conjuncts, nullptr, block, 
columns_to_filter,
                             column_to_keep)));
         } else {
             Block::erase_useless_column(block, column_to_keep);
-            static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
+            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
         }
         *read_rows = block->rows();
     } else {
@@ -1697,7 +1708,7 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                 _fill_missing_columns(block, _batch->numElements, 
_lazy_read_ctx.missing_columns));
 
         if (block->rows() == 0) {
-            static_cast<void>(_convert_dict_cols_to_string_cols(block, 
nullptr));
+            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
             *eof = true;
             *read_rows = 0;
             return Status::OK();
@@ -1738,8 +1749,7 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                     
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
                 }
                 Block::erase_useless_column(block, column_to_keep);
-                static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
-                return Status::OK();
+                return _convert_dict_cols_to_string_cols(block, &batch_vec);
             }
             _execute_filter_position_delete_rowids(result_filter);
             {
@@ -1748,14 +1758,14 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                         Block::filter_block_internal(block, columns_to_filter, 
result_filter));
             }
             if (!_not_single_slot_filter_conjuncts.empty()) {
-                static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
+                RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
                 RETURN_IF_CATCH_EXCEPTION(
                         
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
                                 _not_single_slot_filter_conjuncts, nullptr, 
block,
                                 columns_to_filter, column_to_keep)));
             } else {
                 Block::erase_useless_column(block, column_to_keep);
-                static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
+                RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
             }
         } else {
             if (_delete_rows_filter_ptr) {
@@ -1771,7 +1781,7 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                         Block::filter_block_internal(block, columns_to_filter, 
(*filter)));
             }
             Block::erase_useless_column(block, column_to_keep);
-            static_cast<void>(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
+            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, 
&batch_vec));
         }
         *read_rows = block->rows();
     }
@@ -1912,7 +1922,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, 
uint16_t* sel, uint16_t s
             block->get_by_name(col.first).column->assume_mutable()->clear();
         }
         Block::erase_useless_column(block, origin_column_num);
-        static_cast<void>(_convert_dict_cols_to_string_cols(block, nullptr));
+        RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
     }
 
     uint16_t new_size = 0;
@@ -2148,7 +2158,7 @@ Status OrcReader::on_string_dicts_loaded(
         }
 
         // 4. Rewrite conjuncts.
-        static_cast<void>(_rewrite_dict_conjuncts(dict_codes, slot_id, 
dict_column->is_nullable()));
+        RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, 
dict_column->is_nullable()));
         ++it;
     }
     return Status::OK();
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index ec08d30e185..c790d78123f 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -165,6 +165,8 @@ public:
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
+    Status get_next_block_impl(Block* block, size_t* read_rows, bool* eof);
+
     void _fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
                          orc::ColumnVectorBatch* batch, int idx);
 
@@ -228,15 +230,19 @@ private:
 
     class ORCFilterImpl : public orc::ORCFilter {
     public:
-        ORCFilterImpl(OrcReader* orcReader) : orcReader(orcReader) {}
+        ORCFilterImpl(OrcReader* orcReader) : _orcReader(orcReader) {}
         ~ORCFilterImpl() override = default;
         void filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size,
                     void* arg) const override {
-            static_cast<void>(orcReader->filter(data, sel, size, arg));
+            if (_status.ok()) {
+                _status = _orcReader->filter(data, sel, size, arg);
+            }
         }
+        Status get_status() { return _status; }
 
     private:
-        OrcReader* orcReader = nullptr;
+        mutable Status _status = Status::OK();
+        OrcReader* _orcReader = nullptr;
     };
 
     class StringDictFilterImpl : public orc::StringDictFilter {
@@ -247,17 +253,24 @@ private:
         virtual void fillDictFilterColumnNames(
                 std::unique_ptr<orc::StripeInformation> 
current_strip_information,
                 std::list<std::string>& column_names) const override {
-            static_cast<void>(_orc_reader->fill_dict_filter_column_names(
-                    std::move(current_strip_information), column_names));
+            if (_status.ok()) {
+                _status = _orc_reader->fill_dict_filter_column_names(
+                        std::move(current_strip_information), column_names);
+            }
         }
         virtual void onStringDictsLoaded(
                 std::unordered_map<std::string, orc::StringDictionary*>& 
column_name_to_dict_map,
                 bool* is_stripe_filtered) const override {
-            
static_cast<void>(_orc_reader->on_string_dicts_loaded(column_name_to_dict_map,
-                                                                  
is_stripe_filtered));
+            if (_status.ok()) {
+                _status = 
_orc_reader->on_string_dicts_loaded(column_name_to_dict_map,
+                                                              
is_stripe_filtered);
+            }
         }
 
+        Status get_status() { return _status; }
+
     private:
+        mutable Status _status = Status::OK();
         OrcReader* _orc_reader = nullptr;
     };
 
@@ -597,7 +610,7 @@ private:
     // std::pair<col_name, slot_id>
     std::vector<std::pair<std::string, int>> _dict_filter_cols;
     std::shared_ptr<ObjectPool> _obj_pool;
-    std::unique_ptr<orc::StringDictFilter> _string_dict_filter;
+    std::unique_ptr<StringDictFilterImpl> _string_dict_filter;
     bool _dict_cols_has_converted = false;
     bool _has_complex_type = false;
     std::vector<orc::TypeKind>* _unsupported_pushdown_types;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 335207070dd..90b82c52e07 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -883,7 +883,7 @@ Status RowGroupReader::_rewrite_dict_predicates() {
         }
 
         // 4. Rewrite conjuncts.
-        static_cast<void>(_rewrite_dict_conjuncts(dict_codes, slot_id, 
dict_column->is_nullable()));
+        RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, 
dict_column->is_nullable()));
         ++it;
     }
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 0a75b43747a..c3199b5de66 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -781,8 +781,8 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
         auto& conjuncts = conjunct_iter->second;
         std::vector<int> skipped_page_range;
         const FieldSchema* col_schema = schema_desc.get_column(read_col);
-        static_cast<void>(page_index.collect_skipped_page_range(
-                &column_index, conjuncts, col_schema, skipped_page_range, 
*_ctz));
+        RETURN_IF_ERROR(page_index.collect_skipped_page_range(&column_index, 
conjuncts, col_schema,
+                                                              
skipped_page_range, *_ctz));
         if (skipped_page_range.empty()) {
             continue;
         }
@@ -790,8 +790,8 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
         RETURN_IF_ERROR(page_index.parse_offset_index(chunk, 
off_index_buff.data(), &offset_index));
         for (int page_id : skipped_page_range) {
             RowRange skipped_row_range;
-            
static_cast<void>(page_index.create_skipped_row_range(offset_index, 
row_group.num_rows,
-                                                                  page_id, 
&skipped_row_range));
+            RETURN_IF_ERROR(page_index.create_skipped_row_range(offset_index, 
row_group.num_rows,
+                                                                page_id, 
&skipped_row_range));
             // use the union row range
             skipped_row_ranges.emplace_back(skipped_row_range);
         }
@@ -833,7 +833,7 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
 
 Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& 
row_group,
                                                 bool* filter_group) {
-    static_cast<void>(_process_column_stat_filter(row_group.columns, 
filter_group));
+    RETURN_IF_ERROR(_process_column_stat_filter(row_group.columns, 
filter_group));
     _init_chunk_dicts();
     RETURN_IF_ERROR(_process_dict_filter(filter_group));
     _init_bloom_filter();
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp 
b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
index 1cba7da7c65..cffa2ce9ac4 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -43,11 +43,9 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& 
scan_params,
                              const THudiFileDesc& hudi_params,
                              const std::vector<SlotDescriptor*>& 
file_slot_descs,
                              RuntimeState* state, RuntimeProfile* profile)
-        : _scan_params(scan_params),
-          _hudi_params(hudi_params),
-          _file_slot_descs(file_slot_descs),
-          _state(state),
-          _profile(profile) {
+        : JniReader(file_slot_descs, state, profile),
+          _scan_params(scan_params),
+          _hudi_params(hudi_params) {
     std::vector<std::string> required_fields;
     for (auto& desc : _file_slot_descs) {
         required_fields.emplace_back(desc->col_name());
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h 
b/be/src/vec/exec/format/table/hudi_jni_reader.h
index c0438e93289..e9bb55a69a7 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.h
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.h
@@ -27,8 +27,7 @@
 
 #include "common/status.h"
 #include "exec/olap_common.h"
-#include "vec/exec/format/generic_reader.h"
-#include "vec/exec/jni_connector.h"
+#include "vec/exec/format/jni_reader.h"
 
 namespace doris {
 class RuntimeProfile;
@@ -42,7 +41,7 @@ struct TypeDescriptor;
 
 namespace doris::vectorized {
 
-class HudiJniReader : public GenericReader {
+class HudiJniReader : public JniReader {
     ENABLE_FACTORY_CREATOR(HudiJniReader);
 
 public:
@@ -66,11 +65,7 @@ public:
 private:
     const TFileScanRangeParams& _scan_params;
     const THudiFileDesc& _hudi_params;
-    const std::vector<SlotDescriptor*>& _file_slot_descs;
-    RuntimeState* _state;
-    RuntimeProfile* _profile;
     std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range;
-    std::unique_ptr<JniConnector> _jni_connector;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp 
b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index f7dd9c9846f..d520bd9b295 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -42,11 +42,9 @@ MaxComputeJniReader::MaxComputeJniReader(const 
MaxComputeTableDescriptor* mc_des
                                          const std::vector<SlotDescriptor*>& 
file_slot_descs,
                                          const TFileRangeDesc& range, 
RuntimeState* state,
                                          RuntimeProfile* profile)
-        : _max_compute_params(max_compute_params),
-          _file_slot_descs(file_slot_descs),
-          _range(range),
-          _state(state),
-          _profile(profile) {
+        : JniReader(file_slot_descs, state, profile),
+          _max_compute_params(max_compute_params),
+          _range(range) {
     _table_desc = mc_desc;
     std::ostringstream required_fields;
     std::ostringstream columns_types;
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.h 
b/be/src/vec/exec/format/table/max_compute_jni_reader.h
index e027678148f..9bfef59432d 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.h
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.h
@@ -28,8 +28,7 @@
 #include "common/status.h"
 #include "exec/olap_common.h"
 #include "runtime/descriptors.h"
-#include "vec/exec/format/generic_reader.h"
-#include "vec/exec/jni_connector.h"
+#include "vec/exec/format/jni_reader.h"
 
 namespace doris {
 class RuntimeProfile;
@@ -49,7 +48,7 @@ namespace doris::vectorized {
  * This class will only be retained during the functional testing phase to 
verify that
  * the communication and data exchange with the jvm are correct.
  */
-class MaxComputeJniReader : public GenericReader {
+class MaxComputeJniReader : public JniReader {
     ENABLE_FACTORY_CREATOR(MaxComputeJniReader);
 
 public:
@@ -71,12 +70,8 @@ public:
 private:
     const MaxComputeTableDescriptor* _table_desc = nullptr;
     const TMaxComputeFileDesc& _max_compute_params;
-    const std::vector<SlotDescriptor*>& _file_slot_descs;
     const TFileRangeDesc& _range;
-    RuntimeState* _state = nullptr;
-    RuntimeProfile* _profile = nullptr;
     std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range = nullptr;
-    std::unique_ptr<JniConnector> _jni_connector = nullptr;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp 
b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index 06d24466104..ef690c15b68 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -40,7 +40,7 @@ const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = 
"paimon_option_prefix.
 PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
                                  RuntimeState* state, RuntimeProfile* profile,
                                  const TFileRangeDesc& range)
-        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
+        : JniReader(file_slot_descs, state, profile) {
     std::vector<std::string> column_names;
     std::vector<std::string> column_types;
     for (auto& desc : _file_slot_descs) {
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h 
b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 162c6ff2cdb..6b6a6907270 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -26,9 +26,7 @@
 
 #include "common/status.h"
 #include "exec/olap_common.h"
-#include "vec/exec/format/generic_reader.h"
-#include "vec/exec/format/table/table_format_reader.h"
-#include "vec/exec/jni_connector.h"
+#include "vec/exec/format/jni_reader.h"
 
 namespace doris {
 class RuntimeProfile;
@@ -48,7 +46,7 @@ namespace doris::vectorized {
  * This class will only be retained during the functional testing phase to 
verify that
  * the communication and data exchange with the jvm are correct.
  */
-class PaimonJniReader : public GenericReader {
+class PaimonJniReader : public JniReader {
     ENABLE_FACTORY_CREATOR(PaimonJniReader);
 
 public:
@@ -67,11 +65,7 @@ public:
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
 private:
-    const std::vector<SlotDescriptor*>& _file_slot_descs;
-    RuntimeState* _state = nullptr;
-    RuntimeProfile* _profile = nullptr;
     std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range;
-    std::unique_ptr<JniConnector> _jni_connector;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp 
b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index 85bfbed0713..a5756e687e9 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -135,7 +135,7 @@ Status TransactionalHiveReader::init_row_filters(const 
TFileRangeDesc& range) {
         std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>
                 partition_columns;
         std::unordered_map<std::string, VExprContextSPtr> missing_columns;
-        static_cast<void>(delete_reader.set_fill_columns(partition_columns, 
missing_columns));
+        RETURN_IF_ERROR(delete_reader.set_fill_columns(partition_columns, 
missing_columns));
 
         bool eof = false;
         while (!eof) {
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp 
b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
index 93e122ae0de..c9b10e716ca 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
@@ -42,7 +42,7 @@ const std::string 
TrinoConnectorJniReader::TRINO_CONNECTOR_OPTION_PREFIX =
 TrinoConnectorJniReader::TrinoConnectorJniReader(
         const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* 
state,
         RuntimeProfile* profile, const TFileRangeDesc& range)
-        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
+        : JniReader(file_slot_descs, state, profile) {
     std::vector<std::string> column_names;
     for (const auto& desc : _file_slot_descs) {
         std::string field = desc->col_name();
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.h 
b/be/src/vec/exec/format/table/trino_connector_jni_reader.h
index 43b2d4fbf9c..de0cf21a881 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.h
+++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.h
@@ -27,8 +27,7 @@
 
 #include "common/status.h"
 #include "exec/olap_common.h"
-#include "vec/exec/format/generic_reader.h"
-#include "vec/exec/jni_connector.h"
+#include "vec/exec/format/jni_reader.h"
 
 namespace doris {
 class RuntimeProfile;
@@ -42,7 +41,7 @@ struct TypeDescriptor;
 
 namespace doris::vectorized {
 
-class TrinoConnectorJniReader : public GenericReader {
+class TrinoConnectorJniReader : public JniReader {
     ENABLE_FACTORY_CREATOR(TrinoConnectorJniReader);
 
 public:
@@ -63,9 +62,5 @@ public:
 
 private:
     Status _set_spi_plugins_dir();
-    const std::vector<SlotDescriptor*>& _file_slot_descs;
-    RuntimeState* _state;
-    RuntimeProfile* _profile;
-    std::unique_ptr<JniConnector> _jni_connector;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp
index 5010f1912ab..a9e15f6cac5 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -30,12 +30,6 @@ WalReader::WalReader(RuntimeState* state) : _state(state) {
     _wal_id = state->wal_id();
 }
 
-WalReader::~WalReader() {
-    if (_wal_reader.get() != nullptr) {
-        static_cast<void>(_wal_reader->finalize());
-    }
-}
-
 Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
     _tuple_descriptor = tuple_descriptor;
     RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path));
diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h
index 09311496c16..5834d74efea 100644
--- a/be/src/vec/exec/format/wal/wal_reader.h
+++ b/be/src/vec/exec/format/wal/wal_reader.h
@@ -26,12 +26,19 @@ struct ScannerCounter;
 class WalReader : public GenericReader {
 public:
     WalReader(RuntimeState* state);
-    ~WalReader() override;
+    ~WalReader() override = default;
     Status init_reader(const TupleDescriptor* tuple_descriptor);
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
+    Status close() override {
+        if (_wal_reader) {
+            return _wal_reader->finalize();
+        }
+        return Status::OK();
+    }
+
 private:
     RuntimeState* _state = nullptr;
     int64_t _wal_id;
diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp
index 4b8eb20f227..3df8044f66a 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -65,10 +65,6 @@ namespace doris::vectorized {
     M(TypeIndex::DateTime, ColumnVector<Int64>, Int64)                 \
     M(TypeIndex::DateTimeV2, ColumnVector<UInt64>, UInt64)
 
-JniConnector::~JniConnector() {
-    static_cast<void>(close());
-}
-
 Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
     _state = state;
     _profile = profile;
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index 22e33f01053..52a3fb2e778 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -208,8 +208,7 @@ public:
         _is_table_schema = true;
     }
 
-    /// Should release jni resources if other functions are failed.
-    ~JniConnector();
+    ~JniConnector() override = default;
 
     /**
      * Open java scanner, and get the following scanner methods by jni:
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp
index e7b7a6b96b9..4abf1e239de 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -280,7 +280,7 @@ Status VJoinNodeBase::open(RuntimeState* state) {
 
     std::promise<Status> thread_status;
     try {
-        static_cast<void>(state->exec_env()->join_node_thread_pool()->submit_func(
+        RETURN_IF_ERROR(state->exec_env()->join_node_thread_pool()->submit_func(
                 [this, state, thread_status_p = &thread_status] {
                     this->_probe_side_open_thread(state, thread_status_p);
                 }));
diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp
index afc2412f2b7..a1c3488fa3a 100644
--- a/be/src/vec/exec/scan/new_es_scanner.cpp
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -232,7 +232,7 @@ Status NewEsScanner::close(RuntimeState* state) {
     }
 
     if (_es_reader != nullptr) {
-        static_cast<void>(_es_reader->close());
+        RETURN_IF_ERROR(_es_reader->close());
     }
 
     RETURN_IF_ERROR(VScanner::close(state));
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index eba62dcf19a..58ecc4883b1 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -103,11 +103,11 @@ Status ScannerScheduler::init(ExecEnv* env) {
             _remote_thread_pool_max_size, remote_scan_pool_queue_size, "RemoteScanThreadPool");
 
     // 3. limited scan thread pool
-    static_cast<void>(ThreadPoolBuilder("LimitedScanThreadPool")
-                              .set_min_threads(config::doris_scanner_thread_pool_thread_num)
-                              .set_max_threads(config::doris_scanner_thread_pool_thread_num)
-                              .set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
-                              .build(&_limited_scan_thread_pool));
+    RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool")
+                            .set_min_threads(config::doris_scanner_thread_pool_thread_num)
+                            .set_max_threads(config::doris_scanner_thread_pool_thread_num)
+                            .set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
+                            .build(&_limited_scan_thread_pool));
     _register_metrics();
     _is_init = true;
     return Status::OK();
@@ -246,7 +246,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
         scanner->set_opened();
     }
 
-    static_cast<void>(scanner->try_append_late_arrival_runtime_filter());
+    Status rf_status = scanner->try_append_late_arrival_runtime_filter();
+    if (!rf_status.ok()) {
+        LOG(WARNING) << "Failed to append late arrival runtime filter: " << rf_status.to_string();
+    }
 
     size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     size_t raw_bytes_read = 0;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index b3d02860f9a..f194afe4bb0 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -147,11 +147,27 @@ public:
             return;
         }
         if (new_max_thread_num >= cur_max_thread_num) {
-            static_cast<void>(_scan_thread_pool->set_max_threads(new_max_thread_num));
-            static_cast<void>(_scan_thread_pool->set_min_threads(new_min_thread_num));
+            Status st_max = _scan_thread_pool->set_max_threads(new_max_thread_num);
+            if (!st_max.ok()) {
+                LOG(WARNING) << "Failed to set max threads for scan thread pool: "
+                             << st_max.to_string();
+            }
+            Status st_min = _scan_thread_pool->set_min_threads(new_min_thread_num);
+            if (!st_min.ok()) {
+                LOG(WARNING) << "Failed to set min threads for scan thread pool: "
+                             << st_min.to_string();
+            }
         } else {
-            static_cast<void>(_scan_thread_pool->set_min_threads(new_min_thread_num));
-            static_cast<void>(_scan_thread_pool->set_max_threads(new_max_thread_num));
+            Status st_min = _scan_thread_pool->set_min_threads(new_min_thread_num);
+            if (!st_min.ok()) {
+                LOG(WARNING) << "Failed to set min threads for scan thread pool: "
+                             << st_min.to_string();
+            }
+            Status st_max = _scan_thread_pool->set_max_threads(new_max_thread_num);
+            if (!st_max.ok()) {
+                LOG(WARNING) << "Failed to set max threads for scan thread pool: "
+                             << st_max.to_string();
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to