This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9b9ed1aef1 [data lake](arrow scanner) Fix file arrow scanner core dump caused by
an out-of-range column index. (#11691)
9b9ed1aef1 is described below
commit 9b9ed1aef11a651a8b5acf96ba632073feae14a0
Author: Jibing-Li <[email protected]>
AuthorDate: Fri Aug 12 11:34:29 2022 +0800
[data lake](arrow scanner) Fix file arrow scanner core dump caused by an out-of-range
column index. (#11691)
---
be/src/exec/arrow/arrow_reader.cpp | 6 +++---
be/src/exec/arrow/arrow_reader.h | 3 ++-
be/src/exec/arrow/orc_reader.cpp | 6 +++---
be/src/exec/arrow/orc_reader.h | 2 +-
be/src/exec/arrow/parquet_reader.cpp | 6 +++---
be/src/exec/arrow/parquet_reader.h | 2 +-
be/src/runtime/descriptors.cpp | 3 +++
be/src/runtime/descriptors.h | 2 ++
be/src/vec/exec/file_arrow_scanner.cpp | 9 ++++-----
9 files changed, 22 insertions(+), 17 deletions(-)
diff --git a/be/src/exec/arrow/arrow_reader.cpp
b/be/src/exec/arrow/arrow_reader.cpp
index a2e5f7c33e..83f3e191f6 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -38,10 +38,10 @@ namespace doris {
// Broker
ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file, bool
caseSensitive)
+ int32_t num_of_columns_from_file, bool
case_sensitive)
: _batch_size(batch_size),
_num_of_columns_from_file(num_of_columns_from_file),
- _caseSensitive(caseSensitive) {
+ _case_sensitive(case_sensitive) {
_arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader));
_rb_reader = nullptr;
_total_groups = 0;
@@ -85,7 +85,7 @@ Status ArrowReaderWrap::column_indices(const
std::vector<SlotDescriptor*>& tuple
}
int ArrowReaderWrap::get_cloumn_index(std::string column_name) {
- std::string real_column_name = _caseSensitive ? column_name :
to_lower(column_name);
+ std::string real_column_name = _case_sensitive ? column_name :
to_lower(column_name);
auto iter = _map_column.find(real_column_name);
if (iter != _map_column.end()) {
return iter->second;
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 159377c480..24c381316e 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -100,6 +100,7 @@ public:
int get_cloumn_index(std::string column_name);
void prefetch_batch();
+ bool is_case_sensitive() { return _case_sensitive; }
protected:
virtual Status column_indices(const std::vector<SlotDescriptor*>&
tuple_slot_descs);
@@ -126,7 +127,7 @@ protected:
std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
std::thread _thread;
- bool _caseSensitive;
+ bool _case_sensitive;
};
} // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 0156355b39..3223c64b6a 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -30,8 +30,8 @@ namespace doris {
ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file, int64_t
range_start_offset,
- int64_t range_size, bool caseSensitive)
- : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file,
caseSensitive),
+ int64_t range_size, bool case_sensitive)
+ : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file,
case_sensitive),
_range_start_offset(range_start_offset),
_range_size(range_size) {
_reader = nullptr;
@@ -68,7 +68,7 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor*
tuple_desc,
std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie();
for (size_t i = 0; i < schema->num_fields(); ++i) {
std::string schemaName =
- _caseSensitive ? schema->field(i)->name() :
to_lower(schema->field(i)->name());
+ _case_sensitive ? schema->field(i)->name() :
to_lower(schema->field(i)->name());
// orc index started from 1.
_map_column.emplace(schemaName, i + 1);
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index 392addfea9..c166196d83 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -33,7 +33,7 @@ namespace doris {
class ORCReaderWrap final : public ArrowReaderWrap {
public:
ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t
num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size, bool
caseSensitive = true);
+ int64_t range_start_offset, int64_t range_size, bool
case_sensitive = true);
~ORCReaderWrap() override = default;
Status init_reader(const TupleDescriptor* tuple_desc,
diff --git a/be/src/exec/arrow/parquet_reader.cpp
b/be/src/exec/arrow/parquet_reader.cpp
index 03f6657586..97b033b4f7 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -39,8 +39,8 @@ namespace doris {
// Broker
ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t
batch_size,
int32_t num_of_columns_from_file, int64_t
range_start_offset,
- int64_t range_size, bool caseSensitive)
- : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file,
caseSensitive),
+ int64_t range_size, bool case_sensitive)
+ : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file,
case_sensitive),
_rows_of_group(0),
_current_line_of_group(0),
_current_line_of_batch(0),
@@ -92,7 +92,7 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor*
tuple_desc,
} else {
schemaName = schemaDescriptor->Column(i)->name();
}
- _map_column.emplace(_caseSensitive ? schemaName :
to_lower(schemaName), i);
+ _map_column.emplace(_case_sensitive ? schemaName :
to_lower(schemaName), i);
}
_timezone = timezone;
diff --git a/be/src/exec/arrow/parquet_reader.h
b/be/src/exec/arrow/parquet_reader.h
index d4805f8d84..d5d0165665 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -63,7 +63,7 @@ class ParquetReaderWrap final : public ArrowReaderWrap {
public:
// batch_size is not use here
ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t
num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size, bool
caseSensitive = true);
+ int64_t range_start_offset, int64_t range_size, bool
case_sensitive = true);
~ParquetReaderWrap() override = default;
// Read
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index e035068ef2..62e5707003 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -27,6 +27,7 @@
#include "common/object_pool.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/descriptors.pb.h"
+#include "util/string_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
@@ -55,6 +56,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
_tuple_offset(tdesc.byteOffset),
_null_indicator_offset(tdesc.nullIndicatorByte,
tdesc.nullIndicatorBit),
_col_name(tdesc.colName),
+ _col_name_lower_case(to_lower(tdesc.colName)),
_col_unique_id(tdesc.col_unique_id),
_slot_idx(tdesc.slotIdx),
_slot_size(_type.get_slot_size()),
@@ -69,6 +71,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_tuple_offset(pdesc.byte_offset()),
_null_indicator_offset(pdesc.null_indicator_byte(),
pdesc.null_indicator_bit()),
_col_name(pdesc.col_name()),
+ _col_name_lower_case(to_lower(pdesc.col_name())),
_col_unique_id(-1),
_slot_idx(pdesc.slot_idx()),
_slot_size(_type.get_slot_size()),
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 74f59655af..c40285ce9f 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -100,6 +100,7 @@ public:
int slot_size() const { return _slot_size; }
const std::string& col_name() const { return _col_name; }
+ const std::string& col_name_lower_case() const { return
_col_name_lower_case; }
/// Return true if the physical layout of this descriptor matches the
physical layout
/// of other_desc, but not necessarily ids.
@@ -128,6 +129,7 @@ private:
const int _tuple_offset;
const NullIndicatorOffset _null_indicator_offset;
const std::string _col_name;
+ const std::string _col_name_lower_case;
const int32_t _col_unique_id;
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp
b/be/src/vec/exec/file_arrow_scanner.cpp
index d416fb735c..c8ce603f89 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -186,11 +186,10 @@ Status FileArrowScanner::_append_batch_to_block(Block*
block) {
if (slot_desc == nullptr) {
continue;
}
- int file_index =
_cur_file_reader->get_cloumn_index(slot_desc->col_name());
- if (file_index == -1) {
- continue;
- }
- auto* array = _batch->column(file_index).get();
+ std::string real_column_name = _cur_file_reader->is_case_sensitive()
+ ? slot_desc->col_name()
+ :
slot_desc->col_name_lower_case();
+ auto* array = _batch->GetColumnByName(real_column_name).get();
auto& column_with_type_and_name =
block->get_by_name(slot_desc->col_name());
RETURN_IF_ERROR(arrow_column_to_doris_column(
array, _arrow_batch_cur_idx, column_with_type_and_name.column,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]