This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c5ad989065 [refactor](reader) refactor the interface of file reader 
(#12574)
c5ad989065 is described below

commit c5ad9890656b452e80ad3fef3b7aef228c8ce2b7
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Sep 14 22:31:11 2022 +0800

    [refactor](reader) refactor the interface of file reader (#12574)
    
    Currently, Doris has a variety of readers for different file formats,
    such as parquet reader, orc reader, csv reader, json reader and so on.
    
    The interfaces of these readers are not unified, so they cannot be called through a common method.
    
    In this PR, I added a `GenericReader` interface class. Other readers will implement this interface
    and expose the unified `get_next_block()` method.
    
    This PR currently only modifies the arrow reader (`ArrowReaderWrap`) and the parquet reader (`ParquetReader`).
    Other readers will be migrated one by one in subsequent PRs.
---
 be/src/exec/arrow/arrow_reader.cpp                 | 54 ++++++++++++++++++----
 be/src/exec/arrow/arrow_reader.h                   | 27 +++++++----
 be/src/exec/arrow/orc_reader.cpp                   | 18 ++++----
 be/src/exec/arrow/orc_reader.h                     |  4 +-
 be/src/exec/arrow/parquet_reader.cpp               | 19 ++++----
 be/src/exec/arrow/parquet_reader.h                 |  7 ++-
 be/src/exec/parquet_scanner.cpp                    |  9 ++--
 be/src/vec/CMakeLists.txt                          |  2 +-
 be/src/vec/exec/file_arrow_scanner.cpp             | 28 ++++++-----
 be/src/vec/exec/file_arrow_scanner.h               | 15 +++---
 be/src/vec/exec/file_hdfs_scanner.cpp              |  4 +-
 be/src/vec/exec/format/generic_reader.h            | 33 +++++++++++++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  4 +-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  7 +--
 be/src/vec/exec/scan/new_file_arrow_scanner.cpp    | 29 ++++++------
 be/src/vec/exec/scan/new_file_arrow_scanner.h      | 15 +++---
 be/src/vec/exec/varrow_scanner.cpp                 | 10 ++--
 be/src/vec/exec/varrow_scanner.h                   |  3 +-
 be/src/vec/exec/vorc_scanner.cpp                   |  7 +--
 be/src/vec/exec/vorc_scanner.h                     |  6 +--
 be/src/vec/exec/vparquet_scanner.cpp               |  9 ++--
 be/src/vec/exec/vparquet_scanner.h                 |  6 +--
 be/test/vec/exec/parquet/parquet_reader_test.cpp   |  4 +-
 23 files changed, 199 insertions(+), 121 deletions(-)

diff --git a/be/src/exec/arrow/arrow_reader.cpp 
b/be/src/exec/arrow/arrow_reader.cpp
index 83f3e191f6..d26efd32aa 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -29,17 +29,21 @@
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/mem_pool.h"
+#include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
 #include "util/string_util.h"
 #include "util/thrift_util.h"
+#include "vec/core/block.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
 
 namespace doris {
 
-// Broker
-
-ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
-                                 int32_t num_of_columns_from_file, bool 
case_sensitive)
-        : _batch_size(batch_size),
+ArrowReaderWrap::ArrowReaderWrap(RuntimeState* state,
+                                 const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                 FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                                 bool case_sensitive)
+        : _state(state),
+          _file_slot_descs(file_slot_descs),
           _num_of_columns_from_file(num_of_columns_from_file),
           _case_sensitive(case_sensitive) {
     _arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader));
@@ -65,11 +69,11 @@ void ArrowReaderWrap::close() {
     }
 }
 
-Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& 
tuple_slot_descs) {
-    DCHECK(_num_of_columns_from_file <= tuple_slot_descs.size());
+Status ArrowReaderWrap::column_indices() {
+    DCHECK(_num_of_columns_from_file <= _file_slot_descs.size());
     _include_column_ids.clear();
     for (int i = 0; i < _num_of_columns_from_file; i++) {
-        auto slot_desc = tuple_slot_descs.at(i);
+        auto slot_desc = _file_slot_descs.at(i);
         // Get the Column Reader for the boolean column
         auto iter = _map_column.find(slot_desc->col_name());
         if (iter != _map_column.end()) {
@@ -84,7 +88,7 @@ Status ArrowReaderWrap::column_indices(const 
std::vector<SlotDescriptor*>& tuple
     return Status::OK();
 }
 
-int ArrowReaderWrap::get_cloumn_index(std::string column_name) {
+int ArrowReaderWrap::get_column_index(std::string column_name) {
     std::string real_column_name = _case_sensitive ? column_name : 
to_lower(column_name);
     auto iter = _map_column.find(real_column_name);
     if (iter != _map_column.end()) {
@@ -97,6 +101,37 @@ int ArrowReaderWrap::get_cloumn_index(std::string 
column_name) {
     }
 }
 
+Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
+    size_t rows = 0;
+    do {
+        if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+            RETURN_IF_ERROR(next_batch(&_batch, eof));
+            if (*eof) {
+                return Status::OK();
+            }
+        }
+
+        size_t num_elements = std::min<size_t>((_state->batch_size() - 
block->rows()),
+                                               (_batch->num_rows() - 
_arrow_batch_cur_idx));
+        for (auto i = 0; i < _file_slot_descs.size(); ++i) {
+            SlotDescriptor* slot_desc = _file_slot_descs[i];
+            if (slot_desc == nullptr) {
+                continue;
+            }
+            std::string real_column_name =
+                    is_case_sensitive() ? slot_desc->col_name() : 
slot_desc->col_name_lower_case();
+            auto* array = _batch->GetColumnByName(real_column_name).get();
+            auto& column_with_type_and_name = 
block->get_by_name(slot_desc->col_name());
+            RETURN_IF_ERROR(arrow_column_to_doris_column(
+                    array, _arrow_batch_cur_idx, 
column_with_type_and_name.column,
+                    column_with_type_and_name.type, num_elements, 
_state->timezone_obj()));
+        }
+        rows += num_elements;
+        _arrow_batch_cur_idx += num_elements;
+    } while (!(*eof) && rows < _state->batch_size());
+    return Status::OK();
+}
+
 Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, 
bool* eof) {
     std::unique_lock<std::mutex> lock(_mtx);
     while (!_closed && _queue.empty()) {
@@ -114,6 +149,7 @@ Status 
ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, b
     *batch = _queue.front();
     _queue.pop_front();
     _queue_writer_cond.notify_one();
+    _arrow_batch_cur_idx = 0;
     return Status::OK();
 }
 
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 24c381316e..35703e4bbd 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -37,6 +37,7 @@
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
+#include "vec/exec/format/generic_reader.h"
 
 namespace doris {
 
@@ -77,38 +78,42 @@ private:
 };
 
 // base of arrow reader
-class ArrowReaderWrap {
+class ArrowReaderWrap : public vectorized::GenericReader {
 public:
-    ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t 
num_of_columns_from_file,
-                    bool caseSensitive);
+    ArrowReaderWrap(RuntimeState* state, const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                    FileReader* file_reader, int32_t num_of_columns_from_file, 
bool caseSensitive);
     virtual ~ArrowReaderWrap();
 
     virtual Status init_reader(const TupleDescriptor* tuple_desc,
-                               const std::vector<SlotDescriptor*>& 
tuple_slot_descs,
                                const std::vector<ExprContext*>& conjunct_ctxs,
                                const std::string& timezone) = 0;
     // for row
-    virtual Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& 
tuple_slot_descs,
-                        MemPool* mem_pool, bool* eof) {
+    virtual Status read(Tuple* tuple, MemPool* mem_pool, bool* eof) {
         return Status::NotSupported("Not Implemented read");
     }
     // for vec
+    Status get_next_block(vectorized::Block* block, bool* eof) override;
+    // This method should be deprecated once the old scanner is removed.
+    // And user should use "get_next_block" instead.
     Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof);
+
     std::shared_ptr<Statistics>& statistics() { return _statistics; }
     void close();
     virtual Status size(int64_t* size) { return Status::NotSupported("Not 
Implemented size"); }
-    int get_cloumn_index(std::string column_name);
+    int get_column_index(std::string column_name);
 
     void prefetch_batch();
     bool is_case_sensitive() { return _case_sensitive; }
 
 protected:
-    virtual Status column_indices(const std::vector<SlotDescriptor*>& 
tuple_slot_descs);
+    virtual Status column_indices();
     virtual void read_batches(arrow::RecordBatchVector& batches, int 
current_group) = 0;
     virtual bool filter_row_group(int current_group) = 0;
 
 protected:
-    const int64_t _batch_size;
+    RuntimeState* _state;
+    std::vector<SlotDescriptor*> _file_slot_descs;
+
     const int32_t _num_of_columns_from_file;
     std::shared_ptr<ArrowFile> _arrow_file;
     std::shared_ptr<::arrow::RecordBatchReader> _rb_reader;
@@ -128,6 +133,10 @@ protected:
     const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
     std::thread _thread;
     bool _case_sensitive;
+
+    // The following fields are only valid when using the "get_next_block()" interface.
+    std::shared_ptr<arrow::RecordBatch> _batch;
+    size_t _arrow_batch_cur_idx = 0;
 };
 
 } // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 3223c64b6a..65a67909ba 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -23,15 +23,18 @@
 #include "common/logging.h"
 #include "io/file_reader.h"
 #include "runtime/mem_pool.h"
+#include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
 #include "util/string_util.h"
 
 namespace doris {
 
-ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
-                             int32_t num_of_columns_from_file, int64_t 
range_start_offset,
-                             int64_t range_size, bool case_sensitive)
-        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, 
case_sensitive),
+ORCReaderWrap::ORCReaderWrap(RuntimeState* state,
+                             const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                             FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                             int64_t range_start_offset, int64_t range_size, 
bool case_sensitive)
+        : ArrowReaderWrap(state, file_slot_descs, file_reader, 
num_of_columns_from_file,
+                          case_sensitive),
           _range_start_offset(range_start_offset),
           _range_size(range_size) {
     _reader = nullptr;
@@ -39,7 +42,6 @@ ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t 
batch_size,
 }
 
 Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
-                                  const std::vector<SlotDescriptor*>& 
tuple_slot_descs,
                                   const std::vector<ExprContext*>& 
conjunct_ctxs,
                                   const std::string& timezone) {
     // Open ORC file reader
@@ -73,7 +75,7 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* 
tuple_desc,
 
         _map_column.emplace(schemaName, i + 1);
     }
-    RETURN_IF_ERROR(column_indices(tuple_slot_descs));
+    RETURN_IF_ERROR(column_indices());
 
     _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this);
 
@@ -133,7 +135,7 @@ Status ORCReaderWrap::_next_stripe_reader(bool* eof) {
     // which may cause OOM issues by loading the whole stripe into memory.
     // Note this will only read rows for the current stripe, not the entire 
file.
     arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> maybe_rb_reader =
-            _reader->NextStripeReader(_batch_size, _include_column_ids);
+            _reader->NextStripeReader(_state->batch_size(), 
_include_column_ids);
     if (!maybe_rb_reader.ok()) {
         LOG(WARNING) << "Get RecordBatch Failed. " << maybe_rb_reader.status();
         return Status::InternalError(maybe_rb_reader.status().ToString());
@@ -162,4 +164,4 @@ bool ORCReaderWrap::filter_row_group(int current_group) {
     return false;
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index c166196d83..a6455e8400 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -32,12 +32,12 @@ namespace doris {
 // Reader of ORC file
 class ORCReaderWrap final : public ArrowReaderWrap {
 public:
-    ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t 
num_of_columns_from_file,
+    ORCReaderWrap(RuntimeState* state, const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                  FileReader* file_reader, int32_t num_of_columns_from_file,
                   int64_t range_start_offset, int64_t range_size, bool 
case_sensitive = true);
     ~ORCReaderWrap() override = default;
 
     Status init_reader(const TupleDescriptor* tuple_desc,
-                       const std::vector<SlotDescriptor*>& tuple_slot_descs,
                        const std::vector<ExprContext*>& conjunct_ctxs,
                        const std::string& timezone) override;
 
diff --git a/be/src/exec/arrow/parquet_reader.cpp 
b/be/src/exec/arrow/parquet_reader.cpp
index 97b033b4f7..fc8c1ca5f5 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -37,10 +37,13 @@
 namespace doris {
 
 // Broker
-ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t 
batch_size,
-                                     int32_t num_of_columns_from_file, int64_t 
range_start_offset,
-                                     int64_t range_size, bool case_sensitive)
-        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, 
case_sensitive),
+ParquetReaderWrap::ParquetReaderWrap(RuntimeState* state,
+                                     const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                     FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                                     int64_t range_start_offset, int64_t 
range_size,
+                                     bool case_sensitive)
+        : ArrowReaderWrap(state, file_slot_descs, file_reader, 
num_of_columns_from_file,
+                          case_sensitive),
           _rows_of_group(0),
           _current_line_of_group(0),
           _current_line_of_batch(0),
@@ -48,7 +51,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, 
int64_t batch_size
           _range_size(range_size) {}
 
 Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
-                                      const std::vector<SlotDescriptor*>& 
tuple_slot_descs,
                                       const std::vector<ExprContext*>& 
conjunct_ctxs,
                                       const std::string& timezone) {
     try {
@@ -97,7 +99,7 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* 
tuple_desc,
 
         _timezone = timezone;
 
-        RETURN_IF_ERROR(column_indices(tuple_slot_descs));
+        RETURN_IF_ERROR(column_indices());
         if (config::parquet_predicate_push_down) {
             int64_t file_size = 0;
             size(&file_size);
@@ -238,8 +240,7 @@ Status ParquetReaderWrap::init_parquet_type() {
     return Status::OK();
 }
 
-Status ParquetReaderWrap::read(Tuple* tuple, const 
std::vector<SlotDescriptor*>& tuple_slot_descs,
-                               MemPool* mem_pool, bool* eof) {
+Status ParquetReaderWrap::read(Tuple* tuple, MemPool* mem_pool, bool* eof) {
     if (_batch == nullptr) {
         _current_line_of_group += _rows_of_group;
         return read_record_batch(eof);
@@ -251,7 +252,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const 
std::vector<SlotDescriptor*>&
     try {
         size_t slots = _include_column_ids.size();
         for (size_t i = 0; i < slots; ++i) {
-            auto slot_desc = tuple_slot_descs[i];
+            auto slot_desc = _file_slot_descs[i];
             column_index = i; // column index in batch record
             switch (_parquet_column_type[i]) {
             case arrow::Type::type::STRING: {
diff --git a/be/src/exec/arrow/parquet_reader.h 
b/be/src/exec/arrow/parquet_reader.h
index d5d0165665..a2a80a1966 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -62,16 +62,15 @@ class RowGroupReader;
 class ParquetReaderWrap final : public ArrowReaderWrap {
 public:
     // batch_size is not use here
-    ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t 
num_of_columns_from_file,
+    ParquetReaderWrap(RuntimeState* state, const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                      FileReader* file_reader, int32_t 
num_of_columns_from_file,
                       int64_t range_start_offset, int64_t range_size, bool 
case_sensitive = true);
     ~ParquetReaderWrap() override = default;
 
     // Read
-    Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& 
tuple_slot_descs,
-                MemPool* mem_pool, bool* eof) override;
+    Status read(Tuple* tuple, MemPool* mem_pool, bool* eof) override;
     Status size(int64_t* size) override;
     Status init_reader(const TupleDescriptor* tuple_desc,
-                       const std::vector<SlotDescriptor*>& tuple_slot_descs,
                        const std::vector<ExprContext*>& conjunct_ctxs,
                        const std::string& timezone) override;
     Status init_parquet_type();
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index d7aa4041e2..e6da51f71d 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -55,8 +55,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* 
tuple_pool, bool* eof, bo
             }
             _cur_file_eof = false;
         }
-        RETURN_IF_ERROR(
-                _cur_file_reader->read(_src_tuple, _src_slot_descs, 
tuple_pool, &_cur_file_eof));
+        RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, tuple_pool, 
&_cur_file_eof));
         // range of current file
         const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
         if (range.__isset.num_of_columns_from_file) {
@@ -106,11 +105,11 @@ Status ParquetScanner::open_next_reader() {
         if (range.__isset.num_of_columns_from_file) {
             num_of_columns_from_file = range.num_of_columns_from_file;
         }
-        _cur_file_reader = new ParquetReaderWrap(file_reader.release(), 
_state->batch_size(),
+        _cur_file_reader = new ParquetReaderWrap(_state, _src_slot_descs, 
file_reader.release(),
                                                  num_of_columns_from_file, 0, 
0);
         auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
-        Status status = _cur_file_reader->init_reader(tuple_desc, 
_src_slot_descs, _conjunct_ctxs,
-                                                      _state->timezone());
+        Status status =
+                _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, 
_state->timezone());
         if (status.is_end_of_file()) {
             continue;
         } else {
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index d8fc97e6b8..36cadb284f 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -244,7 +244,7 @@ set(VEC_FILES
   exec/scan/new_file_scan_node.cpp
   exec/scan/new_file_scanner.cpp
   exec/scan/new_file_text_scanner.cpp
-)
+  )
 
 add_library(Vec STATIC
         ${VEC_FILES}
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp 
b/be/src/vec/exec/file_arrow_scanner.cpp
index 998d40ea6f..62d75a4e14 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -71,12 +71,12 @@ Status FileArrowScanner::_open_next_reader() {
         int32_t num_of_columns_from_file = _file_slot_descs.size();
 
         _cur_file_reader =
-                _new_arrow_reader(file_reader.release(), _state->batch_size(),
-                                  num_of_columns_from_file, 
range.start_offset, range.size);
+                _new_arrow_reader(_file_slot_descs, file_reader.release(), 
num_of_columns_from_file,
+                                  range.start_offset, range.size);
 
         auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
-        Status status = _cur_file_reader->init_reader(tuple_desc, 
_file_slot_descs, _conjunct_ctxs,
-                                                      _state->timezone());
+        Status status =
+                _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, 
_state->timezone());
         if (status.is_end_of_file()) {
             continue;
         } else {
@@ -226,11 +226,10 @@ VFileParquetScanner::VFileParquetScanner(RuntimeState* 
state, RuntimeProfile* pr
     _init_profiles(profile);
 }
 
-ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* 
file_reader, int64_t batch_size,
-                                                        int32_t 
num_of_columns_from_file,
-                                                        int64_t 
range_start_offset,
-                                                        int64_t range_size) {
-    return new ParquetReaderWrap(file_reader, batch_size, 
num_of_columns_from_file,
+ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(
+        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* 
file_reader,
+        int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t 
range_size) {
+    return new ParquetReaderWrap(_state, file_slot_descs, file_reader, 
num_of_columns_from_file,
                                  range_start_offset, range_size, false);
 }
 
@@ -250,12 +249,11 @@ VFileORCScanner::VFileORCScanner(RuntimeState* state, 
RuntimeProfile* profile,
                                  ScannerCounter* counter)
         : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, 
counter) {}
 
-ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, 
int64_t batch_size,
-                                                    int32_t 
num_of_columns_from_file,
-                                                    int64_t range_start_offset,
-                                                    int64_t range_size) {
-    return new ORCReaderWrap(file_reader, batch_size, 
num_of_columns_from_file, range_start_offset,
-                             range_size, false);
+ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(
+        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* 
file_reader,
+        int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t 
range_size) {
+    return new ORCReaderWrap(_state, file_slot_descs, file_reader, 
num_of_columns_from_file,
+                             range_start_offset, range_size, false);
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_arrow_scanner.h 
b/be/src/vec/exec/file_arrow_scanner.h
index 93e391af92..113bd54d6e 100644
--- a/be/src/vec/exec/file_arrow_scanner.h
+++ b/be/src/vec/exec/file_arrow_scanner.h
@@ -52,7 +52,8 @@ public:
     void close() override;
 
 protected:
-    virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, 
int64_t batch_size,
+    virtual ArrowReaderWrap* _new_arrow_reader(const 
std::vector<SlotDescriptor*>& file_slot_descs,
+                                               FileReader* file_reader,
                                                int32_t 
num_of_columns_from_file,
                                                int64_t range_start_offset, 
int64_t range_size) = 0;
     virtual void _update_profile(std::shared_ptr<Statistics>& statistics) {}
@@ -83,9 +84,9 @@ public:
     ~VFileParquetScanner() override = default;
 
 protected:
-    ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t 
batch_size,
-                                       int32_t num_of_columns_from_file, 
int64_t range_start_offset,
-                                       int64_t range_size) override;
+    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                       FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                                       int64_t range_start_offset, int64_t 
range_size) override;
 
     void _init_profiles(RuntimeProfile* profile) override;
     void _update_profile(std::shared_ptr<Statistics>& statistics) override;
@@ -108,9 +109,9 @@ public:
     ~VFileORCScanner() override = default;
 
 protected:
-    ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t 
batch_size,
-                                       int32_t num_of_columns_from_file, 
int64_t range_start_offset,
-                                       int64_t range_size) override;
+    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                       FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                                       int64_t range_start_offset, int64_t 
range_size) override;
     void _init_profiles(RuntimeProfile* profile) override {};
 };
 
diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp 
b/be/src/vec/exec/file_hdfs_scanner.cpp
index 6ba8fe1b27..8484b8d754 100644
--- a/be/src/vec/exec/file_hdfs_scanner.cpp
+++ b/be/src/vec/exec/file_hdfs_scanner.cpp
@@ -50,7 +50,7 @@ Status ParquetFileHdfsScanner::get_next(vectorized::Block* 
block, bool* eof) {
     }
     RETURN_IF_ERROR(init_block(block));
     bool range_eof = false;
-    RETURN_IF_ERROR(_reader->read_next_batch(block, &range_eof));
+    RETURN_IF_ERROR(_reader->get_next_block(block, &range_eof));
     if (block->rows() > 0) {
         _fill_columns_from_path(block, block->rows());
     }
@@ -89,4 +89,4 @@ void ParquetFileHdfsScanner::close() {
     FileScanner::close();
 }
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/generic_reader.h 
b/be/src/vec/exec/format/generic_reader.h
new file mode 100644
index 0000000000..d830a8c0b8
--- /dev/null
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+
+namespace doris::vectorized {
+
+class Block;
+// This is a reader interface for all file readers.
+// A GenericReader is responsible for reading a file and returning
+// a set of blocks with the specified schema.
+class GenericReader {
+public:
+    virtual Status get_next_block(Block* block, bool* eof) = 0;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index bda676e1c3..1319755355 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -99,7 +99,7 @@ Status ParquetReader::_init_read_columns(const 
std::vector<SlotDescriptor*>& tup
     return Status::OK();
 }
 
-Status ParquetReader::read_next_batch(Block* block, bool* eof) {
+Status ParquetReader::get_next_block(Block* block, bool* eof) {
     int32_t num_of_readers = _row_group_readers.size();
     DCHECK(num_of_readers <= _read_row_groups.size());
     if (_read_row_groups.empty()) {
@@ -322,4 +322,4 @@ int64_t ParquetReader::_get_column_start_offset(const 
tparquet::ColumnMetaData&
     }
     return column.data_page_offset;
 }
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 9facffa623..95ffa10bd6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -28,6 +28,7 @@
 #include "gen_cpp/parquet_types.h"
 #include "io/file_reader.h"
 #include "vec/core/block.h"
+#include "vec/exec/format/generic_reader.h"
 #include "vparquet_file_metadata.h"
 #include "vparquet_group_reader.h"
 #include "vparquet_page_index.h"
@@ -63,18 +64,18 @@ private:
     //    int64_t chunk_size;
 };
 
-class ParquetReader {
+class ParquetReader : public GenericReader {
 public:
     ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, 
size_t batch_size,
                   int64_t range_start_offset, int64_t range_size, 
cctz::time_zone* ctz);
 
-    ~ParquetReader();
+    virtual ~ParquetReader();
 
     Status init_reader(const TupleDescriptor* tuple_desc,
                        const std::vector<SlotDescriptor*>& tuple_slot_descs,
                        std::vector<ExprContext*>& conjunct_ctxs, const 
std::string& timezone);
 
-    Status read_next_batch(Block* block, bool* eof);
+    Status get_next_block(Block* block, bool* eof) override;
 
     // std::shared_ptr<Statistics>& statistics() { return _statistics; }
     void close();
diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp 
b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
index b204cd4174..9355e36a4c 100644
--- a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
+++ b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
@@ -215,13 +215,13 @@ Status NewFileArrowScanner::_open_next_reader() {
         int32_t num_of_columns_from_file = _file_slot_descs.size();
 
         _cur_file_reader =
-                _new_arrow_reader(file_reader.release(), _state->batch_size(),
-                                  num_of_columns_from_file, 
range.start_offset, range.size);
+                _new_arrow_reader(_file_slot_descs, file_reader.release(), 
num_of_columns_from_file,
+                                  range.start_offset, range.size);
 
         auto tuple_desc = 
_state->desc_tbl().get_tuple_descriptor(_parent->output_tuple_id());
         // TODO _conjunct_ctxs is empty for now. _conjunct_ctxs is not empty.
-        Status status = _cur_file_reader->init_reader(tuple_desc, 
_file_slot_descs, _conjunct_ctxs,
-                                                      _state->timezone());
+        Status status =
+                _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, 
_state->timezone());
         if (status.is_end_of_file()) {
             continue;
         } else {
@@ -246,12 +246,10 @@ 
NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNod
     //        _init_profiles(profile);
 }
 
-ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(FileReader* 
file_reader,
-                                                          int64_t batch_size,
-                                                          int32_t 
num_of_columns_from_file,
-                                                          int64_t 
range_start_offset,
-                                                          int64_t range_size) {
-    return new ParquetReaderWrap(file_reader, batch_size, 
num_of_columns_from_file,
+ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(
+        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* 
file_reader,
+        int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t 
range_size) {
+    return new ParquetReaderWrap(_state, file_slot_descs, file_reader, 
num_of_columns_from_file,
                                  range_start_offset, range_size, false);
 }
 
@@ -262,11 +260,10 @@ NewFileORCScanner::NewFileORCScanner(RuntimeState* state, 
NewFileScanNode* paren
         : NewFileArrowScanner(state, parent, limit, scan_range, tracker, 
profile,
                               pre_filter_texprs) {}
 
-ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(FileReader* file_reader, 
int64_t batch_size,
-                                                      int32_t 
num_of_columns_from_file,
-                                                      int64_t 
range_start_offset,
-                                                      int64_t range_size) {
-    return new ORCReaderWrap(file_reader, batch_size, 
num_of_columns_from_file, range_start_offset,
-                             range_size, false);
+ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(
+        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* 
file_reader,
+        int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t 
range_size) {
+    return new ORCReaderWrap(_state, file_slot_descs, file_reader, 
num_of_columns_from_file,
+                             range_start_offset, range_size, false);
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h 
b/be/src/vec/exec/scan/new_file_arrow_scanner.h
index d1447fbf0c..89e76f6623 100644
--- a/be/src/vec/exec/scan/new_file_arrow_scanner.h
+++ b/be/src/vec/exec/scan/new_file_arrow_scanner.h
@@ -34,7 +34,8 @@ public:
 
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
-    virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, 
int64_t batch_size,
+    virtual ArrowReaderWrap* _new_arrow_reader(const 
std::vector<SlotDescriptor*>& file_slot_descs,
+                                               FileReader* file_reader,
                                                int32_t 
num_of_columns_from_file,
                                                int64_t range_start_offset, 
int64_t range_size) = 0;
     // Convert input block to output block, if needed.
@@ -64,9 +65,9 @@ public:
     ~NewFileParquetScanner() override = default;
 
 protected:
-    ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t 
batch_size,
-                                       int32_t num_of_columns_from_file, 
int64_t range_start_offset,
-                                       int64_t range_size) override;
+    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                       FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                                       int64_t range_start_offset, int64_t 
range_size) override;
 
     void _init_profiles(RuntimeProfile* profile) override {};
 };
@@ -80,9 +81,9 @@ public:
     ~NewFileORCScanner() override = default;
 
 protected:
-    ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t 
batch_size,
-                                       int32_t num_of_columns_from_file, 
int64_t range_start_offset,
-                                       int64_t range_size) override;
+    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                       FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                                       int64_t range_start_offset, int64_t 
range_size) override;
     void _init_profiles(RuntimeProfile* profile) override {};
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/varrow_scanner.cpp 
b/be/src/vec/exec/varrow_scanner.cpp
index 1e5597f9a0..eb120470a5 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -77,11 +77,11 @@ Status VArrowScanner::_open_next_reader() {
             num_of_columns_from_file = range.num_of_columns_from_file;
         }
         _cur_file_reader =
-                _new_arrow_reader(file_reader.release(), _state->batch_size(),
-                                  num_of_columns_from_file, 
range.start_offset, range.size);
+                _new_arrow_reader(_src_slot_descs, file_reader.release(), 
num_of_columns_from_file,
+                                  range.start_offset, range.size);
         auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
-        Status status = _cur_file_reader->init_reader(tuple_desc, 
_src_slot_descs, _conjunct_ctxs,
-                                                      _state->timezone());
+        Status status =
+                _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, 
_state->timezone());
 
         if (status.is_end_of_file()) {
             continue;
@@ -287,4 +287,4 @@ void VArrowScanner::close() {
     }
 }
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h
index 7eff7ab329..e67300332d 100644
--- a/be/src/vec/exec/varrow_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -63,7 +63,8 @@ public:
     virtual void close() override;
 
 protected:
-    virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, 
int64_t batch_size,
+    virtual ArrowReaderWrap* _new_arrow_reader(const 
std::vector<SlotDescriptor*>& file_slot_descs,
+                                               FileReader* file_reader,
                                                int32_t 
num_of_columns_from_file,
                                                int64_t range_start_offset, 
int64_t range_size) = 0;
 
diff --git a/be/src/vec/exec/vorc_scanner.cpp b/be/src/vec/exec/vorc_scanner.cpp
index 81db28f2e6..4c7c80d731 100644
--- a/be/src/vec/exec/vorc_scanner.cpp
+++ b/be/src/vec/exec/vorc_scanner.cpp
@@ -29,11 +29,12 @@ VORCScanner::VORCScanner(RuntimeState* state, 
RuntimeProfile* profile,
         : VArrowScanner(state, profile, params, ranges, broker_addresses, 
pre_filter_texprs,
                         counter) {}
 
-ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, 
int64_t batch_size,
+ArrowReaderWrap* VORCScanner::_new_arrow_reader(const 
std::vector<SlotDescriptor*>& file_slot_descs,
+                                                FileReader* file_reader,
                                                 int32_t 
num_of_columns_from_file,
                                                 int64_t range_start_offset, 
int64_t range_size) {
-    return new ORCReaderWrap(file_reader, batch_size, 
num_of_columns_from_file, range_start_offset,
-                             range_size);
+    return new ORCReaderWrap(_state, file_slot_descs, file_reader, 
num_of_columns_from_file,
+                             range_start_offset, range_size);
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vorc_scanner.h b/be/src/vec/exec/vorc_scanner.h
index b7bd1fdf67..78002e8a43 100644
--- a/be/src/vec/exec/vorc_scanner.h
+++ b/be/src/vec/exec/vorc_scanner.h
@@ -46,9 +46,9 @@ public:
     ~VORCScanner() override = default;
 
 protected:
-    ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t 
batch_size,
-                                       int32_t num_of_columns_from_file, 
int64_t range_start_offset,
-                                       int64_t range_size) override;
+    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                       FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                                       int64_t range_start_offset, int64_t 
range_size) override;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vparquet_scanner.cpp 
b/be/src/vec/exec/vparquet_scanner.cpp
index f4d74a6207..4176ba5899 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -30,11 +30,10 @@ VParquetScanner::VParquetScanner(RuntimeState* state, 
RuntimeProfile* profile,
         : VArrowScanner(state, profile, params, ranges, broker_addresses, 
pre_filter_texprs,
                         counter) {}
 
-ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, 
int64_t batch_size,
-                                                    int32_t 
num_of_columns_from_file,
-                                                    int64_t range_start_offset,
-                                                    int64_t range_size) {
-    return new ParquetReaderWrap(file_reader, batch_size, 
num_of_columns_from_file,
+ArrowReaderWrap* VParquetScanner::_new_arrow_reader(
+        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* 
file_reader,
+        int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t 
range_size) {
+    return new ParquetReaderWrap(_state, file_slot_descs, file_reader, 
num_of_columns_from_file,
                                  range_start_offset, range_size);
 }
 
diff --git a/be/src/vec/exec/vparquet_scanner.h 
b/be/src/vec/exec/vparquet_scanner.h
index d8cf597dbe..7af00a1c38 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vparquet_scanner.h
@@ -47,9 +47,9 @@ public:
     ~VParquetScanner() override = default;
 
 protected:
-    ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t 
batch_size,
-                                       int32_t num_of_columns_from_file, 
int64_t range_start_offset,
-                                       int64_t range_size) override;
+    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                       FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                                       int64_t range_start_offset, int64_t 
range_size) override;
 };
 
 } // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp 
b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 895fbbd4ac..f763a6bd5a 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -112,7 +112,7 @@ TEST_F(ParquetReaderTest, normal) {
                 ColumnWithTypeAndName(std::move(data_column), data_type, 
slot_desc->col_name()));
     }
     bool eof = false;
-    p_reader->read_next_batch(block, &eof);
+    p_reader->get_next_block(block, &eof);
     for (auto& col : block->get_columns_with_type_and_name()) {
         ASSERT_EQ(col.column->size(), 10);
     }
@@ -236,4 +236,4 @@ TEST_F(ParquetReaderTest, scanner) {
 }
 
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to