This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 4b445d9f22e  [enhance](S3) Add timeout config for s3 buffer allocation #26125 (#27987)
4b445d9f22e is described below

commit 4b445d9f22e40218a5f1795e823d2648c6193c45
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Fri Dec 22 14:17:18 2023 +0800

     [enhance](S3) Add timeout config for s3 buffer allocation #26125 (#27987)
---
 be/src/common/config.cpp                  |  2 ++
 be/src/common/config.h                    |  3 +++
 be/src/io/fs/s3_file_write_bufferpool.cpp | 18 ++++++++++++++----
 be/src/io/fs/s3_file_write_bufferpool.h   |  2 +-
 be/src/io/fs/s3_file_writer.cpp           |  3 ++-
 5 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 071422b57da..6acb373928b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1108,6 +1108,8 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");
 
 DEFINE_Bool(enable_snapshot_action, "false");
 
+DEFINE_mInt32(s3_writer_buffer_allocation_timeout_second, "60");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b962a79075e..24a7340063d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1165,6 +1165,9 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms);
 // whether to enable /api/snapshot api
 DECLARE_Bool(enable_snapshot_action);
 
+// The timeout config for S3 write buffer allocation
+DECLARE_mInt32(s3_writer_buffer_allocation_timeout_second);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp
index 48887f9c6ea..5c5aa662316 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -17,6 +17,7 @@
 
 #include "s3_file_write_bufferpool.h"
 
+#include <chrono>
 #include <cstring>
 
 #include "common/config.h"
@@ -40,7 +41,7 @@ void S3FileBuffer::on_finished() {
 // when there is memory preserved, directly write data to buf
 // TODO:(AlexYue): write to file cache otherwise, then we'll wait for free buffer
 // and to rob it
-void S3FileBuffer::append_data(const Slice& data) {
+Status S3FileBuffer::append_data(const Slice& data) {
     Defer defer {[&] { _size += data.get_size(); }};
     while (true) {
         // if buf is not empty, it means there is memory preserved for this buf
@@ -50,9 +51,14 @@ void S3FileBuffer::append_data(const Slice& data) {
         } else {
             // wait allocate buffer pool
             auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+            if (tmp->get_size() == 0) {
+                return Status::InternalError("Failed to allocate s3 writer buffer for {} seconds",
+                                             config::s3_writer_buffer_allocation_timeout_second);
+            }
             rob_buffer(tmp);
         }
     }
+    return Status::OK();
 }
 
 void S3FileBuffer::submit() {
@@ -81,13 +87,17 @@ void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write
 
 std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
     std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>(_thread_pool);
+    int64_t timeout = config::s3_writer_buffer_allocation_timeout_second;
     // if need reserve then we must ensure return buf with memory preserved
     if (reserve) {
         {
             std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
-            buf->reserve_buffer(_free_raw_buffers.front());
-            _free_raw_buffers.pop_front();
+            _cv.wait_for(lck, std::chrono::seconds(timeout),
+                         [this]() { return !_free_raw_buffers.empty(); });
+            if (!_free_raw_buffers.empty()) {
+                buf->reserve_buffer(_free_raw_buffers.front());
+                _free_raw_buffers.pop_front();
+            }
         }
         return buf;
     }
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index ad5f698f983..b4d3f322904 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -52,7 +52,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
 
     // append data into the memory buffer inside
     // or into the file cache if the buffer has no memory buffer
-    void append_data(const Slice& data);
+    Status append_data(const Slice& data);
     // upload to S3 and file cache in async threadpool
     void submit();
     // set the callback to upload to S3 file
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 4a937f52057..18de6ed0389 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -240,7 +240,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
 
             // if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache
             // it would be written to cache then S3 if the buffer doesn't have memory preserved
-            _pending_buf->append_data(Slice {data[i].get_data() + pos, data_size_to_append});
+            RETURN_IF_ERROR(_pending_buf->append_data(
+                    Slice {data[i].get_data() + pos, data_size_to_append}));
 
             // if it's the last part, it could be less than 5MB, or it must
             // satisfy that the size is larger than or euqal to 5MB


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to