This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 4b445d9f22e [enhance](S3) Add timeout config for s3 buffer allocation #26125 (#27987) 4b445d9f22e is described below commit 4b445d9f22e40218a5f1795e823d2648c6193c45 Author: AlexYue <yj976240...@gmail.com> AuthorDate: Fri Dec 22 14:17:18 2023 +0800 [enhance](S3) Add timeout config for s3 buffer allocation #26125 (#27987) --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 3 +++ be/src/io/fs/s3_file_write_bufferpool.cpp | 18 ++++++++++++++---- be/src/io/fs/s3_file_write_bufferpool.h | 2 +- be/src/io/fs/s3_file_writer.cpp | 3 ++- 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 071422b57da..6acb373928b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1108,6 +1108,8 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000"); DEFINE_Bool(enable_snapshot_action, "false"); +DEFINE_mInt32(s3_writer_buffer_allocation_timeout_second, "60"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index b962a79075e..24a7340063d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1165,6 +1165,9 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms); // whether to enable /api/snapshot api DECLARE_Bool(enable_snapshot_action); +// The timeout config for S3 write buffer allocation +DECLARE_mInt32(s3_writer_buffer_allocation_timeout_second); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp index 48887f9c6ea..5c5aa662316 100644 --- a/be/src/io/fs/s3_file_write_bufferpool.cpp +++ b/be/src/io/fs/s3_file_write_bufferpool.cpp @@ -17,6 +17,7 @@ #include "s3_file_write_bufferpool.h" +#include <chrono> #include <cstring> #include "common/config.h" @@ -40,7 +41,7 @@ void S3FileBuffer::on_finished() { // when there is memory preserved, 
directly write data to buf // TODO:(AlexYue): write to file cache otherwise, then we'll wait for free buffer // and to rob it -void S3FileBuffer::append_data(const Slice& data) { +Status S3FileBuffer::append_data(const Slice& data) { Defer defer {[&] { _size += data.get_size(); }}; while (true) { // if buf is not empty, it means there is memory preserved for this buf @@ -50,9 +51,14 @@ void S3FileBuffer::append_data(const Slice& data) { } else { // wait allocate buffer pool auto tmp = S3FileBufferPool::GetInstance()->allocate(true); + if (tmp->get_size() == 0) { + return Status::InternalError("Failed to allocate s3 writer buffer for {} seconds", + config::s3_writer_buffer_allocation_timeout_second); + } rob_buffer(tmp); } } + return Status::OK(); } void S3FileBuffer::submit() { @@ -81,13 +87,17 @@ void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) { std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>(_thread_pool); + int64_t timeout = config::s3_writer_buffer_allocation_timeout_second; // if need reserve then we must ensure return buf with memory preserved if (reserve) { { std::unique_lock<std::mutex> lck {_lock}; - _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); }); - buf->reserve_buffer(_free_raw_buffers.front()); - _free_raw_buffers.pop_front(); + _cv.wait_for(lck, std::chrono::seconds(timeout), + [this]() { return !_free_raw_buffers.empty(); }); + if (!_free_raw_buffers.empty()) { + buf->reserve_buffer(_free_raw_buffers.front()); + _free_raw_buffers.pop_front(); + } } return buf; } diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h index ad5f698f983..b4d3f322904 100644 --- a/be/src/io/fs/s3_file_write_bufferpool.h +++ b/be/src/io/fs/s3_file_write_bufferpool.h @@ -52,7 +52,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> { // append data into the memory buffer 
inside // or into the file cache if the buffer has no memory buffer - void append_data(const Slice& data); + Status append_data(const Slice& data); // upload to S3 and file cache in async threadpool void submit(); // set the callback to upload to S3 file diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 4a937f52057..18de6ed0389 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -240,7 +240,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { // if the buffer has memory buf inside, the data would be written into memory first then S3 then file cache // it would be written to cache then S3 if the buffer doesn't have memory preserved - _pending_buf->append_data(Slice {data[i].get_data() + pos, data_size_to_append}); + RETURN_IF_ERROR(_pending_buf->append_data( + Slice {data[i].get_data() + pos, data_size_to_append})); // if it's the last part, it could be less than 5MB, or it must // satisfy that the size is larger than or equal to 5MB --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org