This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bd933ceecf [fix](be) Keep prefetch reader alive for async tasks (#63796)
0bd933ceecf is described below

commit 0bd933ceecf1762c948b5ed51291fa9791ec5c85
Author: Gabriel <[email protected]>
AuthorDate: Fri May 29 09:10:21 2026 +0800

    [fix](be) Keep prefetch reader alive for async tasks (#63796)
    
    Problem Summary: S3/OSS prefetch timeout can cancel and close
    PrefetchBufferedReader while an async PrefetchBuffer task is still
    running. The task kept PrefetchBuffer alive but only stored the
    underlying FileReader as a raw pointer, so the owner could destroy the
    reader before the async task resumed on the error path and logged reader
    metadata. Keep a shared FileReader reference in each PrefetchBuffer so
    the async prefetch task cannot outlive the reader it dereferences, and
    add a unit test that covers close timeout while the prefetch read is
    blocked.
---
 be/src/io/fs/buffered_reader.cpp       |  4 +-
 be/src/io/fs/buffered_reader.h         |  8 ++--
 be/test/io/fs/buffered_reader_test.cpp | 85 +++++++++++++++++++++++++++++++++-
 3 files changed, 90 insertions(+), 7 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 6cf794bb91b..246ee5e480d 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -679,8 +679,8 @@ PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
     // to make sure the buffer reader will start to read at right position.
     for (int i = 0; i < buffer_num; i++) {
         _pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
-                _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(),
-                _io_ctx_holder, sync_buffer));
+                _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader, _io_ctx_holder,
+                sync_buffer));
     }
 }
 
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 834a87522ca..beaf3e87a8d 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -427,12 +427,12 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
     enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
 
     PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t whole_buffer_size,
-                   io::FileReader* reader, std::shared_ptr<const IOContext> io_ctx,
+                   io::FileReaderSPtr reader, std::shared_ptr<const IOContext> io_ctx,
                    std::function<void(PrefetchBuffer&)> sync_profile)
             : _file_range(file_range),
               _size(buffer_size),
               _whole_buffer_size(whole_buffer_size),
-              _reader(reader),
+              _reader(std::move(reader)),
               _io_ctx_holder(std::move(io_ctx)),
               _io_ctx(_io_ctx_holder.get()),
               _sync_profile(std::move(sync_profile)) {}
@@ -443,7 +443,7 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
               _random_access_ranges(other._random_access_ranges),
               _size(other._size),
               _whole_buffer_size(other._whole_buffer_size),
-              _reader(other._reader),
+              _reader(std::move(other._reader)),
               _io_ctx_holder(std::move(other._io_ctx_holder)),
               _io_ctx(_io_ctx_holder.get()),
               _buf(std::move(other._buf)),
@@ -460,7 +460,7 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
     size_t _size {0};
     size_t _len {0};
     size_t _whole_buffer_size;
-    io::FileReader* _reader = nullptr;
+    io::FileReaderSPtr _reader;
     std::shared_ptr<const IOContext> _io_ctx_holder;
     const IOContext* _io_ctx = nullptr;
     PODArray<char> _buf;
diff --git a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp
index bc92d22b178..75ed5e351dc 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -22,13 +22,17 @@
 #include <gtest/gtest-test-part.h>
 #include <limits.h>
 
+#include <atomic>
 #include <memory>
 #include <ostream>
+#include <thread>
 
+#include "common/config.h"
 #include "gtest/gtest_pred_impl.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/local_file_system.h"
 #include "runtime/exec_env.h"
+#include "util/countdown_latch.h"
 #include "util/stopwatch.hpp"
 #include "util/threadpool.h"
 
@@ -102,7 +106,7 @@ public:
 
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
-                        const io::IOContext* io_ctx) override {
+                        const io::IOContext* /*io_ctx*/) override {
         if (offset >= _size) {
             *bytes_read = 0;
             return Status::OK();
@@ -120,6 +124,52 @@ private:
     io::Path _path = "/tmp/mock";
 };
 
+class BlockingFileReader : public io::FileReader {
+public:
+    BlockingFileReader(size_t size, CountDownLatch* read_started, CountDownLatch* continue_read,
+                       std::atomic<bool>* destroyed)
+            : _size(size),
+              _read_started(read_started),
+              _continue_read(continue_read),
+              _destroyed(destroyed) {}
+
+    ~BlockingFileReader() override { _destroyed->store(true); }
+
+    Status close() override {
+        _closed = true;
+        return Status::OK();
+    }
+
+    const io::Path& path() const override { return _path; }
+
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    int64_t mtime() const override { return 0; }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const io::IOContext* /*io_ctx*/) override {
+        _read_started->count_down();
+        _continue_read->wait();
+        if (offset >= _size) {
+            *bytes_read = 0;
+            return Status::OK();
+        }
+        *bytes_read = std::min(_size - offset, result.size);
+        return Status::TimedOut("injected prefetch timeout");
+    }
+
+private:
+    size_t _size;
+    CountDownLatch* _read_started;
+    CountDownLatch* _continue_read;
+    std::atomic<bool>* _destroyed;
+    bool _closed = false;
+    io::Path _path = "/tmp/blocking";
+};
+
 class TestingRangeCacheFileReader : public io::FileReader {
 public:
     TestingRangeCacheFileReader(std::shared_ptr<io::FileReader> delegate) : _delegate(delegate) {};
@@ -302,6 +352,39 @@ TEST_F(BufferedReaderTest, test_miss) {
     EXPECT_EQ(45, bytes_read);
 }
 
+TEST_F(BufferedReaderTest, prefetch_task_keeps_reader_alive_after_close_timeout) {
+    const auto saved_timeout_ms = config::buffered_reader_read_timeout_ms;
+    config::buffered_reader_read_timeout_ms = 50;
+
+    CountDownLatch read_started(1);
+    CountDownLatch continue_read(1);
+    std::atomic<bool> reader_destroyed = false;
+    auto inner_reader = std::make_shared<BlockingFileReader>(1024, &read_started, &continue_read,
+                                                             &reader_destroyed);
+    auto weak_reader = std::weak_ptr<io::FileReader>(inner_reader);
+    {
+        io::PrefetchBufferedReader reader(nullptr, std::move(inner_reader),
+                                          io::PrefetchRange(0, 1024));
+        uint8_t buf[1];
+        size_t bytes_read = 0;
+        Status st = reader.read_at(0, Slice {buf, 1}, &bytes_read);
+        EXPECT_TRUE(st.is<ErrorCode::TIMEOUT>());
+        EXPECT_TRUE(read_started.wait_for(std::chrono::seconds(1)));
+        EXPECT_FALSE(reader_destroyed.load());
+    }
+    EXPECT_FALSE(reader_destroyed.load());
+    EXPECT_FALSE(weak_reader.expired());
+
+    continue_read.count_down();
+    for (int i = 0; i < 100 && !reader_destroyed.load(); ++i) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    EXPECT_TRUE(reader_destroyed.load());
+    EXPECT_TRUE(weak_reader.expired());
+
+    config::buffered_reader_read_timeout_ms = saved_timeout_ms;
+}
+
 TEST_F(BufferedReaderTest, test_read_amplify) {
     size_t kb = 1024;
     io::FileReaderSPtr offset_reader = std::make_shared<MockOffsetFileReader>(2048 * kb); // 2MB


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to