This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0bd933ceecf [fix](be) Keep prefetch reader alive for async tasks
(#63796)
0bd933ceecf is described below
commit 0bd933ceecf1762c948b5ed51291fa9791ec5c85
Author: Gabriel <[email protected]>
AuthorDate: Fri May 29 09:10:21 2026 +0800
[fix](be) Keep prefetch reader alive for async tasks (#63796)
Problem Summary: An S3/OSS prefetch timeout can cancel and close
PrefetchBufferedReader while an async PrefetchBuffer task is still
running. The task kept the PrefetchBuffer alive, but the PrefetchBuffer
stored the underlying FileReader only as a raw pointer, so the owner could
destroy the reader before the async task resumed on the error path and
logged reader metadata, dereferencing a dangling pointer. Keep a shared
FileReader reference in each PrefetchBuffer so that the reader stays alive
for as long as any async prefetch task may still dereference it, and
add a unit test that covers a close timeout while the prefetch read is
blocked.
---
be/src/io/fs/buffered_reader.cpp | 4 +-
be/src/io/fs/buffered_reader.h | 8 ++--
be/test/io/fs/buffered_reader_test.cpp | 85 +++++++++++++++++++++++++++++++++-
3 files changed, 90 insertions(+), 7 deletions(-)
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 6cf794bb91b..246ee5e480d 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -679,8 +679,8 @@
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
// to make sure the buffer reader will start to read at right position.
for (int i = 0; i < buffer_num; i++) {
_pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
- _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size,
_reader.get(),
- _io_ctx_holder, sync_buffer));
+ _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size,
_reader, _io_ctx_holder,
+ sync_buffer));
}
}
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 834a87522ca..beaf3e87a8d 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -427,12 +427,12 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer>, public Pro
enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t
whole_buffer_size,
- io::FileReader* reader, std::shared_ptr<const IOContext>
io_ctx,
+ io::FileReaderSPtr reader, std::shared_ptr<const IOContext>
io_ctx,
std::function<void(PrefetchBuffer&)> sync_profile)
: _file_range(file_range),
_size(buffer_size),
_whole_buffer_size(whole_buffer_size),
- _reader(reader),
+ _reader(std::move(reader)),
_io_ctx_holder(std::move(io_ctx)),
_io_ctx(_io_ctx_holder.get()),
_sync_profile(std::move(sync_profile)) {}
@@ -443,7 +443,7 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer>, public Pro
_random_access_ranges(other._random_access_ranges),
_size(other._size),
_whole_buffer_size(other._whole_buffer_size),
- _reader(other._reader),
+ _reader(std::move(other._reader)),
_io_ctx_holder(std::move(other._io_ctx_holder)),
_io_ctx(_io_ctx_holder.get()),
_buf(std::move(other._buf)),
@@ -460,7 +460,7 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer>, public Pro
size_t _size {0};
size_t _len {0};
size_t _whole_buffer_size;
- io::FileReader* _reader = nullptr;
+ io::FileReaderSPtr _reader;
std::shared_ptr<const IOContext> _io_ctx_holder;
const IOContext* _io_ctx = nullptr;
PODArray<char> _buf;
diff --git a/be/test/io/fs/buffered_reader_test.cpp
b/be/test/io/fs/buffered_reader_test.cpp
index bc92d22b178..75ed5e351dc 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -22,13 +22,17 @@
#include <gtest/gtest-test-part.h>
#include <limits.h>
+#include <atomic>
#include <memory>
#include <ostream>
+#include <thread>
+#include "common/config.h"
#include "gtest/gtest_pred_impl.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
+#include "util/countdown_latch.h"
#include "util/stopwatch.hpp"
#include "util/threadpool.h"
@@ -102,7 +106,7 @@ public:
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
- const io::IOContext* io_ctx) override {
+ const io::IOContext* /*io_ctx*/) override {
if (offset >= _size) {
*bytes_read = 0;
return Status::OK();
@@ -120,6 +124,52 @@ private:
io::Path _path = "/tmp/mock";
};
+class BlockingFileReader : public io::FileReader {
+public:
+ BlockingFileReader(size_t size, CountDownLatch* read_started,
CountDownLatch* continue_read,
+ std::atomic<bool>* destroyed)
+ : _size(size),
+ _read_started(read_started),
+ _continue_read(continue_read),
+ _destroyed(destroyed) {}
+
+ ~BlockingFileReader() override { _destroyed->store(true); }
+
+ Status close() override {
+ _closed = true;
+ return Status::OK();
+ }
+
+ const io::Path& path() const override { return _path; }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ int64_t mtime() const override { return 0; }
+
+protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const io::IOContext* /*io_ctx*/) override {
+ _read_started->count_down();
+ _continue_read->wait();
+ if (offset >= _size) {
+ *bytes_read = 0;
+ return Status::OK();
+ }
+ *bytes_read = std::min(_size - offset, result.size);
+ return Status::TimedOut("injected prefetch timeout");
+ }
+
+private:
+ size_t _size;
+ CountDownLatch* _read_started;
+ CountDownLatch* _continue_read;
+ std::atomic<bool>* _destroyed;
+ bool _closed = false;
+ io::Path _path = "/tmp/blocking";
+};
+
class TestingRangeCacheFileReader : public io::FileReader {
public:
TestingRangeCacheFileReader(std::shared_ptr<io::FileReader> delegate) :
_delegate(delegate) {};
@@ -302,6 +352,39 @@ TEST_F(BufferedReaderTest, test_miss) {
EXPECT_EQ(45, bytes_read);
}
+TEST_F(BufferedReaderTest,
prefetch_task_keeps_reader_alive_after_close_timeout) {
+ const auto saved_timeout_ms = config::buffered_reader_read_timeout_ms;
+ config::buffered_reader_read_timeout_ms = 50;
+
+ CountDownLatch read_started(1);
+ CountDownLatch continue_read(1);
+ std::atomic<bool> reader_destroyed = false;
+ auto inner_reader = std::make_shared<BlockingFileReader>(1024,
&read_started, &continue_read,
+
&reader_destroyed);
+ auto weak_reader = std::weak_ptr<io::FileReader>(inner_reader);
+ {
+ io::PrefetchBufferedReader reader(nullptr, std::move(inner_reader),
+ io::PrefetchRange(0, 1024));
+ uint8_t buf[1];
+ size_t bytes_read = 0;
+ Status st = reader.read_at(0, Slice {buf, 1}, &bytes_read);
+ EXPECT_TRUE(st.is<ErrorCode::TIMEOUT>());
+ EXPECT_TRUE(read_started.wait_for(std::chrono::seconds(1)));
+ EXPECT_FALSE(reader_destroyed.load());
+ }
+ EXPECT_FALSE(reader_destroyed.load());
+ EXPECT_FALSE(weak_reader.expired());
+
+ continue_read.count_down();
+ for (int i = 0; i < 100 && !reader_destroyed.load(); ++i) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ EXPECT_TRUE(reader_destroyed.load());
+ EXPECT_TRUE(weak_reader.expired());
+
+ config::buffered_reader_read_timeout_ms = saved_timeout_ms;
+}
+
TEST_F(BufferedReaderTest, test_read_amplify) {
size_t kb = 1024;
io::FileReaderSPtr offset_reader =
std::make_shared<MockOffsetFileReader>(2048 * kb); // 2MB
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]