This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9b9ed1aef1 [data lake](arrow scanner)Fix file arrow scanner column index out of range core. (#11691) 9b9ed1aef1 is described below commit 9b9ed1aef11a651a8b5acf96ba632073feae14a0 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Fri Aug 12 11:34:29 2022 +0800 [data lake](arrow scanner)Fix file arrow scanner column index out of range core. (#11691) --- be/src/exec/arrow/arrow_reader.cpp | 6 +++--- be/src/exec/arrow/arrow_reader.h | 3 ++- be/src/exec/arrow/orc_reader.cpp | 6 +++--- be/src/exec/arrow/orc_reader.h | 2 +- be/src/exec/arrow/parquet_reader.cpp | 6 +++--- be/src/exec/arrow/parquet_reader.h | 2 +- be/src/runtime/descriptors.cpp | 3 +++ be/src/runtime/descriptors.h | 2 ++ be/src/vec/exec/file_arrow_scanner.cpp | 9 ++++----- 9 files changed, 22 insertions(+), 17 deletions(-) diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp index a2e5f7c33e..83f3e191f6 100644 --- a/be/src/exec/arrow/arrow_reader.cpp +++ b/be/src/exec/arrow/arrow_reader.cpp @@ -38,10 +38,10 @@ namespace doris { // Broker ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, bool caseSensitive) + int32_t num_of_columns_from_file, bool case_sensitive) : _batch_size(batch_size), _num_of_columns_from_file(num_of_columns_from_file), - _caseSensitive(caseSensitive) { + _case_sensitive(case_sensitive) { _arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader)); _rb_reader = nullptr; _total_groups = 0; @@ -85,7 +85,7 @@ Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple } int ArrowReaderWrap::get_cloumn_index(std::string column_name) { - std::string real_column_name = _caseSensitive ? column_name : to_lower(column_name); + std::string real_column_name = _case_sensitive ? column_name : to_lower(column_name); auto iter = _map_column.find(real_column_name); if (iter != _map_column.end()) { return iter->second; diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h index 159377c480..24c381316e 100644 --- a/be/src/exec/arrow/arrow_reader.h +++ b/be/src/exec/arrow/arrow_reader.h @@ -100,6 +100,7 @@ public: int get_cloumn_index(std::string column_name); void prefetch_batch(); + bool is_case_sensitive() { return _case_sensitive; } protected: virtual Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs); @@ -126,7 +127,7 @@ protected: std::list<std::shared_ptr<arrow::RecordBatch>> _queue; const size_t _max_queue_size = config::parquet_reader_max_buffer_size; std::thread _thread; - bool _caseSensitive; + bool _case_sensitive; }; } // namespace doris diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp index 0156355b39..3223c64b6a 100644 --- a/be/src/exec/arrow/orc_reader.cpp +++ b/be/src/exec/arrow/orc_reader.cpp @@ -30,8 +30,8 @@ namespace doris { ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size, bool caseSensitive) - : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, caseSensitive), + int64_t range_size, bool case_sensitive) + : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, case_sensitive), _range_start_offset(range_start_offset), _range_size(range_size) { _reader = nullptr; @@ -68,7 +68,7 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc, std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie(); for (size_t i = 0; i < schema->num_fields(); ++i) { std::string schemaName = - _caseSensitive ? schema->field(i)->name() : to_lower(schema->field(i)->name()); + _case_sensitive ? schema->field(i)->name() : to_lower(schema->field(i)->name()); // orc index started from 1. _map_column.emplace(schemaName, i + 1); diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h index 392addfea9..c166196d83 100644 --- a/be/src/exec/arrow/orc_reader.h +++ b/be/src/exec/arrow/orc_reader.h @@ -33,7 +33,7 @@ namespace doris { class ORCReaderWrap final : public ArrowReaderWrap { public: ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size, bool caseSensitive = true); + int64_t range_start_offset, int64_t range_size, bool case_sensitive = true); ~ORCReaderWrap() override = default; Status init_reader(const TupleDescriptor* tuple_desc, diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp index 03f6657586..97b033b4f7 100644 --- a/be/src/exec/arrow/parquet_reader.cpp +++ b/be/src/exec/arrow/parquet_reader.cpp @@ -39,8 +39,8 @@ namespace doris { // Broker ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size, bool caseSensitive) - : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, caseSensitive), + int64_t range_size, bool case_sensitive) + : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, case_sensitive), _rows_of_group(0), _current_line_of_group(0), _current_line_of_batch(0), @@ -92,7 +92,7 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc, } else { schemaName = schemaDescriptor->Column(i)->name(); } - _map_column.emplace(_caseSensitive ? schemaName : to_lower(schemaName), i); + _map_column.emplace(_case_sensitive ? schemaName : to_lower(schemaName), i); } _timezone = timezone; diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h index d4805f8d84..d5d0165665 100644 --- a/be/src/exec/arrow/parquet_reader.h +++ b/be/src/exec/arrow/parquet_reader.h @@ -63,7 +63,7 @@ class ParquetReaderWrap final : public ArrowReaderWrap { public: // batch_size is not use here ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size, bool caseSensitive = true); + int64_t range_start_offset, int64_t range_size, bool case_sensitive = true); ~ParquetReaderWrap() override = default; // Read diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index e035068ef2..62e5707003 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -27,6 +27,7 @@ #include "common/object_pool.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/descriptors.pb.h" +#include "util/string_util.h" #include "vec/columns/column_nullable.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_nullable.h" @@ -55,6 +56,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc) _tuple_offset(tdesc.byteOffset), _null_indicator_offset(tdesc.nullIndicatorByte, tdesc.nullIndicatorBit), _col_name(tdesc.colName), + _col_name_lower_case(to_lower(tdesc.colName)), _col_unique_id(tdesc.col_unique_id), _slot_idx(tdesc.slotIdx), _slot_size(_type.get_slot_size()), @@ -69,6 +71,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc) _tuple_offset(pdesc.byte_offset()), _null_indicator_offset(pdesc.null_indicator_byte(), pdesc.null_indicator_bit()), _col_name(pdesc.col_name()), + _col_name_lower_case(to_lower(pdesc.col_name())), _col_unique_id(-1), _slot_idx(pdesc.slot_idx()), _slot_size(_type.get_slot_size()), diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 74f59655af..c40285ce9f 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -100,6 +100,7 @@ public: int slot_size() const { return _slot_size; } const std::string& col_name() const { return _col_name; } + const std::string& col_name_lower_case() const { return _col_name_lower_case; } /// Return true if the physical layout of this descriptor matches the physical layout /// of other_desc, but not necessarily ids. @@ -128,6 +129,7 @@ private: const int _tuple_offset; const NullIndicatorOffset _null_indicator_offset; const std::string _col_name; + const std::string _col_name_lower_case; const int32_t _col_unique_id; diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp index d416fb735c..c8ce603f89 100644 --- a/be/src/vec/exec/file_arrow_scanner.cpp +++ b/be/src/vec/exec/file_arrow_scanner.cpp @@ -186,11 +186,10 @@ Status FileArrowScanner::_append_batch_to_block(Block* block) { if (slot_desc == nullptr) { continue; } - int file_index = _cur_file_reader->get_cloumn_index(slot_desc->col_name()); - if (file_index == -1) { - continue; - } - auto* array = _batch->column(file_index).get(); + std::string real_column_name = _cur_file_reader->is_case_sensitive() + ? slot_desc->col_name() + : slot_desc->col_name_lower_case(); + auto* array = _batch->GetColumnByName(real_column_name).get(); auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); RETURN_IF_ERROR(arrow_column_to_doris_column( array, _arrow_batch_cur_idx, column_with_type_and_name.column, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org