This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f47edb9318fc12cab9662439396e663fe661541b
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Feb 21 14:14:32 2023 +0800

    [fix](tvf) fix bug that failed to get schema of tvf when file is empty 
(#16928)
    
    In the previous implementation, when querying a tvf, FE gets the schema from BE,
    and BE tries to open the first file to get its schema info. But for the orc or
    parquet format, if the file is empty (it has no rows or row groups), BE returns
    an error.
    However, even for such an empty file, we can still get the schema info from the
    file's footer.
    So we should handle the empty file and get its schema info correctly.
    
    Also modify the catalog doc to add some FAQ.
---
 be/src/service/internal_service.cpp                |  6 +++---
 be/src/vec/exec/format/csv/csv_reader.cpp          |  4 ++--
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 11 ++++-------
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  8 ++------
 be/src/vec/exec/scan/vfile_scanner.cpp             |  4 ++--
 5 files changed, 13 insertions(+), 20 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 5922fe6419..aa3510452d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -384,7 +384,8 @@ void 
PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
     // this will influence query execution, because the pthreads under bthread 
may be
     // exhausted, so we put this to a local thread pool to process
     int64_t submit_task_time_ns = MonotonicNanos();
-    bool ret = _heavy_work_pool.offer([cntl_base, request, response, done, 
submit_task_time_ns, this]() {
+    bool ret = _heavy_work_pool.offer([cntl_base, request, response, done, 
submit_task_time_ns,
+                                       this]() {
         int64_t wait_execution_time_ns = MonotonicNanos() - 
submit_task_time_ns;
         brpc::ClosureGuard closure_guard(done);
         int64_t execution_time_ns = 0;
@@ -555,8 +556,7 @@ void 
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
         case TFileFormatType::FORMAT_CSV_DEFLATE: {
             // file_slots is no use
             std::vector<SlotDescriptor*> file_slots;
-            reader.reset(
-                    new vectorized::CsvReader(profile.get(), params, range, 
file_slots));
+            reader.reset(new vectorized::CsvReader(profile.get(), params, 
range, file_slots));
             break;
         }
         case TFileFormatType::FORMAT_PARQUET: {
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index c7099b24c7..ce537d93cf 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -115,7 +115,7 @@ Status CsvReader::init_reader(bool is_load) {
     RETURN_IF_ERROR(real_reader->open());
     if (real_reader->size() == 0 && _params.file_type != 
TFileType::FILE_STREAM &&
         _params.file_type != TFileType::FILE_BROKER) {
-        return Status::EndOfFile("Empty File");
+        return Status::EndOfFile("init reader failed, empty csv file: " + 
_range.path);
     }
 
     // get column_separator and line_delimiter
@@ -544,7 +544,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* 
is_parse_name) {
                                                     _range.file_size, 0, 
_file_reader));
     RETURN_IF_ERROR(_file_reader->open());
     if (_file_reader->size() == 0) {
-        return Status::EndOfFile("Empty File");
+        return Status::EndOfFile("get parsed schema failed, empty csv file: " 
+ _range.path);
     }
 
     // get column_separator and line_delimiter
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 57e0b4fc89..0840155203 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -142,7 +142,7 @@ Status OrcReader::init_reader(
         _file_reader = new ORCFileInputStream(_scan_range.path, 
inner_reader.release());
     }
     if (_file_reader->getLength() == 0) {
-        return Status::EndOfFile("Empty orc file");
+        return Status::EndOfFile("init reader failed, empty orc file: " + 
_scan_range.path);
     }
 
     // create orc reader
@@ -153,7 +153,8 @@ Status OrcReader::init_reader(
         return Status::InternalError("Init OrcReader failed. reason = {}", 
e.what());
     }
     if (_reader->getNumberOfRows() == 0) {
-        return Status::EndOfFile("Empty orc file");
+        return Status::EndOfFile("init reader failed, empty orc file with row 
num 0: " +
+                                 _scan_range.path);
     }
     // _init_bloom_filter(colname_to_value_range);
 
@@ -197,7 +198,7 @@ Status 
OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
         _file_reader = new ORCFileInputStream(_scan_range.path, 
inner_reader.release());
     }
     if (_file_reader->getLength() == 0) {
-        return Status::EndOfFile("Empty orc file");
+        return Status::EndOfFile("get parsed schema fail, empty orc file: " + 
_scan_range.path);
     }
 
     // create orc reader
@@ -208,10 +209,6 @@ Status 
OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
         return Status::InternalError("Init OrcReader failed. reason = {}", 
e.what());
     }
 
-    if (_reader->getNumberOfRows() == 0) {
-        return Status::EndOfFile("Empty orc file");
-    }
-
     auto& root_type = _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(_get_field_name_lower_case(&root_type, i));
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 35302f9306..d873260010 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -147,7 +147,7 @@ Status ParquetReader::_open_file() {
     if (_file_metadata == nullptr) {
         RETURN_IF_ERROR(_file_reader->open());
         if (_file_reader->size() == 0) {
-            return Status::EndOfFile("Empty Parquet File");
+            return Status::EndOfFile("open file failed, empty parquet file: " 
+ _scan_range.path);
         }
         RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), 
_file_metadata));
     }
@@ -179,7 +179,7 @@ Status ParquetReader::init_reader(
     SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
     _total_groups = _t_metadata->row_groups.size();
     if (_total_groups == 0) {
-        return Status::EndOfFile("Empty Parquet File");
+        return Status::EndOfFile("init reader failed, empty parquet file: " + 
_scan_range.path);
     }
     // all_column_names are all the columns required by user sql.
     // missing_column_names are the columns required by user sql but not in 
the parquet file,
@@ -350,10 +350,6 @@ Status 
ParquetReader::get_parsed_schema(std::vector<std::string>* col_names,
     _t_metadata = &_file_metadata->to_thrift();
 
     _total_groups = _t_metadata->row_groups.size();
-    if (_total_groups == 0) {
-        return Status::EndOfFile("Empty Parquet File");
-    }
-
     auto schema_desc = _file_metadata->schema();
     for (int i = 0; i < schema_desc.size(); ++i) {
         // Get the Column Reader for the boolean column
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 8619719194..58eb496d71 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -494,8 +494,8 @@ Status VFileScanner::_get_next_reader() {
                 IcebergTableReader* iceberg_reader =
                         new IcebergTableReader((GenericReader*)parquet_reader, 
_profile, _state,
                                                _params, range, _kv_cache);
-                iceberg_reader->init_reader(_file_col_names, _col_id_name_map,
-                                            _colname_to_value_range, 
_push_down_expr);
+                init_status = iceberg_reader->init_reader(_file_col_names, 
_col_id_name_map,
+                                                          
_colname_to_value_range, _push_down_expr);
                 RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
                 _cur_reader.reset((GenericReader*)iceberg_reader);
             } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to