github-actions[bot] commented on code in PR #18796:
URL: https://github.com/apache/doris/pull/18796#discussion_r1170022886
##########
be/src/io/fs/buffered_reader.cpp:
##########
@@ -86,11 +410,70 @@ void PrefetchBuffer::prefetch_buffer() {
// eof would come up with len == 0, it would be handled by read_buffer
}
+int PrefetchBuffer::search_read_range(size_t off) {
Review Comment:
warning: method 'search_read_range' can be made const
[readability-make-member-function-const]
```suggestion
int PrefetchBuffer::search_read_range(size_t off) const {
```
be/src/io/fs/buffered_reader.h:335:
```diff
- int search_read_range(size_t off);
+ int search_read_range(size_t off) const;
```
##########
be/src/io/fs/buffered_reader.h:
##########
@@ -24,49 +24,293 @@
#include "common/config.h"
#include "common/status.h"
+#include "io/cache/block/cached_remote_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
#include "io/fs/file_reader.h"
+#include "io/fs/s3_file_reader.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"
+#include "vec/common/typeid_cast.h"
namespace doris {
namespace io {
+struct PrefetchRange {
+ size_t start_offset;
+ size_t end_offset;
+
+ PrefetchRange(size_t start_offset, size_t end_offset)
+ : start_offset(start_offset), end_offset(end_offset) {}
+
+ PrefetchRange(const PrefetchRange& other) = default;
+
+ PrefetchRange() : start_offset(0), end_offset(0) {}
+};
+
+class RandomAccessReader : public io::FileReader {
+public:
+ static constexpr size_t TOTAL_BUFFER_SIZE = 64 * 1024 * 1024; // 64MB
+ static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
+ static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
+ static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
+ static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 64
+
+ RandomAccessReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
+ const std::vector<PrefetchRange>& random_access_ranges)
+ : _profile(profile),
+ _reader(std::move(reader)),
+ _random_access_ranges(random_access_ranges) {
+ _range_cached_data.resize(random_access_ranges.size());
+ _size = _reader->size();
+ _remaining = TOTAL_BUFFER_SIZE;
+ if (_profile != nullptr) {
+ const char* random_profile = "MergedSmallIO";
+ ADD_TIMER(_profile, random_profile);
+ _copy_time = ADD_CHILD_TIMER(_profile, "CopyTime", random_profile);
+ _read_time = ADD_CHILD_TIMER(_profile, "ReadTime", random_profile);
+ _request_io = ADD_CHILD_COUNTER(_profile, "RequestIO",
TUnit::UNIT, random_profile);
+ _merged_io = ADD_CHILD_COUNTER(_profile, "MergedIO", TUnit::UNIT,
random_profile);
+ _request_bytes =
+ ADD_CHILD_COUNTER(_profile, "RequestBytes", TUnit::BYTES,
random_profile);
+ _read_bytes = ADD_CHILD_COUNTER(_profile, "MergedBytes",
TUnit::BYTES, random_profile);
+ }
+ }
+
+ ~RandomAccessReader() override {
+ if (_read_slice != nullptr) {
+ delete[] _read_slice;
+ }
+ for (char* box : _boxes) {
+ delete[] box;
+ }
+ close();
+ }
+
+ Status close() override {
+ if (!_closed) {
+ _closed = true;
+ // the underlying buffer is closed in its own destructor
+ // return _reader->close();
+ if (_profile != nullptr) {
+ COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+ COUNTER_UPDATE(_read_time, _statistics.read_time);
+ COUNTER_UPDATE(_request_io, _statistics.request_io);
+ COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+ COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+ COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
+ }
+ }
+ return Status::OK();
+ }
+
+ const io::Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ std::shared_ptr<io::FileSystem> fs() const override { return
_reader->fs(); }
+
+protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override;
+
+private:
+ struct RangeCachedData {
+ size_t start_offset;
+ size_t end_offset;
+ std::vector<int16> ref_box;
+ std::vector<uint32> box_start_offset;
+ std::vector<uint32> box_end_offset;
+ bool has_read = false;
+
+ RangeCachedData(size_t start_offset, size_t end_offset)
+ : start_offset(start_offset), end_offset(end_offset) {}
+
+ RangeCachedData() : start_offset(0), end_offset(0) {}
+
+ bool empty() const { return start_offset == end_offset; }
+
+ bool contains(size_t offset) const { return start_offset <= offset &&
offset < end_offset; }
+
+ void reset() {
+ start_offset = 0;
+ end_offset = 0;
+ ref_box.clear();
+ box_start_offset.clear();
+ box_end_offset.clear();
+ }
+
+ int16 release_last_box() {
+ // we can only release the last referenced box to ensure
sequential read in range
+ if (!empty()) {
+ int16 last_box_ref = ref_box.back();
+ uint32 released_size = box_end_offset.back() -
box_start_offset.back();
+ ref_box.pop_back();
+ box_start_offset.pop_back();
+ box_end_offset.pop_back();
+ end_offset -= released_size;
+ if (empty()) {
+ reset();
+ }
+ return last_box_ref;
+ }
+ return -1;
+ }
+ };
+
+ struct Statistics {
+ int64_t copy_time = 0;
+ int64_t read_time = 0;
+ int64_t request_io = 0;
+ int64_t merged_io = 0;
+ int64_t request_bytes = 0;
+ int64_t read_bytes = 0;
+ };
+
+ RuntimeProfile::Counter* _copy_time;
+ RuntimeProfile::Counter* _read_time;
+ RuntimeProfile::Counter* _request_io;
+ RuntimeProfile::Counter* _merged_io;
+ RuntimeProfile::Counter* _request_bytes;
+ RuntimeProfile::Counter* _read_bytes;
+
+ int _search_read_range(size_t start_offset, size_t end_offset);
+ void _clean_cached_data(RangeCachedData& cached_data);
+ void _read_in_box(RangeCachedData& cached_data, size_t offset, Slice
result,
+ size_t* bytes_read);
+ Status _fill_box(int range_index, size_t start_offset, size_t to_read,
size_t* bytes_read,
+ const IOContext* io_ctx);
+ void _dec_box_ref(int16 box_index);
+
+ RuntimeProfile* _profile = nullptr;
+ io::FileReaderSPtr _reader;
+ const std::vector<PrefetchRange> _random_access_ranges;
+ std::vector<RangeCachedData> _range_cached_data;
+ size_t _size;
+ bool _closed = false;
+ size_t _remaining;
+
+ char* _read_slice = nullptr;
+ std::vector<char*> _boxes;
+ int16 _last_box_ref = -1;
+ uint32 _last_box_usage = 0;
+ std::vector<int16> _box_ref;
+
+ Statistics _statistics;
+};
+
+class DelegateReader {
+public:
+ class ThreadSafeReader : public io::FileReader {
+ public:
+ ThreadSafeReader(io::FileReaderSPtr reader) :
_reader(std::move(reader)) {
+ _size = _reader->size();
+ }
+
+ ~ThreadSafeReader() override { close(); }
+
+ Status close() override {
+ if (!_closed) {
+ _closed = true;
+ return _reader->close();
+ }
+ return Status::OK();
+ }
+
+ const io::Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ std::shared_ptr<io::FileSystem> fs() const override { return
_reader->fs(); }
+
+ protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override {
+ if (typeid(*_reader.get()) == typeid(io::S3FileReader) ||
Review Comment:
warning: expression with side effects will be evaluated despite being used
as an operand to 'typeid' [clang-diagnostic-potentially-evaluated-expression]
```cpp
if (typeid(*_reader.get()) == typeid(io::S3FileReader) ||
^
```
##########
be/src/io/fs/buffered_reader.h:
##########
@@ -24,49 +24,293 @@
#include "common/config.h"
#include "common/status.h"
+#include "io/cache/block/cached_remote_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
#include "io/fs/file_reader.h"
+#include "io/fs/s3_file_reader.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"
+#include "vec/common/typeid_cast.h"
namespace doris {
namespace io {
+struct PrefetchRange {
+ size_t start_offset;
+ size_t end_offset;
+
+ PrefetchRange(size_t start_offset, size_t end_offset)
+ : start_offset(start_offset), end_offset(end_offset) {}
+
+ PrefetchRange(const PrefetchRange& other) = default;
+
+ PrefetchRange() : start_offset(0), end_offset(0) {}
+};
+
+class RandomAccessReader : public io::FileReader {
+public:
+ static constexpr size_t TOTAL_BUFFER_SIZE = 64 * 1024 * 1024; // 64MB
+ static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
+ static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
+ static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
+ static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 64
+
+ RandomAccessReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
+ const std::vector<PrefetchRange>& random_access_ranges)
+ : _profile(profile),
+ _reader(std::move(reader)),
+ _random_access_ranges(random_access_ranges) {
+ _range_cached_data.resize(random_access_ranges.size());
+ _size = _reader->size();
+ _remaining = TOTAL_BUFFER_SIZE;
+ if (_profile != nullptr) {
+ const char* random_profile = "MergedSmallIO";
+ ADD_TIMER(_profile, random_profile);
+ _copy_time = ADD_CHILD_TIMER(_profile, "CopyTime", random_profile);
+ _read_time = ADD_CHILD_TIMER(_profile, "ReadTime", random_profile);
+ _request_io = ADD_CHILD_COUNTER(_profile, "RequestIO",
TUnit::UNIT, random_profile);
+ _merged_io = ADD_CHILD_COUNTER(_profile, "MergedIO", TUnit::UNIT,
random_profile);
+ _request_bytes =
+ ADD_CHILD_COUNTER(_profile, "RequestBytes", TUnit::BYTES,
random_profile);
+ _read_bytes = ADD_CHILD_COUNTER(_profile, "MergedBytes",
TUnit::BYTES, random_profile);
+ }
+ }
+
+ ~RandomAccessReader() override {
+ if (_read_slice != nullptr) {
+ delete[] _read_slice;
+ }
+ for (char* box : _boxes) {
+ delete[] box;
+ }
+ close();
+ }
+
+ Status close() override {
+ if (!_closed) {
+ _closed = true;
+ // the underlying buffer is closed in its own destructor
+ // return _reader->close();
+ if (_profile != nullptr) {
+ COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+ COUNTER_UPDATE(_read_time, _statistics.read_time);
+ COUNTER_UPDATE(_request_io, _statistics.request_io);
+ COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+ COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+ COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
+ }
+ }
+ return Status::OK();
+ }
+
+ const io::Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ std::shared_ptr<io::FileSystem> fs() const override { return
_reader->fs(); }
+
+protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override;
+
+private:
+ struct RangeCachedData {
+ size_t start_offset;
+ size_t end_offset;
+ std::vector<int16> ref_box;
+ std::vector<uint32> box_start_offset;
+ std::vector<uint32> box_end_offset;
+ bool has_read = false;
+
+ RangeCachedData(size_t start_offset, size_t end_offset)
+ : start_offset(start_offset), end_offset(end_offset) {}
+
+ RangeCachedData() : start_offset(0), end_offset(0) {}
+
+ bool empty() const { return start_offset == end_offset; }
+
+ bool contains(size_t offset) const { return start_offset <= offset &&
offset < end_offset; }
+
+ void reset() {
+ start_offset = 0;
+ end_offset = 0;
+ ref_box.clear();
+ box_start_offset.clear();
+ box_end_offset.clear();
+ }
+
+ int16 release_last_box() {
+ // we can only release the last referenced box to ensure
sequential read in range
+ if (!empty()) {
+ int16 last_box_ref = ref_box.back();
+ uint32 released_size = box_end_offset.back() -
box_start_offset.back();
+ ref_box.pop_back();
+ box_start_offset.pop_back();
+ box_end_offset.pop_back();
+ end_offset -= released_size;
+ if (empty()) {
+ reset();
+ }
+ return last_box_ref;
+ }
+ return -1;
+ }
+ };
+
+ struct Statistics {
+ int64_t copy_time = 0;
+ int64_t read_time = 0;
+ int64_t request_io = 0;
+ int64_t merged_io = 0;
+ int64_t request_bytes = 0;
+ int64_t read_bytes = 0;
+ };
+
+ RuntimeProfile::Counter* _copy_time;
+ RuntimeProfile::Counter* _read_time;
+ RuntimeProfile::Counter* _request_io;
+ RuntimeProfile::Counter* _merged_io;
+ RuntimeProfile::Counter* _request_bytes;
+ RuntimeProfile::Counter* _read_bytes;
+
+ int _search_read_range(size_t start_offset, size_t end_offset);
+ void _clean_cached_data(RangeCachedData& cached_data);
+ void _read_in_box(RangeCachedData& cached_data, size_t offset, Slice
result,
+ size_t* bytes_read);
+ Status _fill_box(int range_index, size_t start_offset, size_t to_read,
size_t* bytes_read,
+ const IOContext* io_ctx);
+ void _dec_box_ref(int16 box_index);
+
+ RuntimeProfile* _profile = nullptr;
+ io::FileReaderSPtr _reader;
+ const std::vector<PrefetchRange> _random_access_ranges;
+ std::vector<RangeCachedData> _range_cached_data;
+ size_t _size;
+ bool _closed = false;
+ size_t _remaining;
+
+ char* _read_slice = nullptr;
+ std::vector<char*> _boxes;
+ int16 _last_box_ref = -1;
+ uint32 _last_box_usage = 0;
+ std::vector<int16> _box_ref;
+
+ Statistics _statistics;
+};
+
+class DelegateReader {
+public:
+ class ThreadSafeReader : public io::FileReader {
+ public:
+ ThreadSafeReader(io::FileReaderSPtr reader) :
_reader(std::move(reader)) {
+ _size = _reader->size();
+ }
+
+ ~ThreadSafeReader() override { close(); }
+
+ Status close() override {
+ if (!_closed) {
+ _closed = true;
+ return _reader->close();
+ }
+ return Status::OK();
+ }
+
+ const io::Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ std::shared_ptr<io::FileSystem> fs() const override { return
_reader->fs(); }
+
+ protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override {
+ if (typeid(*_reader.get()) == typeid(io::S3FileReader) ||
+ typeid(*_reader.get()) == typeid(io::BrokerFileReader)) {
+ return _reader->read_at(offset, result, bytes_read, io_ctx);
+ } else if (io::CachedRemoteFileReader* reader =
+
typeid_cast<io::CachedRemoteFileReader*>(_reader.get())) {
+ if (typeid(*reader->get_remote_reader()) ==
typeid(io::S3FileReader) ||
Review Comment:
warning: expression with side effects will be evaluated despite being used
as an operand to 'typeid' [clang-diagnostic-potentially-evaluated-expression]
```cpp
if (typeid(*reader->get_remote_reader()) ==
typeid(io::S3FileReader) ||
^
```
##########
be/src/vec/exec/format/parquet/parquet_thrift_util.h:
##########
@@ -34,37 +34,43 @@ namespace doris::vectorized {
constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
+constexpr size_t INIT_META_SIZE = 128 * 1024; // 128k
static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData**
file_metadata,
Review Comment:
warning: unused function 'parse_thrift_footer'
[clang-diagnostic-unused-function]
```cpp
static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData**
file_metadata,
^
```
##########
be/src/io/fs/buffered_reader.h:
##########
@@ -24,49 +24,293 @@
#include "common/config.h"
#include "common/status.h"
+#include "io/cache/block/cached_remote_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
#include "io/fs/file_reader.h"
+#include "io/fs/s3_file_reader.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"
+#include "vec/common/typeid_cast.h"
namespace doris {
namespace io {
+struct PrefetchRange {
+ size_t start_offset;
+ size_t end_offset;
+
+ PrefetchRange(size_t start_offset, size_t end_offset)
+ : start_offset(start_offset), end_offset(end_offset) {}
+
+ PrefetchRange(const PrefetchRange& other) = default;
+
+ PrefetchRange() : start_offset(0), end_offset(0) {}
+};
+
+class RandomAccessReader : public io::FileReader {
+public:
+ static constexpr size_t TOTAL_BUFFER_SIZE = 64 * 1024 * 1024; // 64MB
+ static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
+ static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
+ static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
+ static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 64
+
+ RandomAccessReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
+ const std::vector<PrefetchRange>& random_access_ranges)
+ : _profile(profile),
+ _reader(std::move(reader)),
+ _random_access_ranges(random_access_ranges) {
+ _range_cached_data.resize(random_access_ranges.size());
+ _size = _reader->size();
+ _remaining = TOTAL_BUFFER_SIZE;
+ if (_profile != nullptr) {
+ const char* random_profile = "MergedSmallIO";
+ ADD_TIMER(_profile, random_profile);
+ _copy_time = ADD_CHILD_TIMER(_profile, "CopyTime", random_profile);
+ _read_time = ADD_CHILD_TIMER(_profile, "ReadTime", random_profile);
+ _request_io = ADD_CHILD_COUNTER(_profile, "RequestIO",
TUnit::UNIT, random_profile);
+ _merged_io = ADD_CHILD_COUNTER(_profile, "MergedIO", TUnit::UNIT,
random_profile);
+ _request_bytes =
+ ADD_CHILD_COUNTER(_profile, "RequestBytes", TUnit::BYTES,
random_profile);
+ _read_bytes = ADD_CHILD_COUNTER(_profile, "MergedBytes",
TUnit::BYTES, random_profile);
+ }
+ }
+
+ ~RandomAccessReader() override {
+ if (_read_slice != nullptr) {
+ delete[] _read_slice;
+ }
+ for (char* box : _boxes) {
+ delete[] box;
+ }
+ close();
+ }
+
+ Status close() override {
+ if (!_closed) {
+ _closed = true;
+ // the underlying buffer is closed in its own destructor
+ // return _reader->close();
+ if (_profile != nullptr) {
+ COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+ COUNTER_UPDATE(_read_time, _statistics.read_time);
+ COUNTER_UPDATE(_request_io, _statistics.request_io);
+ COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+ COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+ COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
+ }
+ }
+ return Status::OK();
+ }
+
+ const io::Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ std::shared_ptr<io::FileSystem> fs() const override { return
_reader->fs(); }
+
+protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override;
+
+private:
+ struct RangeCachedData {
+ size_t start_offset;
+ size_t end_offset;
+ std::vector<int16> ref_box;
+ std::vector<uint32> box_start_offset;
+ std::vector<uint32> box_end_offset;
+ bool has_read = false;
+
+ RangeCachedData(size_t start_offset, size_t end_offset)
+ : start_offset(start_offset), end_offset(end_offset) {}
+
+ RangeCachedData() : start_offset(0), end_offset(0) {}
+
+ bool empty() const { return start_offset == end_offset; }
+
+ bool contains(size_t offset) const { return start_offset <= offset &&
offset < end_offset; }
+
+ void reset() {
+ start_offset = 0;
+ end_offset = 0;
+ ref_box.clear();
+ box_start_offset.clear();
+ box_end_offset.clear();
+ }
+
+ int16 release_last_box() {
+ // we can only release the last referenced box to ensure
sequential read in range
+ if (!empty()) {
+ int16 last_box_ref = ref_box.back();
+ uint32 released_size = box_end_offset.back() -
box_start_offset.back();
+ ref_box.pop_back();
+ box_start_offset.pop_back();
+ box_end_offset.pop_back();
+ end_offset -= released_size;
+ if (empty()) {
+ reset();
+ }
+ return last_box_ref;
+ }
+ return -1;
+ }
+ };
+
+ struct Statistics {
+ int64_t copy_time = 0;
+ int64_t read_time = 0;
+ int64_t request_io = 0;
+ int64_t merged_io = 0;
+ int64_t request_bytes = 0;
+ int64_t read_bytes = 0;
+ };
+
+ RuntimeProfile::Counter* _copy_time;
+ RuntimeProfile::Counter* _read_time;
+ RuntimeProfile::Counter* _request_io;
+ RuntimeProfile::Counter* _merged_io;
+ RuntimeProfile::Counter* _request_bytes;
+ RuntimeProfile::Counter* _read_bytes;
+
+ int _search_read_range(size_t start_offset, size_t end_offset);
+ void _clean_cached_data(RangeCachedData& cached_data);
+ void _read_in_box(RangeCachedData& cached_data, size_t offset, Slice
result,
+ size_t* bytes_read);
+ Status _fill_box(int range_index, size_t start_offset, size_t to_read,
size_t* bytes_read,
+ const IOContext* io_ctx);
+ void _dec_box_ref(int16 box_index);
+
+ RuntimeProfile* _profile = nullptr;
+ io::FileReaderSPtr _reader;
+ const std::vector<PrefetchRange> _random_access_ranges;
+ std::vector<RangeCachedData> _range_cached_data;
+ size_t _size;
+ bool _closed = false;
+ size_t _remaining;
+
+ char* _read_slice = nullptr;
+ std::vector<char*> _boxes;
+ int16 _last_box_ref = -1;
+ uint32 _last_box_usage = 0;
+ std::vector<int16> _box_ref;
+
+ Statistics _statistics;
+};
+
+class DelegateReader {
+public:
+ class ThreadSafeReader : public io::FileReader {
+ public:
+ ThreadSafeReader(io::FileReaderSPtr reader) :
_reader(std::move(reader)) {
+ _size = _reader->size();
+ }
+
+ ~ThreadSafeReader() override { close(); }
+
+ Status close() override {
+ if (!_closed) {
+ _closed = true;
+ return _reader->close();
+ }
+ return Status::OK();
+ }
+
+ const io::Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ std::shared_ptr<io::FileSystem> fs() const override { return
_reader->fs(); }
+
+ protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override {
+ if (typeid(*_reader.get()) == typeid(io::S3FileReader) ||
+ typeid(*_reader.get()) == typeid(io::BrokerFileReader)) {
Review Comment:
warning: expression with side effects will be evaluated despite being used
as an operand to 'typeid' [clang-diagnostic-potentially-evaluated-expression]
```cpp
typeid(*_reader.get()) == typeid(io::BrokerFileReader)) {
^
```
##########
be/src/io/fs/buffered_reader.h:
##########
@@ -24,49 +24,293 @@
#include "common/config.h"
#include "common/status.h"
+#include "io/cache/block/cached_remote_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
#include "io/fs/file_reader.h"
+#include "io/fs/s3_file_reader.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"
+#include "vec/common/typeid_cast.h"
namespace doris {
namespace io {
+struct PrefetchRange {
+ size_t start_offset;
+ size_t end_offset;
+
+ PrefetchRange(size_t start_offset, size_t end_offset)
+ : start_offset(start_offset), end_offset(end_offset) {}
+
+ PrefetchRange(const PrefetchRange& other) = default;
+
+ PrefetchRange() : start_offset(0), end_offset(0) {}
+};
+
+class RandomAccessReader : public io::FileReader {
+public:
+ static constexpr size_t TOTAL_BUFFER_SIZE = 64 * 1024 * 1024; // 64MB
+ static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
+ static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
+ static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
+ static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 64
+
+ RandomAccessReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
+ const std::vector<PrefetchRange>& random_access_ranges)
+ : _profile(profile),
+ _reader(std::move(reader)),
+ _random_access_ranges(random_access_ranges) {
+ _range_cached_data.resize(random_access_ranges.size());
+ _size = _reader->size();
+ _remaining = TOTAL_BUFFER_SIZE;
+ if (_profile != nullptr) {
+ const char* random_profile = "MergedSmallIO";
+ ADD_TIMER(_profile, random_profile);
+ _copy_time = ADD_CHILD_TIMER(_profile, "CopyTime", random_profile);
+ _read_time = ADD_CHILD_TIMER(_profile, "ReadTime", random_profile);
+ _request_io = ADD_CHILD_COUNTER(_profile, "RequestIO",
TUnit::UNIT, random_profile);
+ _merged_io = ADD_CHILD_COUNTER(_profile, "MergedIO", TUnit::UNIT,
random_profile);
+ _request_bytes =
+ ADD_CHILD_COUNTER(_profile, "RequestBytes", TUnit::BYTES,
random_profile);
+ _read_bytes = ADD_CHILD_COUNTER(_profile, "MergedBytes",
TUnit::BYTES, random_profile);
+ }
+ }
+
+ ~RandomAccessReader() override {
+ if (_read_slice != nullptr) {
+ delete[] _read_slice;
+ }
+ for (char* box : _boxes) {
+ delete[] box;
+ }
+ close();
+ }
+
+ Status close() override {
+ if (!_closed) {
+ _closed = true;
+ // the underlying buffer is closed in its own destructor
+ // return _reader->close();
+ if (_profile != nullptr) {
+ COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+ COUNTER_UPDATE(_read_time, _statistics.read_time);
+ COUNTER_UPDATE(_request_io, _statistics.request_io);
+ COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+ COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+ COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
+ }
+ }
+ return Status::OK();
+ }
+
+ const io::Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ std::shared_ptr<io::FileSystem> fs() const override { return
_reader->fs(); }
+
+protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override;
+
+private:
+ struct RangeCachedData {
+ size_t start_offset;
+ size_t end_offset;
+ std::vector<int16> ref_box;
+ std::vector<uint32> box_start_offset;
+ std::vector<uint32> box_end_offset;
+ bool has_read = false;
+
+ RangeCachedData(size_t start_offset, size_t end_offset)
+ : start_offset(start_offset), end_offset(end_offset) {}
+
+ RangeCachedData() : start_offset(0), end_offset(0) {}
+
+ bool empty() const { return start_offset == end_offset; }
+
+ bool contains(size_t offset) const { return start_offset <= offset &&
offset < end_offset; }
+
+ void reset() {
+ start_offset = 0;
+ end_offset = 0;
+ ref_box.clear();
+ box_start_offset.clear();
+ box_end_offset.clear();
+ }
+
+ int16 release_last_box() {
+ // we can only release the last referenced box to ensure
sequential read in range
+ if (!empty()) {
+ int16 last_box_ref = ref_box.back();
+ uint32 released_size = box_end_offset.back() -
box_start_offset.back();
+ ref_box.pop_back();
+ box_start_offset.pop_back();
+ box_end_offset.pop_back();
+ end_offset -= released_size;
+ if (empty()) {
+ reset();
+ }
+ return last_box_ref;
+ }
+ return -1;
+ }
+ };
+
+ struct Statistics {
+ int64_t copy_time = 0;
+ int64_t read_time = 0;
+ int64_t request_io = 0;
+ int64_t merged_io = 0;
+ int64_t request_bytes = 0;
+ int64_t read_bytes = 0;
+ };
+
+ RuntimeProfile::Counter* _copy_time;
+ RuntimeProfile::Counter* _read_time;
+ RuntimeProfile::Counter* _request_io;
+ RuntimeProfile::Counter* _merged_io;
+ RuntimeProfile::Counter* _request_bytes;
+ RuntimeProfile::Counter* _read_bytes;
+
+ int _search_read_range(size_t start_offset, size_t end_offset);
+ void _clean_cached_data(RangeCachedData& cached_data);
+ void _read_in_box(RangeCachedData& cached_data, size_t offset, Slice
result,
+ size_t* bytes_read);
+ Status _fill_box(int range_index, size_t start_offset, size_t to_read,
size_t* bytes_read,
+ const IOContext* io_ctx);
+ void _dec_box_ref(int16 box_index);
+
+ RuntimeProfile* _profile = nullptr;
+ io::FileReaderSPtr _reader;
+ const std::vector<PrefetchRange> _random_access_ranges;
+ std::vector<RangeCachedData> _range_cached_data;
+ size_t _size;
+ bool _closed = false;
+ size_t _remaining;
+
+ char* _read_slice = nullptr;
+ std::vector<char*> _boxes;
+ int16 _last_box_ref = -1;
+ uint32 _last_box_usage = 0;
+ std::vector<int16> _box_ref;
+
+ Statistics _statistics;
+};
+
+class DelegateReader {
+public:
+ class ThreadSafeReader : public io::FileReader {
+ public:
+ ThreadSafeReader(io::FileReaderSPtr reader) :
_reader(std::move(reader)) {
+ _size = _reader->size();
+ }
+
+ ~ThreadSafeReader() override { close(); }
+
+ Status close() override {
+ if (!_closed) {
+ _closed = true;
+ return _reader->close();
+ }
+ return Status::OK();
+ }
+
+ const io::Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ std::shared_ptr<io::FileSystem> fs() const override { return
_reader->fs(); }
+
+ protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override {
+ if (typeid(*_reader.get()) == typeid(io::S3FileReader) ||
+ typeid(*_reader.get()) == typeid(io::BrokerFileReader)) {
+ return _reader->read_at(offset, result, bytes_read, io_ctx);
+ } else if (io::CachedRemoteFileReader* reader =
+
typeid_cast<io::CachedRemoteFileReader*>(_reader.get())) {
+ if (typeid(*reader->get_remote_reader()) ==
typeid(io::S3FileReader) ||
+ typeid(*reader->get_remote_reader()) ==
typeid(io::BrokerFileReader)) {
Review Comment:
warning: expression with side effects will be evaluated despite being used
as an operand to 'typeid' [clang-diagnostic-potentially-evaluated-expression]
```cpp
typeid(*reader->get_remote_reader()) ==
typeid(io::BrokerFileReader)) {
^
```
##########
be/src/io/fs/buffered_reader.cpp:
##########
@@ -86,11 +410,70 @@
// eof would come up with len == 0, it would be handled by read_buffer
}
+int PrefetchBuffer::search_read_range(size_t off) {
+ if (_random_access_ranges == nullptr || _random_access_ranges->empty()) {
+ return -1;
+ }
+ const std::vector<PrefetchRange>& random_access_ranges =
*_random_access_ranges;
+ int left = 0, right = random_access_ranges.size() - 1;
+ do {
+ int mid = left + (right - left) / 2;
+ const PrefetchRange& range = random_access_ranges[mid];
+ if (range.start_offset <= off && range.end_offset > off) {
+ return mid;
+ } else if (range.start_offset > off) {
+ right = mid;
+ } else {
+ left = mid + 1;
+ }
+ } while (left < right);
+ if (random_access_ranges[right].start_offset > off) {
+ return right;
+ } else {
+ return -1;
+ }
+}
+
+size_t PrefetchBuffer::merge_small_ranges(size_t off, int range_index) {
Review Comment:
warning: method 'merge_small_ranges' can be made const
[readability-make-member-function-const]
```suggestion
size_t PrefetchBuffer::merge_small_ranges(size_t off, int range_index) const {
```
be/src/io/fs/buffered_reader.h:337:
```diff
- size_t merge_small_ranges(size_t off, int range_index);
+ size_t merge_small_ranges(size_t off, int range_index) const;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]