This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7c0bcbdca1 [enhance](parquet-reader) cache file meta of parquet to
speed up query (#18074)
7c0bcbdca1 is described below
commit 7c0bcbdca180a4da4433bfe20cb6970c9b947030
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Mar 25 23:22:57 2023 +0800
[enhance](parquet-reader) cache file meta of parquet to speed up query
(#18074)
Problem:
1. FE will split the parquet file into splits. So a file can have several
splits.
2. BE will scan each split, read the footer of the parquet file.
3. If 2 splits belong to the same parquet file, the footer of this file will
be read twice.
This PR mainly changes:
1. Use kv cache to cache the footer of parquet file.
2. The kv cache belongs to a scan node, so all parquet readers belonging to
this scan node will share the same kv cache.
3. In cache, the key is "meta_file_path", the value is parsed thrift footer.
The KV cache is sharded into multiple sub caches,
so that different files can use different sub caches, avoiding blocking each
other.
In my test, a query with 26 splits can reduce the footer parse time from 4s
-> 1s
---
be/src/io/fs/buffered_reader.h | 3 ++
be/src/runtime/memory/thread_mem_tracker_mgr.h | 1 -
be/src/vec/exec/format/format_common.h | 34 +++++++++++++++-
.../vec/exec/format/parquet/parquet_thrift_util.h | 7 ++--
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 47 +++++++++++++++++++---
be/src/vec/exec/format/parquet/vparquet_reader.h | 18 ++++++++-
be/src/vec/exec/format/table/iceberg_reader.cpp | 6 +--
be/src/vec/exec/format/table/iceberg_reader.h | 6 ++-
be/src/vec/exec/scan/new_file_scan_node.cpp | 6 ++-
be/src/vec/exec/scan/new_file_scan_node.h | 7 +++-
be/src/vec/exec/scan/vfile_scanner.cpp | 13 ++++--
be/src/vec/exec/scan/vfile_scanner.h | 7 ++--
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 20 +++++----
.../planner/external/ExternalFileScanNode.java | 3 ++
14 files changed, 142 insertions(+), 36 deletions(-)
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 15f2f524d7..8b06721c1d 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -53,6 +53,8 @@ public:
virtual Status read_bytes(Slice& slice, uint64_t offset) = 0;
Statistics& statistics() { return _statistics; }
virtual ~BufferedStreamReader() = default;
+ // return the file path
+ virtual std::string path() = 0;
protected:
Statistics _statistics;
@@ -66,6 +68,7 @@ public:
Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t
bytes_to_read) override;
Status read_bytes(Slice& slice, uint64_t offset) override;
+ std::string path() override { return _file->path(); }
private:
std::unique_ptr<uint8_t[]> _buf;
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 363d2e9361..04ec86f7f8 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -19,7 +19,6 @@
#include <bthread/bthread.h>
#include <fmt/format.h>
-#include <parallel_hashmap/phmap.h>
#include "gutil/macros.h"
#include "runtime/memory/mem_tracker.h"
diff --git a/be/src/vec/exec/format/format_common.h
b/be/src/vec/exec/format/format_common.h
index 18277c1ed4..42d31b3aa9 100644
--- a/be/src/vec/exec/format/format_common.h
+++ b/be/src/vec/exec/format/format_common.h
@@ -84,13 +84,13 @@ public:
}
template <class T>
- T* get(const KType& key, const std::function<T*()> createIfNotExists) {
+ T* get(const KType& key, const std::function<T*()> create_func) {
std::lock_guard<std::mutex> lock(_lock);
auto it = _storage.find(key);
if (it != _storage.end()) {
return reinterpret_cast<T*>(it->second);
} else {
- T* rawPtr = createIfNotExists();
+ T* rawPtr = create_func();
if (rawPtr != nullptr) {
_delete_fn[key] = [](void* obj) { delete
reinterpret_cast<T*>(obj); };
_storage[key] = rawPtr;
@@ -107,4 +107,34 @@ private:
std::unordered_map<KType, void*> _storage;
};
+class ShardedKVCache {
+public:
+ ShardedKVCache(uint32_t num_shards) : _num_shards(num_shards) {
+ _shards = new (std::nothrow) KVCache<std::string>*[_num_shards];
+ for (uint32_t i = 0; i < _num_shards; i++) {
+ _shards[i] = new KVCache<std::string>();
+ }
+ }
+
+ ~ShardedKVCache() {
+ for (uint32_t i = 0; i < _num_shards; i++) {
+ delete _shards[i];
+ }
+ delete[] _shards;
+ }
+
+ template <class T>
+ T* get(const std::string& key, const std::function<T*()> create_func) {
+ return _shards[_get_idx(key)]->get(key, create_func);
+ }
+
+private:
+ uint32_t _get_idx(const std::string& key) {
+ return (uint32_t)std::hash<std::string>()(key) % _num_shards;
+ }
+
+ uint32_t _num_shards;
+ KVCache<std::string>** _shards;
+};
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index cbab6ac75d..cccbe0f9c2 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -35,8 +35,7 @@ namespace doris::vectorized {
constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
-static Status parse_thrift_footer(io::FileReaderSPtr file,
- std::shared_ptr<FileMetaData>&
file_metadata) {
+static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData**
file_metadata) {
uint8_t footer[PARQUET_FOOTER_SIZE];
int64_t file_size = file->size();
size_t bytes_read = 0;
@@ -65,8 +64,8 @@ static Status parse_thrift_footer(io::FileReaderSPtr file,
file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size,
res, &bytes_read));
DCHECK_EQ(bytes_read, metadata_size);
RETURN_IF_ERROR(deserialize_thrift_msg(meta_buff.get(), &metadata_size,
true, &t_metadata));
- file_metadata.reset(new FileMetaData(t_metadata));
- RETURN_IF_ERROR(file_metadata->init_schema());
+ *file_metadata = new FileMetaData(t_metadata);
+ RETURN_IF_ERROR((*file_metadata)->init_schema());
return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 5a74fa0e03..50f01b6b86 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -34,7 +34,7 @@ namespace doris::vectorized {
ParquetReader::ParquetReader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
const TFileRangeDesc& range, size_t batch_size,
cctz::time_zone* ctz,
- io::IOContext* io_ctx, RuntimeState* state)
+ io::IOContext* io_ctx, RuntimeState* state,
ShardedKVCache* kv_cache)
: _profile(profile),
_scan_params(params),
_scan_range(range),
@@ -43,7 +43,8 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const
TFileScanRangeParams
_range_size(range.size),
_ctz(ctz),
_io_ctx(io_ctx),
- _state(state) {
+ _state(state),
+ _kv_cache(kv_cache) {
_init_profile();
_init_system_properties();
_init_file_description();
@@ -89,6 +90,12 @@ void ParquetReader::_init_profile() {
ADD_CHILD_TIMER(_profile, "ColumnReadTime", parquet_profile);
_parquet_profile.parse_meta_time =
ADD_CHILD_TIMER(_profile, "ParseMetaTime", parquet_profile);
+ _parquet_profile.parse_footer_time =
+ ADD_CHILD_TIMER(_profile, "ParseFooterTime", parquet_profile);
+ _parquet_profile.open_file_time =
+ ADD_CHILD_TIMER(_profile, "FileOpenTime", parquet_profile);
+ _parquet_profile.open_file_num =
+ ADD_CHILD_COUNTER(_profile, "FileNum", TUnit::UNIT,
parquet_profile);
_parquet_profile.page_index_filter_time =
ADD_CHILD_TIMER(_profile, "PageIndexFilterTime",
parquet_profile);
_parquet_profile.row_group_filter_time =
@@ -128,6 +135,9 @@ void ParquetReader::close() {
COUNTER_UPDATE(_parquet_profile.to_read_bytes,
_statistics.read_bytes);
COUNTER_UPDATE(_parquet_profile.column_read_time,
_statistics.column_read_time);
COUNTER_UPDATE(_parquet_profile.parse_meta_time,
_statistics.parse_meta_time);
+ COUNTER_UPDATE(_parquet_profile.parse_footer_time,
_statistics.parse_footer_time);
+ COUNTER_UPDATE(_parquet_profile.open_file_time,
_statistics.open_file_time);
+ COUNTER_UPDATE(_parquet_profile.open_file_num,
_statistics.open_file_num);
COUNTER_UPDATE(_parquet_profile.page_index_filter_time,
_statistics.page_index_filter_time);
COUNTER_UPDATE(_parquet_profile.row_group_filter_time,
@@ -150,18 +160,46 @@ void ParquetReader::close() {
}
_closed = true;
}
+
+ if (_is_file_metadata_owned && _file_metadata != nullptr) {
+ delete _file_metadata;
+ }
}
Status ParquetReader::_open_file() {
if (_file_reader == nullptr) {
+ SCOPED_RAW_TIMER(&_statistics.open_file_time);
+ ++_statistics.open_file_num;
RETURN_IF_ERROR(FileFactory::create_file_reader(
_profile, _system_properties, _file_description,
&_file_system, &_file_reader));
}
if (_file_metadata == nullptr) {
+ SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
if (_file_reader->size() == 0) {
return Status::EndOfFile("open file failed, empty parquet file: "
+ _scan_range.path);
}
- RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
+ if (_kv_cache == nullptr) {
+ _is_file_metadata_owned = true;
+ RETURN_IF_ERROR(parse_thrift_footer(_file_reader,
&_file_metadata));
+ } else {
+ _is_file_metadata_owned = false;
+ _file_metadata = _kv_cache->get<FileMetaData>(
+ _meta_cache_key(_file_reader->path()), [&]() ->
FileMetaData* {
+ FileMetaData* meta;
+ Status st = parse_thrift_footer(_file_reader, &meta);
+ if (!st) {
+ LOG(INFO) << "failed to parse parquet footer for "
+ << _file_description.path << ", err: "
<< st;
+ return nullptr;
+ }
+ return meta;
+ });
+ }
+
+ if (_file_metadata == nullptr) {
+ return Status::InternalError("failed to get file meta data: {}",
+ _file_description.path);
+ }
}
return Status::OK();
}
@@ -173,9 +211,8 @@ std::vector<tparquet::KeyValue>
ParquetReader::get_metadata_key_values() {
}
Status ParquetReader::open() {
- SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
RETURN_IF_ERROR(_open_file());
- _t_metadata = &_file_metadata->to_thrift();
+ _t_metadata = &(_file_metadata->to_thrift());
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 26e8ccda1f..85f9b5c138 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -52,13 +52,16 @@ public:
int64_t read_bytes = 0;
int64_t column_read_time = 0;
int64_t parse_meta_time = 0;
+ int64_t parse_footer_time = 0;
+ int64_t open_file_time = 0;
+ int64_t open_file_num = 0;
int64_t row_group_filter_time = 0;
int64_t page_index_filter_time = 0;
};
ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range, size_t batch_size,
cctz::time_zone* ctz,
- io::IOContext* io_ctx, RuntimeState* state);
+ io::IOContext* io_ctx, RuntimeState* state, ShardedKVCache*
kv_cache = nullptr);
ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc&
range,
io::IOContext* io_ctx, RuntimeState* state);
@@ -124,6 +127,9 @@ private:
RuntimeProfile::Counter* to_read_bytes;
RuntimeProfile::Counter* column_read_time;
RuntimeProfile::Counter* parse_meta_time;
+ RuntimeProfile::Counter* parse_footer_time;
+ RuntimeProfile::Counter* open_file_time;
+ RuntimeProfile::Counter* open_file_num;
RuntimeProfile::Counter* row_group_filter_time;
RuntimeProfile::Counter* page_index_filter_time;
@@ -164,6 +170,7 @@ private:
void _init_bloom_filter();
Status _process_bloom_filter(bool* filter_group);
int64_t _get_column_start_offset(const tparquet::ColumnMetaData&
column_init_column_readers);
+ std::string _meta_cache_key(const std::string& path) { return "meta_" +
path; }
RuntimeProfile* _profile;
const TFileScanRangeParams& _scan_params;
@@ -172,7 +179,10 @@ private:
FileDescription _file_description;
std::shared_ptr<io::FileSystem> _file_system = nullptr;
io::FileReaderSPtr _file_reader = nullptr;
- std::shared_ptr<FileMetaData> _file_metadata;
+ FileMetaData* _file_metadata = nullptr;
+ // set to true if _file_metadata is owned by this reader.
+ // otherwise, it is owned by someone else, such as _kv_cache
+ bool _is_file_metadata_owned = false;
const tparquet::FileMetaData* _t_metadata;
std::unique_ptr<RowGroupReader> _current_group_reader = nullptr;
// read to the end of current reader
@@ -215,5 +225,9 @@ private:
const std::unordered_map<std::string, int>* _colname_to_slot_id;
const std::vector<VExprContext*>* _not_single_slot_filter_conjuncts;
const std::unordered_map<int, std::vector<VExprContext*>>*
_slot_id_to_filter_conjuncts;
+ // Cache to save some common part such as file footer.
+ // Owned by scan node and shared by all parquet readers of this scan node.
+ // Maybe null if not used
+ ShardedKVCache* _kv_cache = nullptr;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 9c03cb78d4..d925720931 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -38,7 +38,7 @@ const std::string ICEBERG_FILE_PATH = "file_path";
IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader,
RuntimeProfile* profile,
RuntimeState* state, const
TFileScanRangeParams& params,
- const TFileRangeDesc& range,
KVCache<std::string>& kv_cache,
+ const TFileRangeDesc& range,
ShardedKVCache* kv_cache,
io::IOContext* io_ctx)
: TableFormatReader(file_format_reader),
_profile(profile),
@@ -178,8 +178,8 @@ Status IcebergTableReader::_position_delete(
SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
Status create_status = Status::OK();
- DeleteFile* delete_file_cache = _kv_cache.get<
- DeleteFile>(delete_file.path, [&]() -> DeleteFile* {
+ DeleteFile* delete_file_cache = _kv_cache->get<
+ DeleteFile>(_delet_file_cache_key(delete_file.path), [&]() ->
DeleteFile* {
TFileRangeDesc delete_range;
delete_range.path = delete_file.path;
delete_range.start_offset = 0;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h
b/be/src/vec/exec/format/table/iceberg_reader.h
index 26dc630cac..3477ee0f83 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -40,7 +40,7 @@ public:
IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile*
profile,
RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, KVCache<std::string>&
kv_cache,
+ const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx);
~IcebergTableReader() override = default;
@@ -96,12 +96,14 @@ private:
Status _gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv);
void _gen_file_col_names();
void _gen_new_colname_to_value_range();
+ std::string _delet_file_cache_key(const std::string& path) { return
"delete_" + path; }
RuntimeProfile* _profile;
RuntimeState* _state;
const TFileScanRangeParams& _params;
const TFileRangeDesc& _range;
- KVCache<std::string>& _kv_cache;
+ // owned by scan node
+ ShardedKVCache* _kv_cache;
IcebergProfile _iceberg_profile;
std::vector<int64_t> _delete_rows;
// col names from _file_slot_descs
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index f6b3c98573..4a762ac1a4 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -87,10 +87,14 @@ Status
NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
return Status::OK();
}
+ // TODO: determine kv cache shard num
+ size_t shard_num =
+ std::min<size_t>(config::doris_scanner_thread_pool_thread_num,
_scan_ranges.size());
+ _kv_cache.reset(new ShardedKVCache(shard_num));
for (auto& scan_range : _scan_ranges) {
VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner,
scan_range.scan_range.ext_scan_range.file_scan_range,
- runtime_profile(), _kv_cache);
+ runtime_profile(),
_kv_cache.get());
_scanner_pool.add(scanner);
RETURN_IF_ERROR(((VFileScanner*)scanner)
->prepare(_vconjunct_ctx_ptr.get(),
&_colname_to_value_range,
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h
b/be/src/vec/exec/scan/new_file_scan_node.h
index f45ba9ddf9..2f67ca9713 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -39,6 +39,11 @@ protected:
private:
std::vector<TScanRangeParams> _scan_ranges;
- KVCache<std::string> _kv_cache;
+ // A in memory cache to save some common components
+ // of the this scan node. eg:
+ // 1. iceberg delete file
+ // 2. parquet file meta
+ // KVCache<std::string> _kv_cache;
+ std::unique_ptr<ShardedKVCache> _kv_cache;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 67b70465fb..298fad4d11 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -43,7 +43,7 @@ using namespace ErrorCode;
VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent,
int64_t limit,
const TFileScanRange& scan_range, RuntimeProfile*
profile,
- KVCache<std::string>& kv_cache)
+ ShardedKVCache* kv_cache)
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
_params(scan_range.params),
_ranges(scan_range.ranges),
@@ -75,6 +75,7 @@ Status VFileScanner::prepare(
_pre_filter_timer = ADD_TIMER(_parent->_scanner_profile,
"FileScannerPreFilterTimer");
_convert_to_output_block_timer =
ADD_TIMER(_parent->_scanner_profile,
"FileScannerConvertOuputBlockTime");
+ _empty_file_counter = ADD_COUNTER(_parent->_scanner_profile,
"EmptyFileNum", TUnit::UNIT);
_file_cache_statistics.reset(new io::FileCacheStatistics());
_io_ctx.reset(new io::IOContext());
@@ -534,9 +535,10 @@ Status VFileScanner::_get_next_reader() {
// TODO: use data lake type
switch (_params.format_type) {
case TFileFormatType::FORMAT_PARQUET: {
- ParquetReader* parquet_reader = new ParquetReader(
- _profile, _params, range,
_state->query_options().batch_size,
- const_cast<cctz::time_zone*>(&_state->timezone_obj()),
_io_ctx.get(), _state);
+ ParquetReader* parquet_reader =
+ new ParquetReader(_profile, _params, range,
_state->query_options().batch_size,
+
const_cast<cctz::time_zone*>(&_state->timezone_obj()),
+ _io_ctx.get(), _state, _kv_cache);
RETURN_IF_ERROR(parquet_reader->open());
if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx !=
nullptr) {
RETURN_IF_ERROR(_vconjunct_ctx->clone(_state,
&_push_down_expr));
@@ -594,9 +596,12 @@ Status VFileScanner::_get_next_reader() {
}
if (init_status.is<END_OF_FILE>()) {
+ COUNTER_UPDATE(_empty_file_counter, 1);
continue;
} else if (!init_status.ok()) {
if (init_status.is<ErrorCode::NOT_FOUND>()) {
+ COUNTER_UPDATE(_empty_file_counter, 1);
+ LOG(INFO) << "failed to find file: " << range.path;
return init_status;
}
return Status::InternalError("failed to init reader for file {},
err: {}", range.path,
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index d4c6f0b33f..cd25367848 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -34,7 +34,7 @@ class VFileScanner : public VScanner {
public:
VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, RuntimeProfile* profile,
- KVCache<string>& kv_cache);
+ ShardedKVCache* kv_cache);
Status open(RuntimeState* state) override;
@@ -100,8 +100,8 @@ protected:
std::unique_ptr<RowDescriptor> _src_row_desc;
// row desc for default exprs
std::unique_ptr<RowDescriptor> _default_val_row_desc;
-
- KVCache<std::string>& _kv_cache;
+ // owned by scan node
+ ShardedKVCache* _kv_cache;
bool _scanner_eof = false;
int _rows = 0;
@@ -129,6 +129,7 @@ private:
RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
+ RuntimeProfile::Counter* _empty_file_counter = nullptr;
const std::unordered_map<std::string, int>* _col_name_to_slot_id;
// single slot filter conjuncts
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index fd507b8e07..033606f310 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -54,8 +54,8 @@ TEST_F(ParquetThriftReaderTest, normal) {
&reader);
EXPECT_TRUE(st.ok());
- std::shared_ptr<FileMetaData> meta_data;
- parse_thrift_footer(reader, meta_data);
+ FileMetaData* meta_data;
+ parse_thrift_footer(reader, &meta_data);
tparquet::FileMetaData t_metadata = meta_data->to_thrift();
LOG(WARNING) << "=====================================";
@@ -69,6 +69,7 @@ TEST_F(ParquetThriftReaderTest, normal) {
LOG(WARNING) << "schema column repetition_type: " <<
value.repetition_type;
LOG(WARNING) << "schema column num children: " << value.num_children;
}
+ delete meta_data;
}
TEST_F(ParquetThriftReaderTest, complex_nested_file) {
@@ -86,8 +87,8 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
&reader);
EXPECT_TRUE(st.ok());
- std::shared_ptr<FileMetaData> metadata;
- parse_thrift_footer(reader, metadata);
+ FileMetaData* metadata;
+ parse_thrift_footer(reader, &metadata);
tparquet::FileMetaData t_metadata = metadata->to_thrift();
FieldDescriptor schemaDescriptor;
schemaDescriptor.parse_from_thrift(t_metadata.schema);
@@ -132,6 +133,7 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
ASSERT_EQ(schemaDescriptor.get_column_index("friend"), 3);
ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4);
+ delete metadata;
}
static int fill_nullable_column(ColumnPtr& doris_column, level_t* definitions,
size_t num_values) {
@@ -358,8 +360,8 @@ static void read_parquet_data_and_check(const std::string&
parquet_file,
std::unique_ptr<vectorized::Block> block;
create_block(block);
- std::shared_ptr<FileMetaData> metadata;
- parse_thrift_footer(reader, metadata);
+ FileMetaData* metadata;
+ parse_thrift_footer(reader, &metadata);
tparquet::FileMetaData t_metadata = metadata->to_thrift();
FieldDescriptor schema_descriptor;
schema_descriptor.parse_from_thrift(t_metadata.schema);
@@ -401,6 +403,7 @@ static void read_parquet_data_and_check(const std::string&
parquet_file,
Slice res(result_buf, result->size());
result->read_at(0, res, &bytes_read);
ASSERT_STREQ(block->dump_data(0, rows).c_str(),
reinterpret_cast<char*>(result_buf));
+ delete metadata;
}
TEST_F(ParquetThriftReaderTest, type_decoder) {
@@ -478,8 +481,8 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
EXPECT_TRUE(st.ok());
// prepare metadata
- std::shared_ptr<FileMetaData> meta_data;
- parse_thrift_footer(file_reader, meta_data);
+ FileMetaData* meta_data;
+ parse_thrift_footer(file_reader, &meta_data);
tparquet::FileMetaData t_metadata = meta_data->to_thrift();
cctz::time_zone ctz;
@@ -520,6 +523,7 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
Slice res(result_buf, result->size());
result->read_at(0, res, &bytes_read);
ASSERT_STREQ(block.dump_data(0, 10).c_str(),
reinterpret_cast<char*>(result_buf));
+ delete meta_data;
}
} // namespace vectorized
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index f61615a497..8ea14f2557 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -735,6 +735,9 @@ public class ExternalFileScanNode extends ExternalScanNode {
.append(" length: ").append(file.getFileSize())
.append("\n");
}
+ if (files.size() > 3) {
+ output.append(prefix).append(" ...other
").append(files.size() - 3).append(" files\n");
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]