This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0ac5228c05 [feature-wip][multi-catalog] Support prefetch for orc file format (#11292)
0ac5228c05 is described below
commit 0ac5228c0535fa42efed5ec4e1a1850c5e753da0
Author: huangzhaowei <[email protected]>
AuthorDate: Tue Aug 2 11:01:15 2022 +0800
[feature-wip][multi-catalog] Support prefetch for orc file format (#11292)
Refactor the Parquet reader's prefetch code into the shared ArrowReaderWrap base class
and use it to add prefetch support for the ORC file format.
---
be/src/exec/arrow/arrow_reader.cpp | 61 +++++++++++++++++++++++
be/src/exec/arrow/arrow_reader.h | 16 +++++-
be/src/exec/arrow/orc_reader.cpp | 40 +++++++--------
be/src/exec/arrow/orc_reader.h | 3 +-
be/src/exec/arrow/parquet_reader.cpp | 94 ++++++++----------------------------
be/src/exec/arrow/parquet_reader.h | 15 ++----
6 files changed, 120 insertions(+), 109 deletions(-)
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index 9d20697148..5d1785f744 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -48,6 +48,15 @@ ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
ArrowReaderWrap::~ArrowReaderWrap() {
- close();
+ // Stop and join the prefetch thread before close(), so it never reads from a
+ // closed file. The thread calls the pure virtuals read_batches() and
+ // filter_row_group(); derived readers should also stop it in their own
+ // destructors, because their members are already destroyed by this point.
+ _closed = true;
+ _queue_writer_cond.notify_one();
+ if (_thread.joinable()) {
+ _thread.join();
+ }
+ close();
}
void ArrowReaderWrap::close() {
@@ -76,6 +85,69 @@ Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple
return Status::OK();
}
+// Consumer side of the prefetch queue. Waits (re-checking every second) until
+// prefetch_batch() has queued a batch; sets *eof once the producer has finished
+// and the queue is empty; returns InternalError with _status's message if closed.
+Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
+ std::unique_lock<std::mutex> lock(_mtx);
+ while (!_closed && _queue.empty()) {
+ if (_batch_eof) {
+ // _batch_eof stays set so that calling again after eof reports eof
+ // again instead of waiting forever on a producer that has exited.
+ _include_column_ids.clear();
+ *eof = true;
+ return Status::OK();
+ }
+ _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
+ }
+ if (UNLIKELY(_closed)) {
+ return Status::InternalError(_status.message());
+ }
+ *batch = _queue.front();
+ _queue.pop_front();
+ _queue_writer_cond.notify_one();
+ return Status::OK();
+}
+
+// Producer loop, run on _thread. Reads row groups [_current_group, _total_groups),
+// skipping those for which filter_row_group() returns true, and pushes their
+// batches into _queue, blocking while the queue holds _max_queue_size batches.
+void ArrowReaderWrap::prefetch_batch() {
+ auto insert_batch = [this](const auto& batch) {
+ std::unique_lock<std::mutex> lock(_mtx);
+ while (!_closed && _queue.size() == _max_queue_size) {
+ _queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
+ }
+ if (UNLIKELY(_closed)) {
+ return;
+ }
+ _queue.push_back(batch);
+ _queue_reader_cond.notify_one();
+ };
+ int current_group = _current_group;
+ int total_groups = _total_groups;
+ while (true) {
+ if (_closed || current_group >= total_groups) {
+ _batch_eof = true;
+ _queue_reader_cond.notify_one();
+ return;
+ }
+ if (filter_row_group(current_group)) {
+ current_group++;
+ continue;
+ }
+
+ arrow::RecordBatchVector batches;
+ read_batches(batches, current_group);
+ if (!_status.ok()) {
+ _closed = true;
+ return;
+ }
+ std::for_each(batches.begin(), batches.end(), insert_batch);
+ current_group++;
+ }
+}
+
ArrowFile::ArrowFile(FileReader* file) : _file(file) {}
ArrowFile::~ArrowFile() {
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 1389426fc9..704ca0750e 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -92,13 +92,24 @@ public:
return Status::NotSupported("Not Implemented read");
}
// for vec
- virtual Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) = 0;
+ // Pops the next batch queued by prefetch_batch(). Sets *eof once the producer has
+ // finished and the queue is drained; returns InternalError if the reader was closed.
+ Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof);
std::shared_ptr<Statistics>& statistics() { return _statistics; }
void close();
virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); }
+ // Producer loop that fills _queue; derived readers run it on _thread via
+ // std::thread(&ArrowReaderWrap::prefetch_batch, this).
+ void prefetch_batch();
+
protected:
virtual Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
+ // Called on the prefetch thread: read row group `current_group` into `batches`.
+ // Report failures through _status; prefetch_batch() stops when it is not OK.
+ virtual void read_batches(arrow::RecordBatchVector& batches, int current_group) = 0;
+ // Called on the prefetch thread: return true to skip row group `current_group`.
+ virtual bool filter_row_group(int current_group) = 0;
protected:
const int64_t _batch_size;
@@ -110,6 +121,18 @@ protected:
std::map<std::string, int> _map_column; // column-name <---> column-index
std::vector<int> _include_column_ids; // columns that need to get from file
std::shared_ptr<Statistics> _statistics;
+
+ // Prefetch state shared by next_batch() (consumer) and prefetch_batch() on _thread (producer).
+ std::atomic<bool> _closed = false;    // set on a read error or on destruction; both sides stop
+ std::atomic<bool> _batch_eof = false; // set when the producer loop has ended
+ arrow::Status _status;                // last read error; its message is returned by next_batch()
+ std::mutex _mtx;                      // guards _queue
+ std::condition_variable _queue_reader_cond; // signalled when a batch is queued or the producer ends
+ std::condition_variable _queue_writer_cond; // signalled when a batch is popped or on destruction
+ std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
+ // Bound on queued batches; this reuses the parquet config knob for every file format.
+ const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
+ std::thread _thread;
};
} // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 78d51d8376..0db5640369 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -71,11 +71,7 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
}
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
- bool eof = false;
- RETURN_IF_ERROR(_next_stripe_reader(&eof));
- if (eof) {
- return Status::EndOfFile("end of file");
- }
+ _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this);
return Status::OK();
}
@@ -143,23 +139,32 @@ Status ORCReaderWrap::_next_stripe_reader(bool* eof) {
return Status::OK();
}
-Status ORCReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
- *eof = false;
- do {
- auto st = _rb_reader->ReadNext(batch);
- if (!st.ok()) {
- LOG(WARNING) << "failed to get next batch, errmsg=" << st;
- return Status::InternalError(st.ToString());
- }
- if (*batch == nullptr) {
- // try next stripe
- RETURN_IF_ERROR(_next_stripe_reader(eof));
- if (*eof) {
- break;
- }
- }
- } while (*batch == nullptr);
- return Status::OK();
+// Prefetch-thread callback: moves to the next stripe and reads all of its batches.
+// current_group is not used; _next_stripe_reader() takes no stripe index.
+// NOTE(review): this runs on _thread and uses _rb_reader, so ORCReaderWrap's
+// destructor should stop and join _thread before its own members are destroyed.
+void ORCReaderWrap::read_batches(arrow::RecordBatchVector& batches, int current_group) {
+ bool eof = false;
+ Status status = _next_stripe_reader(&eof);
+ if (!status.ok()) {
+ // Store the error in _status: next_batch() reports _status's message, which
+ // would otherwise be empty.
+ _status = arrow::Status::IOError(status.to_string());
+ _closed = true;
+ return;
+ }
+ if (eof) {
+ // No stripe left is a normal end, not an error: return no batches and let
+ // prefetch_batch() stop on its own row-group bound.
+ return;
+ }
+
+ _status = _rb_reader->ReadAll(&batches);
+}
+
+// ORC stripes are never skipped here.
+bool ORCReaderWrap::filter_row_group(int current_group) {
+ return false;
}
} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index fe8c5c54a4..1e6f0f83e6 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -40,11 +40,12 @@ public:
const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) override;
- Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
private:
Status _next_stripe_reader(bool* eof);
Status _seek_start_stripe();
+ void read_batches(arrow::RecordBatchVector& batches, int current_group) override;
+ bool filter_row_group(int current_group) override;
private:
// orc file reader object
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index c0f45f52af..8d119146b4 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -46,14 +46,17 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size
_range_start_offset(range_start_offset),
_range_size(range_size) {}
+// Stop and join the prefetch thread here rather than only in ~ArrowReaderWrap():
+// the thread calls read_batches()/filter_row_group(), which use this class's members
+// (e.g. _row_group_reader), and those are destroyed before the base destructor runs.
ParquetReaderWrap::~ParquetReaderWrap() {
_closed = true;
_queue_writer_cond.notify_one();
if (_thread.joinable()) {
_thread.join();
}
}

Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::vector<ExprContext*>& conjunct_ctxs,
@@ -111,7 +114,7 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
_row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids,
file_size);
}
- _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
+ _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this);
return Status::OK();
} catch (parquet::ParquetException& e) {
std::stringstream str_error;
@@ -184,26 +187,6 @@ Status ParquetReaderWrap::read_record_batch(bool* eof) {
return Status::OK();
}
-Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
- std::unique_lock<std::mutex> lock(_mtx);
- while (!_closed && _queue.empty()) {
- if (_batch_eof) {
- _include_column_ids.clear();
- *eof = true;
- _batch_eof = false;
- return Status::OK();
- }
- _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
- }
- if (UNLIKELY(_closed)) {
- return Status::InternalError(_status.message());
- }
- *batch = _queue.front();
- _queue.pop_front();
- _queue_writer_cond.notify_one();
- return Status::OK();
-}
-
Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array,
uint8_t* buf, int32_t* wbytes) {
const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
@@ -546,50 +529,6 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
return read_record_batch(eof);
}
-void ParquetReaderWrap::prefetch_batch() {
- auto insert_batch = [this](const auto& batch) {
- std::unique_lock<std::mutex> lock(_mtx);
- while (!_closed && _queue.size() == _max_queue_size) {
- _queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
- }
- if (UNLIKELY(_closed)) {
- return;
- }
- _queue.push_back(batch);
- _queue_reader_cond.notify_one();
- };
- int current_group = 0;
- int total_groups = _total_groups;
- while (true) {
- if (_closed || current_group >= total_groups) {
- _batch_eof = true;
- _queue_reader_cond.notify_one();
- return;
- }
- if (config::parquet_predicate_push_down) {
- auto filter_group_set = _row_group_reader->filter_groups();
- if (filter_group_set.end() != filter_group_set.find(current_group)) {
- // find filter group, skip
- current_group++;
- continue;
- }
- }
- _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader);
- if (!_status.ok()) {
- _closed = true;
- return;
- }
- arrow::RecordBatchVector batches;
- _status = _rb_reader->ReadAll(&batches);
- if (!_status.ok()) {
- _closed = true;
- return;
- }
- std::for_each(batches.begin(), batches.end(), insert_batch);
- current_group++;
- }
-}
-
Status ParquetReaderWrap::read_next_batch() {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.empty()) {
@@ -609,4 +548,29 @@ Status ParquetReaderWrap::read_next_batch() {
return Status::OK();
}
+// Prefetch-thread callback: reads the columns in _include_column_ids of row group
+// `current_group` into `batches`. Errors are reported through _status.
+void ParquetReaderWrap::read_batches(arrow::RecordBatchVector& batches, int current_group) {
+ _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader);
+ if (!_status.ok()) {
+ _closed = true;
+ return;
+ }
+ _status = _rb_reader->ReadAll(&batches);
+}
+
+// Returns true when predicate push-down is enabled and `current_group` is one of the
+// row groups _row_group_reader marked as filtered out, so the prefetcher skips it.
+bool ParquetReaderWrap::filter_row_group(int current_group) {
+ if (config::parquet_predicate_push_down) {
+ // Bind by const reference so no extra copy is made when filter_groups() returns a reference.
+ const auto& filter_group_set = _row_group_reader->filter_groups();
+ if (filter_group_set.end() != filter_group_set.find(current_group)) {
+ // find filter group, skip
+ return true;
+ }
+ }
+ return false;
+}
+
} // namespace doris
diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
index 3bf4cf4814..95774f60b0 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -75,7 +75,6 @@ public:
const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) override;
Status init_parquet_type();
- Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
private:
void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value,
@@ -86,8 +85,9 @@ private:
int32_t* wbtyes);
private:
- void prefetch_batch();
Status read_next_batch();
+ void read_batches(arrow::RecordBatchVector& batches, int current_group) override;
+ bool filter_row_group(int current_group) override;
private:
// parquet file reader object
@@ -104,16 +104,7 @@ private:
int64_t _range_size;
private:
- std::atomic<bool> _closed = false;
- std::atomic<bool> _batch_eof = false;
- arrow::Status _status;
- std::mutex _mtx;
- std::condition_variable _queue_reader_cond;
- std::condition_variable _queue_writer_cond;
- std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
std::unique_ptr<doris::RowGroupReader> _row_group_reader;
- const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
- std::thread _thread;
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]