This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f39f57636b [feature-wip](parquet-reader) update column read model and
add page index (#11601)
f39f57636b is described below
commit f39f57636bec019c646b5ed53aa80dff65e59360
Author: slothever <[email protected]>
AuthorDate: Tue Aug 16 15:04:07 2022 +0800
[feature-wip](parquet-reader) update column read model and add page index
(#11601)
---
be/src/exprs/expr_context.h | 6 +-
be/src/vec/exec/file_hdfs_scanner.cpp | 55 +++--
be/src/vec/exec/file_hdfs_scanner.h | 19 +-
be/src/vec/exec/file_scan_node.cpp | 3 +
be/src/vec/exec/format/parquet/parquet_pred_cmp.h | 30 +--
.../vec/exec/format/parquet/parquet_thrift_util.h | 5 -
.../parquet/vparquet_column_chunk_reader.cpp | 29 ++-
.../format/parquet/vparquet_column_chunk_reader.h | 7 +-
.../exec/format/parquet/vparquet_column_reader.cpp | 85 +++++--
.../exec/format/parquet/vparquet_column_reader.h | 64 ++++-
.../exec/format/parquet/vparquet_file_metadata.cpp | 2 +-
.../exec/format/parquet/vparquet_file_metadata.h | 2 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 234 +++---------------
.../exec/format/parquet/vparquet_group_reader.h | 81 +------
.../exec/format/parquet/vparquet_page_index.cpp | 35 +--
.../vec/exec/format/parquet/vparquet_page_index.h | 22 +-
.../exec/format/parquet/vparquet_page_reader.cpp | 2 +-
.../vec/exec/format/parquet/vparquet_page_reader.h | 2 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 263 +++++++++++++++++----
be/src/vec/exec/format/parquet/vparquet_reader.h | 64 +++--
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 2 +-
21 files changed, 548 insertions(+), 464 deletions(-)
diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h
index 26d655a2e2..b1df684a9d 100644
--- a/be/src/exprs/expr_context.h
+++ b/be/src/exprs/expr_context.h
@@ -27,7 +27,7 @@
#include "exprs/expr_value.h"
#include "exprs/slot_ref.h"
#include "udf/udf.h"
-#include "vec/exec/format/parquet/vparquet_group_reader.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
#undef USING_DORIS_UDF
#define USING_DORIS_UDF using namespace doris_udf
@@ -38,7 +38,7 @@ namespace doris {
namespace vectorized {
class VOlapScanNode;
-class RowGroupReader;
+class ParquetReader;
} // namespace vectorized
class Expr;
@@ -166,7 +166,7 @@ private:
friend class OlapScanNode;
friend class EsPredicate;
friend class RowGroupReader;
- friend class vectorized::RowGroupReader;
+ friend class vectorized::ParquetReader;
friend class vectorized::VOlapScanNode;
/// FunctionContexts for each registered expression. The FunctionContexts
are created
diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp
b/be/src/vec/exec/file_hdfs_scanner.cpp
index 08c5084b6d..ab6401de8b 100644
--- a/be/src/vec/exec/file_hdfs_scanner.cpp
+++ b/be/src/vec/exec/file_hdfs_scanner.cpp
@@ -21,43 +21,62 @@
namespace doris::vectorized {
+ParquetFileHdfsScanner::ParquetFileHdfsScanner(RuntimeState* state,
RuntimeProfile* profile,
+ const TFileScanRangeParams&
params,
+ const
std::vector<TFileRangeDesc>& ranges,
+ const std::vector<TExpr>&
pre_filter_texprs,
+ ScannerCounter* counter)
+ : HdfsFileScanner(state, profile, params, ranges, pre_filter_texprs,
counter) {}
+
Status ParquetFileHdfsScanner::open() {
+ RETURN_IF_ERROR(FileScanner::open());
+ if (_ranges.empty()) {
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(_get_next_reader(_next_range));
return Status();
}
+void ParquetFileHdfsScanner::_init_profiles(RuntimeProfile* profile) {}
+
Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) {
- // todo: get block from queue
- auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
- if (_next_range >= _ranges.size()) {
- _scanner_eof = true;
+ if (_next_range >= _ranges.size() || _scanner_eof) {
+ *eof = true;
return Status::OK();
}
- const TFileRangeDesc& range = _ranges[_next_range++];
+ RETURN_IF_ERROR(init_block(block));
+ bool range_eof = false;
+ RETURN_IF_ERROR(_reader->read_next_batch(block, &range_eof));
+ if (range_eof) {
+ RETURN_IF_ERROR(_get_next_reader(_next_range++));
+ }
+ return Status::OK();
+}
+
+Status ParquetFileHdfsScanner::_get_next_reader(int _next_range) {
+ const TFileRangeDesc& range = _ranges[_next_range];
+ _current_range_offset = range.start_offset;
std::unique_ptr<FileReader> file_reader;
RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(),
_profile, _params, range,
file_reader));
_reader.reset(new ParquetReader(file_reader.release(),
_file_slot_descs.size(),
- range.start_offset, range.size));
+ _state->query_options().batch_size,
range.start_offset,
+ range.size));
+ auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status =
_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs,
_state->timezone());
if (!status.ok()) {
- _scanner_eof = true;
- return Status::OK();
- }
- while (_reader->has_next()) {
- Status st = _reader->read_next_batch(block);
- if (st.is_end_of_file()) {
- break;
+ if (status.is_end_of_file()) {
+ _scanner_eof = true;
+ return Status::OK();
}
+ return status;
}
return Status::OK();
}
-void ParquetFileHdfsScanner::close() {}
-
-void ParquetFileHdfsScanner::_prefetch_batch() {
- // 1. call file reader next batch
- // 2. push batch to queue, when get_next is called, pop batch
+void ParquetFileHdfsScanner::close() {
+ FileScanner::close();
}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/file_hdfs_scanner.h
b/be/src/vec/exec/file_hdfs_scanner.h
index 3db196a83c..e24063c89b 100644
--- a/be/src/vec/exec/file_hdfs_scanner.h
+++ b/be/src/vec/exec/file_hdfs_scanner.h
@@ -24,21 +24,34 @@
namespace doris::vectorized {
-class HdfsFileScanner : public FileScanner {};
+class HdfsFileScanner : public FileScanner {
+public:
+ HdfsFileScanner(RuntimeState* state, RuntimeProfile* profile,
+ const TFileScanRangeParams& params, const
std::vector<TFileRangeDesc>& ranges,
+ const std::vector<TExpr>& pre_filter_texprs,
ScannerCounter* counter)
+ : FileScanner(state, profile, params, ranges, pre_filter_texprs,
counter) {};
+};
class ParquetFileHdfsScanner : public HdfsFileScanner {
public:
+ ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile,
+ const TFileScanRangeParams& params,
+ const std::vector<TFileRangeDesc>& ranges,
+ const std::vector<TExpr>& pre_filter_texprs,
ScannerCounter* counter);
Status open() override;
Status get_next(vectorized::Block* block, bool* eof) override;
-
void close() override;
+protected:
+ void _init_profiles(RuntimeProfile* profile) override;
+
private:
- void _prefetch_batch();
+ Status _get_next_reader(int _next_range);
private:
std::shared_ptr<ParquetReader> _reader;
+ int64_t _current_range_offset;
};
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/file_scan_node.cpp
b/be/src/vec/exec/file_scan_node.cpp
index c653c84a15..ec8a5165ce 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -30,6 +30,7 @@
#include "util/thread.h"
#include "util/types.h"
#include "vec/exec/file_arrow_scanner.h"
+#include "vec/exec/file_hdfs_scanner.h"
#include "vec/exec/file_text_scanner.h"
#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vexpr.h"
@@ -471,6 +472,8 @@ std::unique_ptr<FileScanner>
FileScanNode::create_scanner(const TFileScanRange&
case TFileFormatType::FORMAT_PARQUET:
scan = new VFileParquetScanner(_runtime_state, runtime_profile(),
scan_range.params,
scan_range.ranges, _pre_filter_texprs,
counter);
+ // scan = new ParquetFileHdfsScanner(_runtime_state,
runtime_profile(), scan_range.params,
+ // scan_range.ranges,
_pre_filter_texprs, counter);
break;
case TFileFormatType::FORMAT_ORC:
scan = new VFileORCScanner(_runtime_state, runtime_profile(),
scan_range.params,
diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
index b07f9afbe9..b58701418e 100644
--- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
+++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
@@ -79,8 +79,8 @@ namespace doris::vectorized {
return true; \
}
-bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type,
std::vector<void*> in_pred_values,
- const char* min_bytes, const char*
max_bytes) {
+bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*>
in_pred_values,
+ const char* min_bytes, const char* max_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes)
@@ -125,8 +125,8 @@ bool RowGroupReader::_eval_in_val(PrimitiveType
conjunct_type, std::vector<void*
return false;
}
-void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char*
min_bytes,
- const char* max_bytes, bool&
need_filter) {
+void ParquetReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes,
+ const char* max_bytes, bool&
need_filter) {
Expr* conjunct = ctx->root();
std::vector<void*> in_pred_values;
const InPredicate* pred = static_cast<const InPredicate*>(conjunct);
@@ -150,8 +150,8 @@ void RowGroupReader::_eval_in_predicate(ExprContext* ctx,
const char* min_bytes,
}
}
-bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const
char* min_bytes,
- const char* max_bytes) {
+bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
+ const char* max_bytes) {
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value,
min, max)
@@ -200,7 +200,7 @@ bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type,
void* value, const ch
return false;
}
-bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const
char* max_bytes) {
+bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes)
{
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
@@ -250,7 +250,7 @@ bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type,
void* value, const ch
return false;
}
-bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const
char* max_bytes) {
+bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes)
{
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
@@ -300,7 +300,7 @@ bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type,
void* value, const ch
return false;
}
-bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const
char* min_bytes) {
+bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes)
{
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
@@ -350,7 +350,7 @@ bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type,
void* value, const ch
return false;
}
-bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const
char* min_bytes) {
+bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes)
{
switch (conjunct_type) {
case TYPE_TINYINT: {
_PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
@@ -400,8 +400,8 @@ bool RowGroupReader::_eval_le(PrimitiveType conjunct_type,
void* value, const ch
return false;
}
-void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char*
min_bytes,
- const char* max_bytes, bool&
need_filter) {
+void ParquetReader::_eval_binary_predicate(ExprContext* ctx, const char*
min_bytes,
+ const char* max_bytes, bool&
need_filter) {
Expr* conjunct = ctx->root();
Expr* expr = conjunct->get_child(1);
if (expr == nullptr) {
@@ -433,9 +433,9 @@ void RowGroupReader::_eval_binary_predicate(ExprContext*
ctx, const char* min_by
}
}
-bool RowGroupReader::_determine_filter_row_group(const
std::vector<ExprContext*>& conjuncts,
- const std::string&
encoded_min,
- const std::string&
encoded_max) {
+bool ParquetReader::_determine_filter_min_max(const std::vector<ExprContext*>&
conjuncts,
+ const std::string& encoded_min,
+ const std::string& encoded_max) {
const char* min_bytes = encoded_min.data();
const char* max_bytes = encoded_max.data();
bool need_filter = false;
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index 939500ce97..cb5dc1558b 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -67,9 +67,4 @@ Status parse_thrift_footer(FileReader* file,
std::shared_ptr<FileMetaData>& file
RETURN_IF_ERROR(file_metadata->init_schema());
return Status::OK();
}
-
-// Status parse_page_header() {
-// uint8_t* page_buf;
-//
-// }
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index a0a21b00ca..751780fbae 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -32,6 +32,7 @@ Status ColumnChunkReader::init() {
? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
size_t chunk_size = _metadata.total_compressed_size;
+ VLOG_DEBUG << "create _page_reader";
_page_reader = std::make_unique<PageReader>(_stream_reader, start_offset,
chunk_size);
if (_metadata.__isset.dictionary_page_offset) {
@@ -43,12 +44,13 @@ Status ColumnChunkReader::init() {
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec,
_block_compress_codec));
+ VLOG_DEBUG << "initColumnChunkReader finish";
return Status::OK();
}
Status ColumnChunkReader::next_page() {
- RETURN_IF_ERROR(_page_reader->next_page());
- _num_values = _page_reader->get_page_header()->data_page_header.num_values;
+ RETURN_IF_ERROR(_page_reader->next_page_header());
+ _remaining_num_values =
_page_reader->get_page_header()->data_page_header.num_values;
return Status::OK();
}
@@ -72,12 +74,12 @@ Status ColumnChunkReader::load_page_data() {
if (_max_rep_level > 0) {
RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data,
header.data_page_header.repetition_level_encoding,
- _max_rep_level, _num_values));
+ _max_rep_level,
_remaining_num_values));
}
if (_max_def_level > 0) {
RETURN_IF_ERROR(_def_level_decoder.init(&_page_data,
header.data_page_header.definition_level_encoding,
- _max_def_level, _num_values));
+ _max_def_level,
_remaining_num_values));
}
auto encoding = header.data_page_header.encoding;
@@ -85,6 +87,7 @@ Status ColumnChunkReader::load_page_data() {
if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
encoding = tparquet::Encoding::RLE_DICTIONARY;
}
+
// Reuse page decoder
if (_decoders.find(static_cast<int>(encoding)) != _decoders.end()) {
_page_decoder = _decoders[static_cast<int>(encoding)].get();
@@ -104,7 +107,7 @@ Status ColumnChunkReader::load_page_data() {
Status ColumnChunkReader::_decode_dict_page() {
int64_t dict_offset = _metadata.dictionary_page_offset;
_page_reader->seek_to_page(dict_offset);
- _page_reader->next_page();
+ _page_reader->next_page_header();
const tparquet::PageHeader& header = *_page_reader->get_page_header();
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
// TODO(gaoxin): decode dictionary page
@@ -119,10 +122,10 @@ void ColumnChunkReader::_reserve_decompress_buf(size_t
size) {
}
Status ColumnChunkReader::skip_values(size_t num_values) {
- if (UNLIKELY(_num_values < num_values)) {
+ if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Skip too many values in current page");
}
- _num_values -= num_values;
+ _remaining_num_values -= num_values;
return _page_decoder->skip_values(num_values);
}
@@ -138,27 +141,27 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels,
size_t n) {
Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr&
data_type,
size_t num_values) {
- if (UNLIKELY(_num_values < num_values)) {
+ if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
- _num_values -= num_values;
+ _remaining_num_values -= num_values;
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
size_t num_values) {
- if (UNLIKELY(_num_values < num_values)) {
+ if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
- _num_values -= num_values;
+ _remaining_num_values -= num_values;
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
- if (UNLIKELY(_num_values < num_values)) {
+ if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
- _num_values -= num_values;
+ _remaining_num_values -= num_values;
return _page_decoder->decode_values(slice, num_values);
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index f8510d4b37..b248ba0a51 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -85,10 +85,10 @@ public:
// and initialize the repetition and definition level decoder for current
page data.
Status load_page_data();
// The remaining number of values in current page(including null values).
Decreased when reading or skipping.
- uint32_t num_values() const { return _num_values; };
+ uint32_t remaining_num_values() const { return _remaining_num_values; };
// null values are not analyzing from definition levels
// the caller should maintain the consistency after analyzing null values
from definition levels.
- void dec_num_values(uint32_t dec_num) { _num_values -= dec_num; };
+ void dec_num_values(uint32_t dec_num) { _remaining_num_values -= dec_num;
};
// Get the raw data of current page.
Slice& get_page_data() { return _page_data; }
@@ -116,6 +116,7 @@ private:
FieldSchema* _field_schema;
level_t _max_rep_level;
level_t _max_def_level;
+ tparquet::LogicalType _parquet_logical_type;
BufferedStreamReader* _stream_reader;
// tparquet::ColumnChunk* _column_chunk;
@@ -127,7 +128,7 @@ private:
LevelDecoder _rep_level_decoder;
LevelDecoder _def_level_decoder;
- uint32_t _num_values = 0;
+ uint32_t _remaining_num_values = 0;
Slice _page_data;
std::unique_ptr<uint8_t[]> _decompress_buf;
size_t _decompress_buf_size = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 547dfba3bd..e7b189e40c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -19,50 +19,83 @@
#include <common/status.h>
#include <gen_cpp/parquet_types.h>
+#include <vec/columns/columns_number.h>
#include "schema_desc.h"
#include "vparquet_column_chunk_reader.h"
namespace doris::vectorized {
-Status ScalarColumnReader::init(const FileReader* file, const FieldSchema*
field,
- const tparquet::ColumnChunk* chunk, const
TypeDescriptor& col_type,
- int64_t chunk_size) {
- // todo1: init column chunk reader
- // BufferedFileStreamReader stream_reader(reader, 0, chunk_size);
- // _chunk_reader(&stream_reader, chunk, field);
- // _chunk_reader.init();
- return Status();
-}
-
-Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size,
- const FieldSchema* field, const
ParquetReadColumn& column,
- const TypeDescriptor& col_type,
+Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
+ const ParquetReadColumn& column,
const tparquet::RowGroup& row_group,
- const ParquetColumnReader* reader) {
+ std::vector<RowRange>& row_ranges,
+ std::unique_ptr<ParquetColumnReader>&
reader) {
if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
return Status::Corruption("not supported type");
}
if (field->type.type == TYPE_ARRAY) {
return Status::Corruption("not supported array type yet");
} else {
+ VLOG_DEBUG << "field->physical_column_index: " <<
field->physical_column_index;
+ tparquet::ColumnChunk chunk =
row_group.columns[field->physical_column_index];
ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
- RETURN_IF_ERROR(scalar_reader->init(file, field,
-
&row_group.columns[field->physical_column_index],
- col_type, chunk_size));
- reader = scalar_reader;
+ scalar_reader->init_column_metadata(chunk);
+ RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges));
+ reader.reset(scalar_reader);
}
return Status::OK();
}
-Status ScalarColumnReader::read_column_data(const tparquet::RowGroup&
row_group_meta,
- ColumnPtr* data) {
- // todo2: read data with chunk reader to load page data
- // while (_chunk_reader.has_next) {
- // _chunk_reader.next_page();
- // _chunk_reader.load_page_data();
- // }
- return Status();
+void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk&
chunk) {
+ auto chunk_meta = chunk.meta_data;
+ int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
+ ? chunk_meta.dictionary_page_offset
+ : chunk_meta.data_page_offset;
+ size_t chunk_len = chunk_meta.total_compressed_size;
+ _metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len,
chunk_meta));
+}
+
+void ParquetColumnReader::_skipped_pages() {}
+
+Status ScalarColumnReader::init(FileReader* file, FieldSchema* field,
tparquet::ColumnChunk* chunk,
+ std::vector<RowRange>& row_ranges) {
+ BufferedFileStreamReader stream_reader(file, _metadata->start_offset(),
_metadata->size());
+ _row_ranges.reset(&row_ranges);
+ _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field));
+ _chunk_reader->init();
+ return Status::OK();
+}
+
+Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column,
DataTypePtr& type,
+ size_t batch_size, size_t*
read_rows, bool* eof) {
+ if (_chunk_reader->remaining_num_values() <= 0) {
+ // seek to next page header
+ _chunk_reader->next_page();
+ if (_row_ranges->size() != 0) {
+ _skipped_pages();
+ }
+ // load data to decoder
+ _chunk_reader->load_page_data();
+ }
+ size_t read_values = _chunk_reader->remaining_num_values() < batch_size
+ ? _chunk_reader->remaining_num_values()
+ : batch_size;
+ *read_rows = read_values;
+ WhichDataType which_type(type);
+ switch (_metadata->t_metadata().type) {
+ case tparquet::Type::INT32: {
+ _chunk_reader->decode_values(doris_column, type, read_values);
+ return Status::OK();
+ }
+ case tparquet::Type::INT64: {
+ // todo: test int64
+ return Status::OK();
+ }
+ default:
+ return Status::Corruption("unsupported parquet data type");
+ }
+ return Status::OK();
}
void ScalarColumnReader::close() {}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 68a38607e6..696fbe5db0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -20,36 +20,78 @@
#include <gen_cpp/parquet_types.h>
#include "schema_desc.h"
+#include "vparquet_column_chunk_reader.h"
#include "vparquet_reader.h"
-//#include "vparquet_column_chunk_reader.h"
namespace doris::vectorized {
+struct RowRange;
class ParquetReadColumn;
+class ParquetColumnMetadata {
+public:
+ ParquetColumnMetadata(int64_t chunk_start_offset, int64_t chunk_length,
+ tparquet::ColumnMetaData metadata)
+ : _chunk_start_offset(chunk_start_offset),
+ _chunk_length(chunk_length),
+ _metadata(metadata) {};
+
+ ~ParquetColumnMetadata() = default;
+ int64_t start_offset() const { return _chunk_start_offset; };
+ int64_t size() const { return _chunk_length; };
+ tparquet::ColumnMetaData t_metadata() { return _metadata; };
+
+private:
+ int64_t _chunk_start_offset;
+ int64_t _chunk_length;
+ tparquet::ColumnMetaData _metadata;
+};
+
class ParquetColumnReader {
public:
ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {};
- virtual ~ParquetColumnReader() = 0;
- virtual Status read_column_data(const tparquet::RowGroup& row_group_meta,
ColumnPtr* data) = 0;
- static Status create(const FileReader* file, int64_t chunk_size, const
FieldSchema* field,
- const ParquetReadColumn& column, const
TypeDescriptor& col_type,
- const tparquet::RowGroup& row_group, const
ParquetColumnReader* reader);
+ virtual ~ParquetColumnReader() = default;
+ virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr&
type, size_t batch_size,
+ size_t* read_rows, bool* eof) = 0;
+ static Status create(FileReader* file, FieldSchema* field, const
ParquetReadColumn& column,
+ const tparquet::RowGroup& row_group,
std::vector<RowRange>& row_ranges,
+ std::unique_ptr<ParquetColumnReader>& reader);
+ void init_column_metadata(const tparquet::ColumnChunk& chunk);
virtual void close() = 0;
+protected:
+ void _skipped_pages();
+
protected:
const ParquetReadColumn& _column;
- // const ColumnChunkReader& _chunk_reader;
+ std::unique_ptr<ParquetColumnMetadata> _metadata;
+ std::unique_ptr<std::vector<RowRange>> _row_ranges;
};
class ScalarColumnReader : public ParquetColumnReader {
public:
ScalarColumnReader(const ParquetReadColumn& column) :
ParquetColumnReader(column) {};
~ScalarColumnReader() override = default;
- Status init(const FileReader* file, const FieldSchema* field,
- const tparquet::ColumnChunk* chunk, const TypeDescriptor&
col_type,
- int64_t chunk_size);
- Status read_column_data(const tparquet::RowGroup& row_group_meta,
ColumnPtr* data) override;
+ Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk*
chunk,
+ std::vector<RowRange>& row_ranges);
+ Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t
batch_size,
+ size_t* read_rows, bool* eof) override;
void close() override;
+
+private:
+ std::unique_ptr<ColumnChunkReader> _chunk_reader;
};
+
+//class ArrayColumnReader : public ParquetColumnReader {
+//public:
+// ArrayColumnReader(const ParquetReadColumn& column) :
ParquetColumnReader(column) {};
+// ~ArrayColumnReader() override = default;
+// Status init(FileReader* file, FieldSchema* field,
+// tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
+// int64_t chunk_size);
+// Status read_column_data(ColumnPtr* data) override;
+// void close() override;
+//private:
+// std::unique_ptr<ColumnChunkReader> _chunk_reader;
+//};
}; // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
index 445ce76318..4e413ec9e9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
@@ -39,7 +39,7 @@ Status FileMetaData::init_schema() {
return Status();
}
-const tparquet::FileMetaData& FileMetaData::to_thrift_metadata() {
+tparquet::FileMetaData& FileMetaData::to_thrift_metadata() {
return _metadata;
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
index 53d08fa855..1f4727242d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -27,7 +27,7 @@ public:
FileMetaData(tparquet::FileMetaData& metadata);
~FileMetaData() = default;
Status init_schema();
- const tparquet::FileMetaData& to_thrift_metadata();
+ tparquet::FileMetaData& to_thrift_metadata();
int32_t num_row_groups() const { return _num_groups; }
int32_t num_columns() const { return _num_columns; };
int32_t num_rows() const { return _num_rows; };
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 25b7819e8f..751e43863a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -24,235 +24,61 @@
namespace doris::vectorized {
RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
- const std::shared_ptr<FileMetaData>&
file_metadata,
const std::vector<ParquetReadColumn>&
read_columns,
- const std::map<std::string, int>& map_column,
- const std::vector<ExprContext*>& conjunct_ctxs)
+ const int32_t row_group_id, tparquet::RowGroup&
row_group)
: _file_reader(file_reader),
- _file_metadata(file_metadata),
_read_columns(read_columns),
- _map_column(map_column),
- _conjunct_ctxs(conjunct_ctxs),
- _current_row_group(-1) {}
+ _row_group_id(row_group_id),
+ _row_group_meta(row_group),
+ _total_rows(row_group.num_rows) {}
RowGroupReader::~RowGroupReader() {
- for (auto& column_reader : _column_readers) {
- auto reader = column_reader.second;
- reader->close();
- delete reader;
- reader = nullptr;
- }
_column_readers.clear();
}
-Status RowGroupReader::init(const TupleDescriptor* tuple_desc, int64_t
split_start_offset,
- int64_t split_size) {
- _tuple_desc = tuple_desc;
- _split_start_offset = split_start_offset;
- _split_size = split_size;
- _init_conjuncts(tuple_desc, _conjunct_ctxs);
- RETURN_IF_ERROR(_init_column_readers());
+Status RowGroupReader::init(const FieldDescriptor& schema,
std::vector<RowRange>& row_ranges) {
+ VLOG_DEBUG << "Row group id: " << _row_group_id;
+ RETURN_IF_ERROR(_init_column_readers(schema, row_ranges));
return Status::OK();
}
-void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
- const std::vector<ExprContext*>&
conjunct_ctxs) {
- if (tuple_desc->slots().empty()) {
- return;
- }
- for (auto& read_col : _read_columns) {
- _parquet_column_ids.emplace(read_col.parquet_column_id);
- }
-
- for (int i = 0; i < tuple_desc->slots().size(); i++) {
- auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
- if (col_iter == _map_column.end()) {
- continue;
- }
- int parquet_col_id = col_iter->second;
- if (_parquet_column_ids.end() ==
_parquet_column_ids.find(parquet_col_id)) {
- continue;
- }
- for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) {
- Expr* conjunct = conjunct_ctxs[conj_idx]->root();
- if (conjunct->get_num_children() == 0) {
- continue;
- }
- Expr* raw_slot = conjunct->get_child(0);
- if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
- continue;
- }
- SlotRef* slot_ref = (SlotRef*)raw_slot;
- SlotId conjunct_slot_id = slot_ref->slot_id();
- if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
- // Get conjuncts by conjunct_slot_id
- auto iter = _slot_conjuncts.find(conjunct_slot_id);
- if (_slot_conjuncts.end() == iter) {
- std::vector<ExprContext*> conjuncts;
- conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
- _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id,
conjuncts));
- } else {
- std::vector<ExprContext*> conjuncts = iter->second;
- conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
- }
- }
- }
- }
-}
-
-Status RowGroupReader::_init_column_readers() {
+Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
+ std::vector<RowRange>& row_ranges)
{
for (auto& read_col : _read_columns) {
SlotDescriptor* slot_desc = read_col.slot_desc;
- FieldDescriptor schema = _file_metadata->schema();
TypeDescriptor col_type = slot_desc->type();
- const auto& field = schema.get_column(slot_desc->col_name());
- const tparquet::RowGroup row_group =
-
_file_metadata->to_thrift_metadata().row_groups[_current_row_group];
- ParquetColumnReader* reader = nullptr;
- RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader,
MAX_PARQUET_BLOCK_SIZE, field,
- read_col,
slot_desc->type(), row_group,
- reader));
+ auto field =
const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name()));
+ VLOG_DEBUG << "field: " << field->debug_string();
+ std::unique_ptr<ParquetColumnReader> reader;
+ RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field,
read_col, _row_group_meta,
+ row_ranges, reader));
if (reader == nullptr) {
+ VLOG_DEBUG << "Init row group reader failed";
return Status::Corruption("Init row group reader failed");
}
- _column_readers[slot_desc->id()] = reader;
+ _column_readers[slot_desc->id()] = std::move(reader);
}
return Status::OK();
}
-Status RowGroupReader::fill_columns_data(Block* block, const int32_t group_id)
{
- // get ColumnWithTypeAndName from src_block
+Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool*
_batch_eof) {
+ if (_read_rows >= _total_rows) {
+ *_batch_eof = true;
+ }
for (auto& read_col : _read_columns) {
- const tparquet::RowGroup row_group =
-
_file_metadata->to_thrift_metadata().row_groups[_current_row_group];
- auto& column_with_type_and_name =
block->get_by_name(read_col.slot_desc->col_name());
-
RETURN_IF_ERROR(_column_readers[read_col.slot_desc->id()]->read_column_data(
- row_group, &column_with_type_and_name.column));
- VLOG_DEBUG << column_with_type_and_name.name;
+ auto slot_desc = read_col.slot_desc;
+ auto& column_with_type_and_name =
block->get_by_name(slot_desc->col_name());
+ auto column_ptr = column_with_type_and_name.column;
+ auto column_type = column_with_type_and_name.type;
+ size_t batch_read_rows = 0;
+ RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
+ column_ptr, column_type, batch_size, &batch_read_rows,
_batch_eof));
+ _read_rows += batch_read_rows;
+ VLOG_DEBUG << "read column: " << column_with_type_and_name.name;
+ VLOG_DEBUG << "read rows in column: " << batch_read_rows;
}
// use data fill utils read column data to column ptr
return Status::OK();
}
-Status RowGroupReader::get_next_row_group(const int32_t* group_id) {
- int32_t total_group = _file_metadata->num_row_groups();
- if (total_group == 0 || _file_metadata->num_rows() == 0 || _split_size <
0) {
- return Status::EndOfFile("No row group need read");
- }
- while (_current_row_group < total_group) {
- _current_row_group++;
- const tparquet::RowGroup& row_group =
-
_file_metadata->to_thrift_metadata().row_groups[_current_row_group];
- if (!_is_misaligned_range_group(row_group)) {
- continue;
- }
- bool filter_group = false;
- RETURN_IF_ERROR(_process_row_group_filter(row_group, _conjunct_ctxs,
&filter_group));
- if (!filter_group) {
- group_id = &_current_row_group;
- break;
- }
- }
- return Status::OK();
-}
-
-bool RowGroupReader::_is_misaligned_range_group(const tparquet::RowGroup&
row_group) {
- int64_t start_offset =
_get_column_start_offset(row_group.columns[0].meta_data);
-
- auto last_column = row_group.columns[row_group.columns.size() -
1].meta_data;
- int64_t end_offset = _get_column_start_offset(last_column) +
last_column.total_compressed_size;
-
- int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2;
- if (!(row_group_mid >= _split_start_offset &&
- row_group_mid < _split_start_offset + _split_size)) {
- return true;
- }
- return false;
-}
-
-Status RowGroupReader::_process_row_group_filter(const tparquet::RowGroup&
row_group,
- const
std::vector<ExprContext*>& conjunct_ctxs,
- bool* filter_group) {
- _process_column_stat_filter(row_group, conjunct_ctxs, filter_group);
- _init_chunk_dicts();
- RETURN_IF_ERROR(_process_dict_filter(filter_group));
- _init_bloom_filter();
- RETURN_IF_ERROR(_process_bloom_filter(filter_group));
- return Status::OK();
-}
-
-Status RowGroupReader::_process_column_stat_filter(const tparquet::RowGroup&
row_group,
- const
std::vector<ExprContext*>& conjunct_ctxs,
- bool* filter_group) {
- int total_group = _file_metadata->num_row_groups();
- // It will not filter if head_group_offset equals tail_group_offset
- int64_t total_rows = 0;
- int64_t total_bytes = 0;
- for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
- total_rows += row_group.num_rows;
- total_bytes += row_group.total_byte_size;
- for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size();
slot_id++) {
- const std::string& col_name =
_tuple_desc->slots()[slot_id]->col_name();
- auto col_iter = _map_column.find(col_name);
- if (col_iter == _map_column.end()) {
- continue;
- }
- int parquet_col_id = col_iter->second;
- if (_parquet_column_ids.end() ==
_parquet_column_ids.find(parquet_col_id)) {
- // Column not exist in parquet file
- continue;
- }
- auto slot_iter = _slot_conjuncts.find(slot_id);
- if (slot_iter == _slot_conjuncts.end()) {
- continue;
- }
- auto statistic =
row_group.columns[parquet_col_id].meta_data.statistics;
- if (!statistic.__isset.max || !statistic.__isset.min) {
- continue;
- }
- // Min-max of statistic is plain-encoded value
- *filter_group =
- _determine_filter_row_group(slot_iter->second,
statistic.min, statistic.max);
- if (*filter_group) {
- _filtered_num_row_groups++;
- VLOG_DEBUG << "Filter row group id: " << row_group_id;
- break;
- }
- }
- }
- VLOG_DEBUG << "DEBUG total_rows: " << total_rows;
- VLOG_DEBUG << "DEBUG total_bytes: " << total_bytes;
- VLOG_DEBUG << "Parquet file: " << _file_metadata->schema().debug_string()
- << ", Num of read row group: " << total_group
- << ", and num of skip row group: " << _filtered_num_row_groups;
- return Status::OK();
-}
-
-void RowGroupReader::_init_chunk_dicts() {}
-
-Status RowGroupReader::_process_dict_filter(bool* filter_group) {
- return Status();
-}
-
-void RowGroupReader::_init_bloom_filter() {}
-
-Status RowGroupReader::_process_bloom_filter(bool* filter_group) {
- RETURN_IF_ERROR(_file_reader->seek(0));
- return Status();
-}
-
-int64_t RowGroupReader::_get_row_group_start_offset(const tparquet::RowGroup&
row_group) {
- if (row_group.__isset.file_offset) {
- return row_group.file_offset;
- }
- return row_group.columns[0].meta_data.data_page_offset;
-}
-
-int64_t RowGroupReader::_get_column_start_offset(const
tparquet::ColumnMetaData& column) {
- if (column.__isset.dictionary_page_offset) {
- DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
- return column.dictionary_page_offset;
- }
- return column.data_page_offset;
-}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index b69852f124..ea9eeed342 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -24,87 +24,30 @@
#include "vparquet_file_metadata.h"
#include "vparquet_reader.h"
-#define MAX_PARQUET_BLOCK_SIZE 1024
-
namespace doris::vectorized {
class ParquetReadColumn;
class ParquetColumnReader;
+struct RowRange;
+
class RowGroupReader {
public:
RowGroupReader(doris::FileReader* file_reader,
- const std::shared_ptr<FileMetaData>& file_metadata,
- const std::vector<ParquetReadColumn>& read_columns,
- const std::map<std::string, int>& map_column,
- const std::vector<ExprContext*>& conjunct_ctxs);
+ const std::vector<ParquetReadColumn>& read_columns, const
int32_t _row_group_id,
+ tparquet::RowGroup& row_group);
~RowGroupReader();
- Status init(const TupleDescriptor* tuple_desc, int64_t split_start_offset,
int64_t split_size);
- Status get_next_row_group(const int32_t* group_id);
- Status fill_columns_data(Block* block, const int32_t group_id);
+ Status init(const FieldDescriptor& schema, std::vector<RowRange>&
row_ranges);
+ Status next_batch(Block* block, size_t batch_size, bool* _batch_eof);
private:
- bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
-
- Status _process_column_stat_filter(const tparquet::RowGroup& row_group,
- const std::vector<ExprContext*>&
conjunct_ctxs,
- bool* filter_group);
-
- void _init_conjuncts(const TupleDescriptor* tuple_desc,
- const std::vector<ExprContext*>& conjunct_ctxs);
-
- Status _init_column_readers();
-
- Status _process_row_group_filter(const tparquet::RowGroup& row_group,
- const std::vector<ExprContext*>&
conjunct_ctxs,
- bool* filter_group);
-
- void _init_chunk_dicts();
-
- Status _process_dict_filter(bool* filter_group);
-
- void _init_bloom_filter();
-
- Status _process_bloom_filter(bool* filter_group);
-
- int64_t _get_row_group_start_offset(const tparquet::RowGroup& row_group);
- int64_t _get_column_start_offset(const tparquet::ColumnMetaData&
column_init_column_readers);
-
- bool _determine_filter_row_group(const std::vector<ExprContext*>&
conjuncts,
- const std::string& encoded_min,
- const std::string& encoded_max);
-
- void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const
char* max_bytes,
- bool& need_filter);
-
- void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const
char* max_bytes,
- bool& need_filter);
-
- bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*>
in_pred_values,
- const char* min_bytes, const char* max_bytes);
-
- bool _eval_eq(PrimitiveType conjunct_type, void* value, const char*
min_bytes,
- const char* max_bytes);
-
- bool _eval_gt(PrimitiveType conjunct_type, void* value, const char*
max_bytes);
-
- bool _eval_ge(PrimitiveType conjunct_type, void* value, const char*
max_bytes);
-
- bool _eval_lt(PrimitiveType conjunct_type, void* value, const char*
min_bytes);
-
- bool _eval_le(PrimitiveType conjunct_type, void* value, const char*
min_bytes);
+ Status _init_column_readers(const FieldDescriptor& schema,
std::vector<RowRange>& row_ranges);
private:
doris::FileReader* _file_reader;
- const std::shared_ptr<FileMetaData>& _file_metadata;
- std::unordered_map<int32_t, ParquetColumnReader*> _column_readers;
- const TupleDescriptor* _tuple_desc; // get all slot info
+ std::unordered_map<int32_t, std::unique_ptr<ParquetColumnReader>>
_column_readers;
const std::vector<ParquetReadColumn>& _read_columns;
- const std::map<std::string, int>& _map_column;
- std::unordered_set<int> _parquet_column_ids;
- const std::vector<ExprContext*>& _conjunct_ctxs;
- std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
- int64_t _split_start_offset;
- int64_t _split_size;
- int32_t _current_row_group;
- int32_t _filtered_num_row_groups = 0;
+ const int32_t _row_group_id;
+ tparquet::RowGroup& _row_group_meta;
+ int64_t _read_rows = 0;
+ int64_t _total_rows;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
index 6365ec2163..40df65ace6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
@@ -21,22 +21,22 @@
namespace doris::vectorized {
-PageIndex::~PageIndex() {
- if (_column_index != nullptr) {
- delete _column_index;
- _column_index = nullptr;
+Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
+ int total_rows_of_group, int
page_idx,
+ RowRange* row_range) {
+ const auto& page_locations = offset_index.page_locations;
+ DCHECK_LT(page_idx, page_locations.size());
+ row_range->first_row = page_locations[page_idx].first_row_index;
+ if (page_idx == page_locations.size() - 1) {
+ row_range->last_row = total_rows_of_group - 1;
+ } else {
+ row_range->last_row = page_locations[page_idx + 1].first_row_index - 1;
}
- if (_offset_index != nullptr) {
- delete _offset_index;
- _offset_index = nullptr;
- }
-}
-
-Status PageIndex::get_row_range_for_page() {
- return Status();
+ return Status::OK();
}
-Status PageIndex::collect_skipped_page_range() {
+Status PageIndex::collect_skipped_page_range(std::vector<ExprContext*>
conjuncts,
+ std::vector<int> page_range) {
return Status();
}
@@ -67,20 +67,21 @@ bool PageIndex::check_and_get_page_index_ranges(const
std::vector<tparquet::Colu
return has_page_index;
}
-Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const
uint8_t* buff) {
+Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const
uint8_t* buff,
+ tparquet::ColumnIndex* column_index) {
int64_t buffer_offset = chunk.column_index_offset - _column_index_start;
uint32_t length = chunk.column_index_length;
DCHECK_LE(buffer_offset + length, _column_index_size);
- RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length,
true, _column_index));
+ RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length,
true, column_index));
return Status::OK();
}
Status PageIndex::parse_offset_index(const tparquet::ColumnChunk& chunk, const
uint8_t* buff,
- int64_t buffer_size) {
+ int64_t buffer_size,
tparquet::OffsetIndex* offset_index) {
int64_t buffer_offset = chunk.offset_index_offset - _offset_index_start +
_column_index_size;
uint32_t length = chunk.offset_index_length;
DCHECK_LE(buffer_offset + length, buffer_size);
- RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length,
true, _offset_index));
+ RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length,
true, offset_index));
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h
b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index a26074ab0d..5894a4e8d6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -19,30 +19,32 @@
#include <common/status.h>
#include <gen_cpp/parquet_types.h>
+#include "exprs/expr_context.h"
+
namespace doris::vectorized {
+class ParquetReader;
+struct RowRange;
class PageIndex {
public:
PageIndex() = default;
- ~PageIndex();
- Status get_row_range_for_page();
- Status collect_skipped_page_range();
+ ~PageIndex() = default;
+ Status create_skipped_row_range(tparquet::OffsetIndex& offset_index, int
total_rows_of_group,
+ int page_idx, RowRange* row_range);
+ Status collect_skipped_page_range(std::vector<ExprContext*> conjuncts,
+ std::vector<int> page_range);
bool check_and_get_page_index_ranges(const
std::vector<tparquet::ColumnChunk>& columns);
- Status parse_column_index(const tparquet::ColumnChunk& chunk, const
uint8_t* buff);
+ Status parse_column_index(const tparquet::ColumnChunk& chunk, const
uint8_t* buff,
+ tparquet::ColumnIndex* _column_index);
Status parse_offset_index(const tparquet::ColumnChunk& chunk, const
uint8_t* buff,
- int64_t buffer_size);
+ int64_t buffer_size, tparquet::OffsetIndex*
_offset_index);
-private:
private:
friend class ParquetReader;
int64_t _column_index_start;
int64_t _column_index_size;
int64_t _offset_index_start;
int64_t _offset_index_size;
-
- tparquet::OffsetIndex* _offset_index;
- tparquet::ColumnIndex* _column_index;
- // row range define
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index f554be169e..94a291f40e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -28,7 +28,7 @@ static constexpr size_t initPageHeaderSize = 1024;
PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t
length)
: _reader(reader), _start_offset(offset), _end_offset(offset + length)
{}
-Status PageReader::next_page() {
+Status PageReader::next_page_header() {
if (_offset < _start_offset || _offset >= _end_offset) {
return Status::IOError("Out-of-bounds Access");
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index cf95812ead..256ddd13d1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -34,7 +34,7 @@ public:
bool has_next_page() const { return _offset < _end_offset; }
- Status next_page();
+ Status next_page_header();
Status skip_page();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 815cba3c15..b16df6b557 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -21,13 +21,14 @@
namespace doris::vectorized {
ParquetReader::ParquetReader(FileReader* file_reader, int32_t
num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size)
+ size_t batch_size, int64_t range_start_offset,
int64_t range_size)
: _num_of_columns_from_file(num_of_columns_from_file),
+ _batch_size(batch_size),
_range_start_offset(range_start_offset),
_range_size(range_size) {
_file_reader = file_reader;
_total_groups = 0;
- // _current_group = 0;
+ _current_row_group_id = 0;
// _statistics = std::make_shared<Statistics>();
}
@@ -36,6 +37,10 @@ ParquetReader::~ParquetReader() {
}
void ParquetReader::close() {
+ for (auto& conjuncts : _slot_conjuncts) {
+ conjuncts.second.clear();
+ }
+ _slot_conjuncts.clear();
if (_file_reader != nullptr) {
_file_reader->close();
delete _file_reader;
@@ -45,26 +50,26 @@ void ParquetReader::close() {
Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
const std::vector<SlotDescriptor*>&
tuple_slot_descs,
- const std::vector<ExprContext*>&
conjunct_ctxs,
+ std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) {
_file_reader->open();
+ _conjunct_ctxs.reset(&conjunct_ctxs);
RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
- auto metadata = _file_metadata->to_thrift_metadata();
-
- _total_groups = metadata.row_groups.size();
+ _t_metadata.reset(&_file_metadata->to_thrift_metadata());
+ _total_groups = _file_metadata->num_row_groups();
if (_total_groups == 0) {
return Status::EndOfFile("Empty Parquet File");
}
auto schema_desc = _file_metadata->schema();
for (int i = 0; i < _file_metadata->num_columns(); ++i) {
- LOG(WARNING) << schema_desc.debug_string();
+ // for test
+ VLOG_DEBUG << schema_desc.debug_string();
// Get the Column Reader for the boolean column
_map_column.emplace(schema_desc.get_column(i)->name, i);
}
- LOG(WARNING) << "";
RETURN_IF_ERROR(_init_read_columns(tuple_slot_descs));
RETURN_IF_ERROR(
- _init_row_group_reader(tuple_desc, _range_start_offset,
_range_size, conjunct_ctxs));
+ _init_row_group_readers(tuple_desc, _range_start_offset,
_range_size, conjunct_ctxs));
return Status::OK();
}
@@ -81,7 +86,7 @@ Status ParquetReader::_init_read_columns(const
std::vector<SlotDescriptor*>& tup
} else {
std::stringstream str_error;
str_error << "Invalid Column Name:" << slot_desc->col_name();
- LOG(WARNING) << str_error.str();
+ VLOG_DEBUG << str_error.str();
return Status::InvalidArgument(str_error.str());
}
ParquetReadColumn column;
@@ -90,63 +95,231 @@ Status ParquetReader::_init_read_columns(const
std::vector<SlotDescriptor*>& tup
auto physical_type =
_file_metadata->schema().get_column(parquet_col_id)->physical_type;
column.parquet_type = physical_type;
_read_columns.emplace_back(column);
+ VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
}
return Status::OK();
}
-Status ParquetReader::read_next_batch(Block* block) {
- int32_t group_id = 0;
- RETURN_IF_ERROR(_row_group_reader->get_next_row_group(&group_id));
- auto metadata = _file_metadata->to_thrift_metadata();
- auto column_chunks = metadata.row_groups[group_id].columns;
- if (_has_page_index(column_chunks)) {
- Status st = _process_page_index(column_chunks);
- if (st.ok()) {
- // todo: process filter page
- return Status::OK();
- } else {
- // todo: record profile
- LOG(WARNING) << "";
+Status ParquetReader::read_next_batch(Block* block, bool* eof) {
+ DCHECK(_total_groups == _row_group_readers.size());
+ if (_total_groups == 0) {
+ *eof = true;
+ }
+ bool _batch_eof = false;
+ auto row_group_reader = _row_group_readers[_current_row_group_id];
+ RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size,
&_batch_eof));
+ if (_batch_eof) {
+ _current_row_group_id++;
+ if (_current_row_group_id > _total_groups) {
+ *eof = true;
+ }
+ }
+ return Status::OK();
+}
+
+Status ParquetReader::_init_row_group_readers(const TupleDescriptor*
tuple_desc,
+ int64_t range_start_offset,
int64_t range_size,
+ const std::vector<ExprContext*>&
conjunct_ctxs) {
+ std::vector<int32_t> read_row_groups;
+ RETURN_IF_ERROR(_filter_row_groups(&read_row_groups));
+ _init_conjuncts(tuple_desc, conjunct_ctxs);
+ for (auto row_group_id : read_row_groups) {
+ VLOG_DEBUG << "_has_page_index";
+ auto row_group = _t_metadata->row_groups[row_group_id];
+ auto column_chunks = row_group.columns;
+ std::vector<RowRange> skipped_row_ranges;
+ if (_has_page_index(column_chunks)) {
+ VLOG_DEBUG << "_process_page_index";
+ RETURN_IF_ERROR(_process_page_index(row_group,
skipped_row_ranges));
}
+ std::shared_ptr<RowGroupReader> row_group_reader;
+ row_group_reader.reset(
+ new RowGroupReader(_file_reader, _read_columns, row_group_id,
row_group));
+ // todo: can filter row with candidate ranges rather than skipped
ranges
+ RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(),
skipped_row_ranges));
+ _row_group_readers.emplace_back(row_group_reader);
}
- // metadata has been processed, fill parquet data to block
- // block is the batch data of a row group. a row group has N batch
- // push to scanner queue
- _fill_block_data(block, group_id);
+ VLOG_DEBUG << "_init_row_group_reader finished";
return Status::OK();
}
-void ParquetReader::_fill_block_data(Block* block, int group_id) {
- // make and init src block here
- // read column chunk
- _row_group_reader->fill_columns_data(block, group_id);
+void ParquetReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
+ const std::vector<ExprContext*>&
conjunct_ctxs) {
+ if (tuple_desc->slots().empty()) {
+ return;
+ }
+ std::unordered_set<int> parquet_col_ids(_include_column_ids.begin(),
_include_column_ids.end());
+ for (int i = 0; i < tuple_desc->slots().size(); i++) {
+ auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
+ if (col_iter == _map_column.end()) {
+ continue;
+ }
+ int parquet_col_id = col_iter->second;
+ if (parquet_col_ids.end() == parquet_col_ids.find(parquet_col_id)) {
+ continue;
+ }
+ for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) {
+ Expr* conjunct = conjunct_ctxs[conj_idx]->root();
+ if (conjunct->get_num_children() == 0) {
+ continue;
+ }
+ Expr* raw_slot = conjunct->get_child(0);
+ if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
+ continue;
+ }
+ SlotRef* slot_ref = (SlotRef*)raw_slot;
+ SlotId conjunct_slot_id = slot_ref->slot_id();
+ if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
+ // Get conjuncts by conjunct_slot_id
+ auto iter = _slot_conjuncts.find(conjunct_slot_id);
+ if (_slot_conjuncts.end() == iter) {
+ std::vector<ExprContext*> conjuncts;
+ conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
+ _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id,
conjuncts));
+ } else {
+ std::vector<ExprContext*> conjuncts = iter->second;
+ conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
+ }
+ }
+ }
+ }
}
-Status ParquetReader::_init_row_group_reader(const TupleDescriptor* tuple_desc,
- int64_t range_start_offset,
int64_t range_size,
- const std::vector<ExprContext*>&
conjunct_ctxs) {
- // todo: extract as create()
- _row_group_reader.reset(new RowGroupReader(_file_reader, _file_metadata,
_read_columns,
- _map_column, conjunct_ctxs));
- RETURN_IF_ERROR(_row_group_reader->init(tuple_desc, range_start_offset,
range_size));
+Status ParquetReader::_filter_row_groups(std::vector<int32_t>*
read_row_group_ids) {
+ if (_total_groups == 0 || _file_metadata->num_rows() == 0 || _range_size <
0) {
+ return Status::EndOfFile("No row group need read");
+ }
+ int32_t row_group_idx = -1;
+ while (row_group_idx < _total_groups) {
+ row_group_idx++;
+ const tparquet::RowGroup& row_group =
_t_metadata->row_groups[row_group_idx];
+ if (_is_misaligned_range_group(row_group)) {
+ continue;
+ }
+ bool filter_group = false;
+ RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
+ if (!filter_group) {
+ read_row_group_ids->emplace_back(row_group_idx);
+ break;
+ }
+ }
return Status::OK();
}
+bool ParquetReader::_is_misaligned_range_group(const tparquet::RowGroup&
row_group) {
+ int64_t start_offset =
_get_column_start_offset(row_group.columns[0].meta_data);
+
+ auto last_column = row_group.columns[row_group.columns.size() -
1].meta_data;
+ int64_t end_offset = _get_column_start_offset(last_column) +
last_column.total_compressed_size;
+
+ int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2;
+ if (!(row_group_mid >= _range_start_offset &&
+ row_group_mid < _range_start_offset + _range_size)) {
+ return true;
+ }
+ return false;
+}
+
bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk>
columns) {
_page_index.reset(new PageIndex());
return _page_index->check_and_get_page_index_ranges(columns);
}
-Status ParquetReader::_process_page_index(std::vector<tparquet::ColumnChunk>
columns) {
+Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group,
+ std::vector<RowRange>&
skipped_row_ranges) {
int64_t buffer_size = _page_index->_column_index_size +
_page_index->_offset_index_size;
- uint8_t buff[buffer_size];
for (auto col_id : _include_column_ids) {
- auto chunk = columns[col_id];
- RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff));
- // todo: use page index filter min/max val
- RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff,
buffer_size));
- // todo: calculate row range
+ uint8_t buff[buffer_size];
+ auto chunk = row_group.columns[col_id];
+ tparquet::ColumnIndex column_index;
+ RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff,
&column_index));
+ VLOG_DEBUG << "_column_index_size : " <<
_page_index->_column_index_size;
+ VLOG_DEBUG << "_page_index 0 max_values : " <<
column_index.max_values[0];
+ const int num_of_page = column_index.null_pages.size();
+ if (num_of_page <= 1) {
+ break;
+ }
+ auto conjunct_iter = _slot_conjuncts.find(col_id);
+ if (_slot_conjuncts.end() == conjunct_iter) {
+ continue;
+ }
+ auto conjuncts = conjunct_iter->second;
+ std::vector<int> candidate_page_range;
+ _page_index->collect_skipped_page_range(conjuncts,
candidate_page_range);
+ tparquet::OffsetIndex offset_index;
+ RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff,
buffer_size, &offset_index));
+ VLOG_DEBUG << "page_locations size : " <<
offset_index.page_locations.size();
+ for (int page_id : candidate_page_range) {
+ RowRange skipped_row_range;
+ _page_index->create_skipped_row_range(offset_index,
row_group.num_rows, page_id,
+ &skipped_row_range);
+ skipped_row_ranges.emplace_back(skipped_row_range);
+ }
+ }
+ return Status::OK();
+}
+
+Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup&
row_group,
+ bool* filter_group) {
+ _process_column_stat_filter(row_group.columns, filter_group);
+ _init_chunk_dicts();
+ RETURN_IF_ERROR(_process_dict_filter(filter_group));
+ _init_bloom_filter();
+ RETURN_IF_ERROR(_process_bloom_filter(filter_group));
+ return Status::OK();
+}
+
+Status ParquetReader::_process_column_stat_filter(const
std::vector<tparquet::ColumnChunk>& columns,
+ bool* filter_group) {
+ // It will not filter if head_group_offset equals tail_group_offset
+ std::unordered_set<int> _parquet_column_ids(_include_column_ids.begin(),
+ _include_column_ids.end());
+ for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++)
{
+ auto slot_iter = _slot_conjuncts.find(slot_id);
+ if (slot_iter == _slot_conjuncts.end()) {
+ continue;
+ }
+ const std::string& col_name =
_tuple_desc->slots()[slot_id]->col_name();
+ auto col_iter = _map_column.find(col_name);
+ if (col_iter == _map_column.end()) {
+ continue;
+ }
+ int parquet_col_id = col_iter->second;
+ if (_parquet_column_ids.end() ==
_parquet_column_ids.find(parquet_col_id)) {
+ // Column not exist in parquet file
+ continue;
+ }
+ auto statistic = columns[parquet_col_id].meta_data.statistics;
+ if (!statistic.__isset.max || !statistic.__isset.min) {
+ continue;
+ }
+ // Min-max of statistic is plain-encoded value
+ *filter_group = _determine_filter_min_max(slot_iter->second,
statistic.min, statistic.max);
+ if (*filter_group) {
+ break;
+ }
}
return Status::OK();
}
+
+void ParquetReader::_init_chunk_dicts() {}
+
+Status ParquetReader::_process_dict_filter(bool* filter_group) {
+ return Status();
+}
+
+void ParquetReader::_init_bloom_filter() {}
+
+Status ParquetReader::_process_bloom_filter(bool* filter_group) {
+ RETURN_IF_ERROR(_file_reader->seek(0));
+ return Status();
+}
+
+int64_t ParquetReader::_get_column_start_offset(const
tparquet::ColumnMetaData& column) {
+ if (column.__isset.dictionary_page_offset) {
+ DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
+ return column.dictionary_page_offset;
+ }
+ return column.data_page_offset;
+}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index a4cf4e70e2..c1d0ec4247 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -42,6 +42,12 @@ namespace doris::vectorized {
// int64_t total_bytes = 0;
// };
class RowGroupReader;
+class PageIndex;
+
+struct RowRange {
+ int64_t first_row;
+ int64_t last_row;
+};
class ParquetReadColumn {
public:
@@ -58,49 +64,73 @@ private:
class ParquetReader {
public:
- ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
+ ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
size_t batch_size,
int64_t range_start_offset, int64_t range_size);
~ParquetReader();
Status init_reader(const TupleDescriptor* tuple_desc,
const std::vector<SlotDescriptor*>& tuple_slot_descs,
- const std::vector<ExprContext*>& conjunct_ctxs, const
std::string& timezone);
-
- Status read_next_batch(Block* block);
+ std::vector<ExprContext*>& conjunct_ctxs, const
std::string& timezone);
- bool has_next() const { return !_batch_eof; };
+ Status read_next_batch(Block* block, bool* eof);
- // std::shared_ptr<Statistics>& statistics() { return _statistics; }
+ // std::shared_ptr<Statistics>& statistics() { return _statistics; }
void close();
int64_t size() const { return _file_reader->size(); }
private:
Status _init_read_columns(const std::vector<SlotDescriptor*>&
tuple_slot_descs);
- Status _init_row_group_reader(const TupleDescriptor* tuple_desc, int64_t
range_start_offset,
- int64_t range_size,
- const std::vector<ExprContext*>&
conjunct_ctxs);
- void _fill_block_data(Block* block, int group_id);
+ Status _init_row_group_readers(const TupleDescriptor* tuple_desc, int64_t
range_start_offset,
+ int64_t range_size,
+ const std::vector<ExprContext*>&
conjunct_ctxs);
+ void _init_conjuncts(const TupleDescriptor* tuple_desc,
+ const std::vector<ExprContext*>& conjunct_ctxs);
+ // Page Index Filter
bool _has_page_index(std::vector<tparquet::ColumnChunk> columns);
- Status _process_page_index(std::vector<tparquet::ColumnChunk> columns);
+ Status _process_page_index(tparquet::RowGroup& row_group,
+ std::vector<RowRange>& skipped_row_ranges);
+
+ // Row Group Filter
+ bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
+ Status _process_column_stat_filter(const
std::vector<tparquet::ColumnChunk>& column_meta,
+ bool* filter_group);
+ Status _process_row_group_filter(const tparquet::RowGroup& row_group,
bool* filter_group);
+ void _init_chunk_dicts();
+ Status _process_dict_filter(bool* filter_group);
+ void _init_bloom_filter();
+ Status _process_bloom_filter(bool* filter_group);
+ Status _filter_row_groups(std::vector<int32_t>* read_row_group_ids);
+ int64_t _get_column_start_offset(const tparquet::ColumnMetaData&
column_init_column_readers);
+ bool _determine_filter_min_max(const std::vector<ExprContext*>& conjuncts,
+ const std::string& encoded_min, const
std::string& encoded_max);
+ void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const
char* max_bytes,
+ bool& need_filter);
+ void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const
char* max_bytes,
+ bool& need_filter);
private:
FileReader* _file_reader;
std::shared_ptr<FileMetaData> _file_metadata;
- std::shared_ptr<RowGroupReader> _row_group_reader;
+ std::unique_ptr<tparquet::FileMetaData> _t_metadata;
std::shared_ptr<PageIndex> _page_index;
- int _total_groups; // num of groups(stripes) of a parquet(orc) file
- // int _current_group; // current group(stripe)
+ std::vector<std::shared_ptr<RowGroupReader>> _row_group_readers;
+ int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
+ int32_t _current_row_group_id;
// std::shared_ptr<Statistics> _statistics;
const int32_t _num_of_columns_from_file;
-
std::map<std::string, int> _map_column; // column-name <---> column-index
- std::vector<int> _include_column_ids; // columns that need to get from
file
+ std::shared_ptr<std::vector<ExprContext*>> _conjunct_ctxs;
+ std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
+ std::vector<int> _include_column_ids; // columns that need to get from file
std::vector<ParquetReadColumn> _read_columns;
+ bool* _file_eof;
// parquet file reader object
- bool* _batch_eof;
+ size_t _batch_size;
int64_t _range_start_offset;
int64_t _range_size;
+
+ const TupleDescriptor* _tuple_desc; // get all slot info
};
} // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 95df8bd9a2..c334b105ed 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -147,7 +147,7 @@ static Status get_column_values(FileReader* file_reader,
tparquet::ColumnChunk*
// load page data into underlying container
chunk_reader.load_page_data();
// decode page data
- return chunk_reader.decode_values(doris_column, data_type,
chunk_reader.num_values());
+ return chunk_reader.decode_values(doris_column, data_type,
chunk_reader.remaining_num_values());
}
static void create_block(std::unique_ptr<vectorized::Block>& block) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]