This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c27fef6ac08 [fix](file cache) guard null IOContext in cached remote
reader (#63842)
c27fef6ac08 is described below
commit c27fef6ac08ee7b79946066e0d2c8864689f6c2f
Author: lihangyu <[email protected]>
AuthorDate: Fri May 29 09:50:38 2026 +0800
[fix](file cache) guard null IOContext in cached remote reader (#63842)
- Guard `CachedRemoteFileReader::read_at_impl` against a null `IOContext`
pointer by falling back to a default-constructed `IOContext`.
- Pass `NativeReader::_io_ctx` through to the header and block reads.
- Add a BE unit test that reads through `CachedRemoteFileReader` without
passing an `IOContext`.
---
be/src/format/native/native_reader.cpp | 11 +++---
be/src/io/cache/cached_remote_file_reader.cpp | 6 ++-
be/test/io/cache/block_file_cache_test.cpp | 54 +++++++++++++++++++++++++++
3 files changed, 65 insertions(+), 6 deletions(-)
diff --git a/be/src/format/native/native_reader.cpp
b/be/src/format/native/native_reader.cpp
index 565bab20231..cdf742c6925 100644
--- a/be/src/format/native/native_reader.cpp
+++ b/be/src/format/native/native_reader.cpp
@@ -45,7 +45,8 @@ NativeReader::~NativeReader() {
namespace {
Status validate_and_consume_header(io::FileReaderSPtr file_reader, const
TFileRangeDesc& range,
- int64_t* file_size, int64_t*
current_offset, bool* eof) {
+ int64_t* file_size, int64_t*
current_offset, bool* eof,
+ const io::IOContext* io_ctx) {
*file_size = file_reader->size();
*current_offset = 0;
*eof = (*file_size == 0);
@@ -63,7 +64,7 @@ Status validate_and_consume_header(io::FileReaderSPtr
file_reader, const TFileRa
char header[HEADER_SIZE];
Slice header_slice(header, sizeof(header));
size_t bytes_read = 0;
- RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read));
+ RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read,
io_ctx));
if (bytes_read != sizeof(header)) {
return Status::InternalError(
"failed to read Doris Native header from file {}, expect {}
bytes, got {} bytes",
@@ -140,7 +141,7 @@ Status NativeReader::init_reader() {
}
RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range,
&_file_size,
- &_current_offset, &_eof));
+ &_current_offset, &_eof,
_io_ctx));
return Status::OK();
}
@@ -310,7 +311,7 @@ Status NativeReader::_read_next_pblock(std::string* buff,
bool* eof) {
uint64_t len = 0;
Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
size_t bytes_read = 0;
- RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice,
&bytes_read));
+ RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice,
&bytes_read, _io_ctx));
if (bytes_read == 0) {
*eof = true;
return Status::OK();
@@ -332,7 +333,7 @@ Status NativeReader::_read_next_pblock(std::string* buff,
bool* eof) {
buff->assign(len, '\0');
Slice data_slice(buff->data(), len);
bytes_read = 0;
- RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice,
&bytes_read));
+ RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice,
&bytes_read, _io_ctx));
if (bytes_read != len) {
return Status::InternalError(
"Failed to read native block body from file {}, expect {}, "
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index 453b496929a..47b534d2dc9 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -276,9 +276,13 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
size_t already_read = 0;
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);
+ const IOContext default_io_ctx;
+ if (io_ctx == nullptr) {
+ io_ctx = &default_io_ctx;
+ }
+ DCHECK(io_ctx);
const bool is_dryrun = io_ctx->is_dryrun;
DCHECK(!closed());
- DCHECK(io_ctx);
if (offset > size()) {
return Status::InvalidArgument(
fmt::format("offset exceeds file size(offset: {}, file size:
{}, path: {})", offset,
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 544455937a1..9f5563efb3d 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -3415,6 +3415,60 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
FileCacheFactory::instance()->_capacity = 0;
}
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_accepts_null_io_context) {
+ std::string cache_base_path =
+ caches_dir / "cached_remote_file_reader_accepts_null_io_context" /
"";
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+ settings.max_query_cache_size = 0;
+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+
+ auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+ for (int i = 0; i < 100; ++i) {
+ if (cache->get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ opts.tablet_id = 10086;
+ CachedRemoteFileReader reader(local_reader, opts);
+
+ std::string buffer(64_kb, '\0');
+ size_t bytes_read {0};
+ ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read).ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
TEST_F(BlockFileCacheTest, cached_remote_file_reader_tail) {
std::string cache_base_path = caches_dir /
"cached_remote_file_reader_tail" / "";
if (fs::exists(cache_base_path)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]