This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3cd0bdf3ba8 [feature](IO) Replace file writer's finalize function with async close (#34679) 3cd0bdf3ba8 is described below commit 3cd0bdf3ba8c002ae092b3d87bd766cb18e85d21 Author: AlexYue <yj976240...@gmail.com> AuthorDate: Mon May 13 16:44:58 2024 +0800 [feature](IO) Replace file writer's finalize function with async close (#34679) --- be/src/common/config.cpp | 5 + be/src/common/config.h | 5 + be/src/io/fs/broker_file_writer.cpp | 30 ++++-- be/src/io/fs/broker_file_writer.h | 8 +- be/src/io/fs/file_writer.h | 20 ++-- be/src/io/fs/hdfs_file_writer.cpp | 81 +++++++++-------- be/src/io/fs/hdfs_file_writer.h | 14 ++- be/src/io/fs/local_file_writer.cpp | 59 ++++++++---- be/src/io/fs/local_file_writer.h | 10 +- be/src/io/fs/s3_file_writer.cpp | 101 ++++++++------------- be/src/io/fs/s3_file_writer.h | 12 ++- be/src/io/fs/stream_sink_file_writer.cpp | 29 ++++-- be/src/io/fs/stream_sink_file_writer.h | 11 +-- be/src/olap/rowset/beta_rowset_writer.cpp | 4 +- .../segment_v2/inverted_index_fs_directory.cpp | 10 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 2 +- .../rowset/segment_v2/vertical_segment_writer.cpp | 2 +- be/src/olap/storage_engine.h | 1 + be/src/pipeline/pipeline_tracing.cpp | 2 - be/src/runtime/exec_env.h | 2 + be/src/runtime/exec_env_init.cpp | 31 +++++++ be/src/runtime/load_stream_writer.cpp | 4 +- be/test/io/cache/block_file_cache_test.cpp | 1 - be/test/io/fs/hdfs_file_system_test.cpp | 3 - be/test/io/fs/local_file_system_test.cpp | 6 -- be/test/io/fs/s3_file_writer_test.cpp | 8 -- be/test/io/fs/stream_sink_file_writer_test.cpp | 2 +- be/test/olap/tablet_cooldown_test.cpp | 6 +- ...index_compound_directory_fault_injection.groovy | 7 -- 29 files changed, 268 insertions(+), 208 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 8055e3fe456..4ec90cfa886 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1263,6 +1263,11 @@ DEFINE_mInt64(hdfs_jni_write_sleep_milliseconds, "300"); // The max retry times when hdfs write failed DEFINE_mInt64(hdfs_jni_write_max_retry_time, "3"); +// The min thread num for NonBlockCloseThreadPool +DEFINE_Int64(min_nonblock_close_thread_num, "12"); +// The max thread num for NonBlockCloseThreadPool +DEFINE_Int64(max_nonblock_close_thread_num, "64"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 46dcdb8658b..7ff833513eb 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1337,6 +1337,11 @@ DECLARE_mInt64(hdfs_jni_write_sleep_milliseconds); // The max retry times when hdfs write failed DECLARE_mInt64(hdfs_jni_write_max_retry_time); +// The min thread num for NonBlockCloseThreadPool +DECLARE_Int64(min_nonblock_close_thread_num); +// The max thread num for NonBlockCloseThreadPool +DECLARE_Int64(max_nonblock_close_thread_num); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp index f162b02b19e..2e5af74090a 100644 --- a/be/src/io/fs/broker_file_writer.cpp +++ b/be/src/io/fs/broker_file_writer.cpp @@ -28,6 +28,7 @@ #include "common/config.h" #include "common/logging.h" +#include "io/fs/file_writer.h" #include "runtime/broker_mgr.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" @@ -60,12 +61,29 @@ inline const std::string& client_id(ExecEnv* env, const TNetworkAddress& addr) { } #endif -Status BrokerFileWriter::close() { - if (_closed) { +Status BrokerFileWriter::close(bool non_block) { + if (_close_state == FileWriterState::CLOSED) { + return Status::InternalError("BrokerFileWriter already closed, file path {}", + _path.native()); + } + if (_close_state == FileWriterState::ASYNC_CLOSING) { + if (non_block) { + return Status::InternalError("Don't submit async close multi times"); + } + // Actucally the first time call to close(true) would return the value of _finalize, if it returned one + // error status then the code would never call the second close(true) + _close_state = FileWriterState::CLOSED; return Status::OK(); } - _closed = true; + if (non_block) { + _close_state = FileWriterState::ASYNC_CLOSING; + } else { + _close_state = FileWriterState::CLOSED; + } + return _close_impl(); +} +Status BrokerFileWriter::_close_impl() { TBrokerCloseWriterRequest request; request.__set_version(TBrokerVersion::VERSION_ONE); request.__set_fd(_fd); @@ -117,7 +135,7 @@ Status BrokerFileWriter::close() { } Status BrokerFileWriter::appendv(const Slice* data, size_t data_cnt) { - if (_closed) [[unlikely]] { + if (_close_state != FileWriterState::OPEN) [[unlikely]] { return Status::InternalError("append to closed file: {}", _path.native()); } @@ -135,10 +153,6 @@ Status BrokerFileWriter::appendv(const Slice* data, size_t data_cnt) { return Status::OK(); } -Status BrokerFileWriter::finalize() { - return Status::OK(); -} - Result<FileWriterPtr> BrokerFileWriter::create(ExecEnv* env, const TNetworkAddress& broker_address, const std::map<std::string, std::string>& properties, Path path) { diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h index d6fce52a05c..ad065a47327 100644 --- a/be/src/io/fs/broker_file_writer.h +++ b/be/src/io/fs/broker_file_writer.h @@ -43,17 +43,17 @@ public: BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, Path path, TBrokerFD fd); ~BrokerFileWriter() override; + Status close(bool non_block = false) override; - Status close() override; Status appendv(const Slice* data, size_t data_cnt) override; - Status finalize() override; const Path& path() const override { return _path; } size_t bytes_appended() const override { return _cur_offset; } - bool closed() const override { return _closed; } + FileWriterState closed() const override { return _close_state; } FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; } private: Status _write(const uint8_t* buf, size_t buf_len, size_t* written_bytes); + Status _close_impl(); private: ExecEnv* _env = nullptr; @@ -61,7 +61,7 @@ private: Path _path; size_t _cur_offset = 0; TBrokerFD _fd; - bool _closed = false; + FileWriterState _close_state {FileWriterState::OPEN}; }; } // end namespace io diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index ad13f7f0095..56b7f731c51 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -17,6 +17,7 @@ #pragma once +#include <future> #include <memory> #include "common/status.h" @@ -46,6 +47,17 @@ struct FileWriterOptions { uint64_t file_cache_expiration = 0; // Absolute time }; +struct AsyncCloseStatusPack { + std::promise<Status> promise; + std::future<Status> future; +}; + +enum class FileWriterState : uint8_t { + OPEN = 0, + ASYNC_CLOSING, + CLOSED, +}; + class FileWriter { public: FileWriter() = default; @@ -56,21 +68,17 @@ public: // Normal close. Wait for all data to persist before returning. // If there is no data appended, an empty file will be persisted. - virtual Status close() = 0; + virtual Status close(bool non_block = false) = 0; Status append(const Slice& data) { return appendv(&data, 1); } virtual Status appendv(const Slice* data, size_t data_cnt) = 0; - // Call this method when there is no more data to write. - // FIXME(cyx): Does not seem to be an appropriate interface for file system? - virtual Status finalize() = 0; - virtual const Path& path() const = 0; virtual size_t bytes_appended() const = 0; - virtual bool closed() const = 0; + virtual FileWriterState closed() const = 0; virtual FileCacheAllocatorBuilder* cache_builder() const = 0; }; diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index a37bd323984..7a499239bd0 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -36,8 +36,10 @@ #include "io/cache/block_file_cache_factory.h" #include "io/cache/file_cache_common.h" #include "io/fs/err_utils.h" +#include "io/fs/file_writer.h" #include "io/fs/hdfs_file_system.h" #include "io/hdfs_util.h" +#include "runtime/exec_env.h" #include "service/backend_options.h" #include "util/bvar_helper.h" #include "util/jni-util.h" @@ -142,6 +144,11 @@ HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, } HdfsFileWriter::~HdfsFileWriter() { + if (_async_close_pack != nullptr) { + // For thread safety + std::ignore = _async_close_pack->promise.get_future(); + _async_close_pack = nullptr; + } if (_hdfs_file) { SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency); hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); @@ -195,13 +202,41 @@ Status HdfsFileWriter::_acquire_jni_memory(size_t size) { #endif } -Status HdfsFileWriter::close() { - if (_closed) { - return Status::OK(); +Status HdfsFileWriter::close(bool non_block) { + if (closed() == FileWriterState::CLOSED) { + return Status::InternalError("HdfsFileWriter already closed, file path {}, fs name {}", + _path.native(), _fs_name); + } + if (closed() == FileWriterState::ASYNC_CLOSING) { + if (non_block) { + return Status::InternalError("Don't submit async close multi times"); + } + CHECK(_async_close_pack != nullptr); + _st = _async_close_pack->future.get(); + _async_close_pack = nullptr; + // We should wait for all the pre async task to be finished + _close_state = FileWriterState::CLOSED; + // The next time we call close() with no matter non_block true or false, it would always return the + // '_st' value because this writer is already closed. + return _st; + } + if (non_block) { + _close_state = FileWriterState::ASYNC_CLOSING; + _async_close_pack = std::make_unique<AsyncCloseStatusPack>(); + _async_close_pack->future = _async_close_pack->promise.get_future(); + return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func( + [&]() { _async_close_pack->promise.set_value(_close_impl()); }); } - _closed = true; + _st = _close_impl(); + _close_state = FileWriterState::CLOSED; + return _st; +} + +Status HdfsFileWriter::_close_impl() { if (_batch_buffer.size() != 0) { - RETURN_IF_ERROR(_flush_buffer()); + if (_st = _flush_buffer(); !_st.ok()) { + return _st; + } } int ret; if (_sync_file_data) { @@ -220,9 +255,10 @@ Status HdfsFileWriter::close() { Status::InternalError("failed to sync hdfs file")); if (ret != 0) { - return Status::InternalError( + _st = Status::InternalError( "failed to sync hdfs file. fs_name={} path={} : {}, file_size={}", _fs_name, _path.native(), hdfs_error(), bytes_appended()); + return _st; } } @@ -238,10 +274,11 @@ Status HdfsFileWriter::close() { TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile", Status::InternalError("failed to close hdfs file")); if (ret != 0) { - return Status::InternalError( + _st = Status::InternalError( "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, file_size={}", BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(), bytes_appended()); + return _st; } hdfs_file_created_total << 1; return Status::OK(); @@ -368,7 +405,7 @@ Status HdfsFileWriter::_append(std::string_view content) { } Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) { - if (_closed) [[unlikely]] { + if (_close_state != FileWriterState::OPEN) [[unlikely]] { return Status::InternalError("append to closed file: {}", _path.native()); } @@ -378,34 +415,6 @@ Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) { return Status::OK(); } -// Call this method when there is no more data to write. -Status HdfsFileWriter::finalize() { - if (_closed) [[unlikely]] { - return Status::InternalError("finalize closed file: {}, file_size={}", _path.native(), - bytes_appended()); - } - if (_batch_buffer.size() != 0) { - RETURN_IF_ERROR(_flush_buffer()); - } - - // Flush buffered data to HDFS without waiting for HDFS response - int ret; - { - SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency); - ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file), - "HdfsFileWriter::finalize::hdfsFlush"); - } - TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsFlush", - Status::InternalError("failed to flush hdfs file")); - if (ret != 0) { - return Status::InternalError( - "failed to flush hdfs file. fs_name={} path={} : {}, file_size={}", _fs_name, - _path.native(), hdfs_error(), bytes_appended()); - } - - return Status::OK(); -} - Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, std::shared_ptr<HdfsHandler> handler, const std::string& fs_name, const FileWriterOptions* opts) { diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index 4ed30c5a856..c5393042185 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -29,6 +29,7 @@ namespace io { class HdfsHandler; class BlockFileCache; struct FileCacheAllocatorBuilder; +struct AsyncCloseStatusPack; class HdfsFileWriter final : public FileWriter { public: @@ -44,18 +45,19 @@ public: std::string fs_name, const FileWriterOptions* opts = nullptr); ~HdfsFileWriter() override; - Status close() override; Status appendv(const Slice* data, size_t data_cnt) override; - Status finalize() override; const Path& path() const override { return _path; } size_t bytes_appended() const override { return _bytes_appended; } - bool closed() const override { return _closed; } + FileWriterState closed() const override { return _close_state; } + + Status close(bool non_block = false) override; FileCacheAllocatorBuilder* cache_builder() const override { return _cache_builder == nullptr ? nullptr : _cache_builder.get(); } private: + Status _close_impl(); // Flush buffered data into HDFS client and write local file cache if enabled // **Notice**: this would clear the underlying buffer Status _flush_buffer(); @@ -70,7 +72,6 @@ private: hdfsFile _hdfs_file = nullptr; std::string _fs_name; size_t _bytes_appended = 0; - bool _closed = false; bool _sync_file_data; std::unique_ptr<FileCacheAllocatorBuilder> _cache_builder; // nullptr if disable write file cache @@ -90,6 +91,11 @@ private: }; BatchBuffer _batch_buffer; size_t _approximate_jni_buffer_size = 0; + std::unique_ptr<AsyncCloseStatusPack> _async_close_pack; + // We should make sure that close_impl's return value is consistent + // So we need add one field to restore the value first time return by calling close_impl + Status _st; + FileWriterState _close_state {FileWriterState::OPEN}; }; } // namespace io diff --git a/be/src/io/fs/local_file_writer.cpp b/be/src/io/fs/local_file_writer.cpp index 2e4a970d6c2..4a5da18e5de 100644 --- a/be/src/io/fs/local_file_writer.cpp +++ b/be/src/io/fs/local_file_writer.cpp @@ -36,6 +36,7 @@ #include "common/sync_point.h" #include "gutil/macros.h" #include "io/fs/err_utils.h" +#include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" #include "olap/data_dir.h" @@ -78,7 +79,7 @@ size_t LocalFileWriter::bytes_appended() const { } LocalFileWriter::~LocalFileWriter() { - if (!_closed) { + if (_close_state == FileWriterState::OPEN) { _abort(); } DorisMetrics::instance()->local_file_open_writing->increment(-1); @@ -86,7 +87,25 @@ LocalFileWriter::~LocalFileWriter() { DorisMetrics::instance()->local_bytes_written_total->increment(_bytes_appended); } -Status LocalFileWriter::close() { +Status LocalFileWriter::close(bool non_block) { + if (_close_state == FileWriterState::CLOSED) { + return Status::InternalError("LocalFileWriter already closed, file path {}", + _path.native()); + } + if (_close_state == FileWriterState::ASYNC_CLOSING) { + if (non_block) { + return Status::InternalError("Don't submit async close multi times"); + } + // Actucally the first time call to close(true) would return the value of _finalize, if it returned one + // error status then the code would never call the second close(true) + _close_state = FileWriterState::CLOSED; + return Status::OK(); + } + if (non_block) { + _close_state = FileWriterState::ASYNC_CLOSING; + } else { + _close_state = FileWriterState::CLOSED; + } return _close(_sync_data); } @@ -104,7 +123,7 @@ void LocalFileWriter::_abort() { Status LocalFileWriter::appendv(const Slice* data, size_t data_cnt) { TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::appendv", Status::IOError("inject io error")); - if (_closed) [[unlikely]] { + if (_close_state != FileWriterState::OPEN) [[unlikely]] { return Status::InternalError("append to closed file: {}", _path.native()); } _dirty = true; @@ -159,10 +178,11 @@ Status LocalFileWriter::appendv(const Slice* data, size_t data_cnt) { return Status::OK(); } -Status LocalFileWriter::finalize() { +// TODO(ByteYue): Refactor this function as FileWriter::flush() +Status LocalFileWriter::_finalize() { TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::finalize", Status::IOError("inject io error")); - if (_closed) [[unlikely]] { + if (_close_state == FileWriterState::OPEN) [[unlikely]] { return Status::InternalError("finalize closed file: {}", _path.native()); } @@ -178,39 +198,42 @@ Status LocalFileWriter::finalize() { } Status LocalFileWriter::_close(bool sync) { - if (_closed) { - return Status::OK(); - } + auto fd_reclaim_func = [&](Status st) { + if (_fd > 9 && 0 != ::close(_fd)) { + return localfs_error(errno, fmt::format("failed to {}, along with failed to close {}", + st, _path.native())); + } + _fd = -1; + return st; + }; if (sync) { if (_dirty) { #ifdef __APPLE__ if (fcntl(_fd, F_FULLFSYNC) < 0) [[unlikely]] { - return localfs_error(errno, fmt::format("failed to sync {}", _path.native())); + return fd_reclaim_func( + localfs_error(errno, fmt::format("failed to sync {}", _path.native()))); } #else if (0 != ::fdatasync(_fd)) [[unlikely]] { - return localfs_error(errno, fmt::format("failed to sync {}", _path.native())); + return fd_reclaim_func( + localfs_error(errno, fmt::format("failed to sync {}", _path.native()))); } #endif _dirty = false; } - RETURN_IF_ERROR(sync_dir(_path.parent_path())); + RETURN_IF_ERROR(fd_reclaim_func(sync_dir(_path.parent_path()))); } - if (0 != ::close(_fd)) { - return localfs_error(errno, fmt::format("failed to close {}", _path.native())); - } - _closed = true; - DBUG_EXECUTE_IF("LocalFileWriter.close.failed", { // spare '.testfile' to make bad disk checker happy if (_path.filename().compare(kTestFilePath)) { - return Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno)); + return fd_reclaim_func( + Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno))); } }); TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::close", Status::IOError("inject io error")); - return Status::OK(); + return fd_reclaim_func(Status::OK()); } } // namespace doris::io diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h index 81ebb0ebd1f..d80065825d2 100644 --- a/be/src/io/fs/local_file_writer.h +++ b/be/src/io/fs/local_file_writer.h @@ -31,26 +31,26 @@ public: LocalFileWriter(Path path, int fd, bool sync_data = true); ~LocalFileWriter() override; - Status close() override; Status appendv(const Slice* data, size_t data_cnt) override; - Status finalize() override; const Path& path() const override { return _path; } size_t bytes_appended() const override; - bool closed() const override { return _closed; } + FileWriterState closed() const override { return _close_state; } FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; } + Status close(bool non_block = false) override; + private: + Status _finalize(); void _abort(); Status _close(bool sync); -private: Path _path; int _fd; // owned bool _dirty = false; - bool _closed = false; const bool _sync_data = true; size_t _bytes_appended = 0; + FileWriterState _close_state {FileWriterState::OPEN}; }; } // namespace doris::io diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 952377314a1..7c3af1fa6fb 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -41,6 +41,7 @@ #include <glog/logging.h> #include <sstream> +#include <tuple> #include <utility> #include "common/config.h" @@ -54,6 +55,7 @@ #include "io/fs/path.h" #include "io/fs/s3_file_bufferpool.h" #include "io/fs/s3_file_system.h" +#include "runtime/exec_env.h" #include "util/bvar_helper.h" #include "util/debug_points.h" #include "util/defer_op.h" @@ -65,7 +67,6 @@ namespace Aws::S3::Model { class DeleteObjectRequest; } // namespace Aws::S3::Model -using Aws::S3::Model::AbortMultipartUploadRequest; using Aws::S3::Model::CompletedPart; using Aws::S3::Model::CompletedMultipartUpload; using Aws::S3::Model::CompleteMultipartUploadRequest; @@ -102,11 +103,13 @@ S3FileWriter::S3FileWriter(std::shared_ptr<Aws::S3::S3Client> client, std::strin } S3FileWriter::~S3FileWriter() { - if (!closed() || _failed) { - // if we don't abort multi part upload, the uploaded part in object - // store will not automatically reclaim itself, it would cost more money - static_cast<void>(_abort()); - } else { + if (_async_close_pack != nullptr) { + // For thread safety + std::ignore = _async_close_pack->promise.get_future(); + _async_close_pack = nullptr; + } + // We won't do S3 abort operation in BE, we let s3 service do it own. + if (closed() == FileWriterState::OPEN && !_failed) { s3_bytes_written_total << _bytes_appended; } s3_file_being_written << -1; @@ -153,48 +156,39 @@ void S3FileWriter::_wait_until_finish(std::string_view task_name) { } } -Status S3FileWriter::_abort() { - // make all pending work early quits - _failed = true; - - // we need to reclaim the memory - if (_pending_buf) { - _pending_buf = nullptr; +Status S3FileWriter::close(bool non_block) { + if (closed() == FileWriterState::CLOSED) { + return Status::InternalError("S3FileWriter already closed, file path {}, file key {}", + _path.native(), _key); } - LOG(INFO) << "S3FileWriter::abort, path: " << _path.native(); - // upload id is empty means there was no create multi upload - if (_upload_id.empty()) { - return Status::OK(); + if (closed() == FileWriterState::ASYNC_CLOSING) { + if (non_block) { + return Status::InternalError("Don't submit async close multi times"); + } + CHECK(_async_close_pack != nullptr); + _st = _async_close_pack->future.get(); + _async_close_pack = nullptr; + // We should wait for all the pre async task to be finished + _close_state = FileWriterState::CLOSED; + // The next time we call close() with no matter non_block true or false, it would always return the + // '_st' value because this writer is already closed. + return _st; } - _wait_until_finish("Abort"); - AbortMultipartUploadRequest request; - request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id); - SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency); - auto outcome = _client->AbortMultipartUpload(request); - if (outcome.IsSuccess() || - outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD || - outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { - LOG(INFO) << "Abort multipart upload successfully" - << "bucket=" << _bucket << ", key=" << _path.native() - << ", upload_id=" << _upload_id << ", whole parts=" << _dump_completed_part(); - return Status::OK(); + if (non_block) { + _close_state = FileWriterState::ASYNC_CLOSING; + _async_close_pack = std::make_unique<AsyncCloseStatusPack>(); + _async_close_pack->future = _async_close_pack->promise.get_future(); + return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func( + [&]() { _async_close_pack->promise.set_value(_close_impl()); }); } - return s3fs_error( - outcome.GetError(), - fmt::format("failed to abort multipart upload {} upload_id={}, whole parts={}", - _path.native(), _upload_id, _dump_completed_part())); + _st = _close_impl(); + _close_state = FileWriterState::CLOSED; + return _st; } -Status S3FileWriter::close() { - if (closed()) { - _wait_until_finish("close"); - return _st; - } - +Status S3FileWriter::_close_impl() { VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native(); - Defer defer {[this] { _closed = true; }}; - if (_upload_id.empty() && _pending_buf) { RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size()); } @@ -248,7 +242,7 @@ Status S3FileWriter::close() { } Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { - if (closed()) [[unlikely]] { + if (closed() != FileWriterState::OPEN) [[unlikely]] { return Status::InternalError("append to closed file: {}", _path.native()); } @@ -457,27 +451,6 @@ Status S3FileWriter::_complete() { return Status::OK(); } -Status S3FileWriter::finalize() { - if (closed()) [[unlikely]] { - return Status::InternalError("finalize closed file: {}", _path.native()); - } - - DBUG_EXECUTE_IF("s3_file_writer::finalize", - { return Status::IOError("failed to finalize due to injected error"); }); - // submit pending buf if it's not nullptr - // it's the last buf, we can submit it right now - if (_pending_buf != nullptr) { - if (_upload_id.empty()) { - RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size()); - } - _countdown_event.add_count(); - RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf))); - _pending_buf = nullptr; - } - _wait_until_finish("finalize"); - return _st; -} - Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() { auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); DCHECK(buf != nullptr); @@ -497,7 +470,7 @@ Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() { } void S3FileWriter::_put_object(UploadFileBuffer& buf) { - DCHECK(!closed()); + DCHECK(closed() != FileWriterState::CLOSED) << fmt::format("state is {}", closed()); Aws::S3::Model::PutObjectRequest request; request.WithBucket(_bucket).WithKey(_key); Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream())); diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h index 9f72b02da60..b7d39a48245 100644 --- a/be/src/io/fs/s3_file_writer.h +++ b/be/src/io/fs/s3_file_writer.h @@ -42,6 +42,7 @@ struct S3Conf; namespace io { struct S3FileBuffer; class S3FileSystem; +struct AsyncCloseStatusPack; class S3FileWriter final : public FileWriter { public: @@ -49,14 +50,11 @@ public: const FileWriterOptions* opts); ~S3FileWriter() override; - Status close() override; - Status appendv(const Slice* data, size_t data_cnt) override; - Status finalize() override; const Path& path() const override { return _path; } size_t bytes_appended() const override { return _bytes_appended; } - bool closed() const override { return _closed; } + FileWriterState closed() const override { return _close_state; } FileCacheAllocatorBuilder* cache_builder() const override { return _cache_builder == nullptr ? nullptr : _cache_builder.get(); @@ -70,7 +68,10 @@ public: const std::string& bucket() const { return _bucket; } const std::string& upload_id() const { return _upload_id; } + Status close(bool non_block = false) override; + private: + Status _close_impl(); Status _abort(); [[nodiscard]] std::string _dump_completed_part() const; void _wait_until_finish(std::string_view task_name); @@ -97,7 +98,6 @@ private: std::atomic_bool _failed = false; - bool _closed = false; Status _st; size_t _bytes_appended = 0; @@ -113,6 +113,8 @@ private: // Because hive committers have best-effort semantics, // this shortens the inconsistent time window. bool _used_by_s3_committer; + std::unique_ptr<AsyncCloseStatusPack> _async_close_pack; + FileWriterState _close_state {FileWriterState::OPEN}; }; } // namespace io diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index faac2bf55d9..4555a9c8c4b 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -111,7 +111,29 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { return Status::OK(); } -Status StreamSinkFileWriter::finalize() { +Status StreamSinkFileWriter::close(bool non_block) { + if (_close_state == FileWriterState::CLOSED) { + return Status::InternalError("StreamSinkFileWriter already closed, load id {}", + print_id(_load_id)); + } + if (_close_state == FileWriterState::ASYNC_CLOSING) { + if (non_block) { + return Status::InternalError("Don't submit async close multi times"); + } + // Actucally the first time call to close(true) would return the value of _finalize, if it returned one + // error status then the code would never call the second close(true) + _close_state = FileWriterState::CLOSED; + return Status::OK(); + } + if (non_block) { + _close_state = FileWriterState::ASYNC_CLOSING; + } else { + _close_state = FileWriterState::CLOSED; + } + return _finalize(); +} + +Status StreamSinkFileWriter::_finalize() { VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id; // TODO(zhengyu): update get_inverted_index_file_size into stat @@ -144,9 +166,4 @@ Status StreamSinkFileWriter::finalize() { return Status::OK(); } -Status StreamSinkFileWriter::close() { - _closed = true; - return Status::OK(); -} - } // namespace doris::io diff --git a/be/src/io/fs/stream_sink_file_writer.h b/be/src/io/fs/stream_sink_file_writer.h index 4a0eb955c26..9769f96331b 100644 --- a/be/src/io/fs/stream_sink_file_writer.h +++ b/be/src/io/fs/stream_sink_file_writer.h @@ -44,13 +44,9 @@ public: Status appendv(const Slice* data, size_t data_cnt) override; - Status finalize() override; - - Status close() override; - size_t bytes_appended() const override { return _bytes_appended; } - bool closed() const override { return _closed; } + FileWriterState closed() const override { return _close_state; } // FIXME(plat1ko): Maybe it's an inappropriate abstraction? const Path& path() const override { @@ -60,7 +56,10 @@ public: FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; } + Status close(bool non_block = false) override; + private: + Status _finalize(); std::vector<std::shared_ptr<LoadStreamStub>> _streams; PUniqueId _load_id; @@ -68,8 +67,8 @@ private: int64_t _index_id; int64_t _tablet_id; int32_t _segment_id; - bool _closed = false; size_t _bytes_appended = 0; + FileWriterState _close_state {FileWriterState::OPEN}; }; } // namespace io diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 8085fc70ebc..ef3539fe0c5 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -130,7 +130,9 @@ Status SegmentFileCollection::close() { } for (auto&& [_, writer] : _file_writers) { - RETURN_IF_ERROR(writer->close()); + if (writer->closed() != io::FileWriterState::CLOSED) { + RETURN_IF_ERROR(writer->close()); + } } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index c4936e9c453..1c6e75aa97e 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -317,15 +317,7 @@ void DorisFSDirectory::FSIndexOutput::close() { _CLTHROWA(err.number(), err.what()); } if (_writer) { - Status ret = _writer->finalize(); - DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error", - { ret = Status::Error<INTERNAL_ERROR>("writer finalize status error"); }) - if (!ret.ok()) { - LOG(WARNING) << "FSIndexOutput close, file writer finalize error: " << ret.to_string(); - _writer.reset(nullptr); - _CLTHROWA(CL_ERR_IO, ret.to_string().c_str()); - } - ret = _writer->close(); + auto ret = _writer->close(); DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error", { ret = Status::Error<INTERNAL_ERROR>("writer close status error"); }) if (!ret.ok()) { diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 10ce2137b9c..939c504580f 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -1076,7 +1076,7 @@ Status SegmentWriter::finalize_columns_index(uint64_t* index_size) { Status SegmentWriter::finalize_footer(uint64_t* segment_file_size) { RETURN_IF_ERROR(_write_footer()); // finish - RETURN_IF_ERROR(_file_writer->finalize()); + RETURN_IF_ERROR(_file_writer->close(true)); *segment_file_size = _file_writer->bytes_appended(); if (*segment_file_size == 0) { return Status::Corruption("Bad segment, file size = 0"); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index b0b24a79c0a..7466f7861c5 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -938,7 +938,7 @@ Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) { Status VerticalSegmentWriter::finalize_footer(uint64_t* segment_file_size) { RETURN_IF_ERROR(_write_footer()); // finish - RETURN_IF_ERROR(_file_writer->finalize()); + RETURN_IF_ERROR(_file_writer->close(true)); *segment_file_size = _file_writer->bytes_appended(); if (*segment_file_size == 0) { return Status::Corruption("Bad segment, file size = 0"); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 860886bc9b7..2907c4b7a9b 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -434,6 +434,7 @@ private: std::unique_ptr<ThreadPool> _base_compaction_thread_pool; std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool; std::unique_ptr<ThreadPool> _single_replica_compaction_thread_pool; + std::unique_ptr<ThreadPool> _seg_compaction_thread_pool; std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool; diff --git a/be/src/pipeline/pipeline_tracing.cpp b/be/src/pipeline/pipeline_tracing.cpp index 047e3c3a01d..d9029951682 100644 --- a/be/src/pipeline/pipeline_tracing.cpp +++ b/be/src/pipeline/pipeline_tracing.cpp @@ -116,7 +116,6 @@ void PipelineTracerContext::_dump_query(TUniqueId query_id) { THROW_IF_ERROR(writer.appendv(&text, 1)); } - THROW_IF_ERROR(writer.finalize()); THROW_IF_ERROR(writer.close()); _last_dump_time = MonotonicSeconds(); @@ -156,7 +155,6 @@ void PipelineTracerContext::_dump_timeslice() { THROW_IF_ERROR(writer.appendv(&text, 1)); } } - THROW_IF_ERROR(writer.finalize()); THROW_IF_ERROR(writer.close()); _last_dump_time = MonotonicSeconds(); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 0f6aa71c974..24877103384 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -197,6 +197,7 @@ public: ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); } ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); } ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); } + ThreadPool* non_block_close_thread_pool(); Status init_pipeline_task_scheduler(); void init_file_cache_factory(); @@ -368,6 +369,7 @@ private: std::unique_ptr<ThreadPool> _join_node_thread_pool; // Pool to use a new thread to release object std::unique_ptr<ThreadPool> _lazy_release_obj_pool; + std::unique_ptr<ThreadPool> _non_block_close_thread_pool; FragmentMgr* _fragment_mgr = nullptr; pipeline::TaskScheduler* _without_group_task_scheduler = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index ba0473ee10c..8d654c8d09b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -111,6 +111,23 @@ #include "runtime/memory/tcmalloc_hook.h" #endif +// Used for unit test +namespace { +std::once_flag flag; +std::unique_ptr<doris::ThreadPool> non_block_close_thread_pool; +void init_threadpool_for_test() { + static_cast<void>(doris::ThreadPoolBuilder("NonBlockCloseThreadPool") + .set_min_threads(12) + .set_max_threads(48) + .build(&non_block_close_thread_pool)); +} + +[[maybe_unused]] doris::ThreadPool* get_non_block_close_thread_pool() { + std::call_once(flag, init_threadpool_for_test); + return non_block_close_thread_pool.get(); +} +} // namespace + namespace doris { class PBackendService_Stub; class PFunctionService_Stub; @@ -153,6 +170,14 @@ static pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) { return {min_num, max_num}; } +ThreadPool* ExecEnv::non_block_close_thread_pool() { +#ifdef BE_TEST + return get_non_block_close_thread_pool(); +#else + return _non_block_close_thread_pool.get(); +#endif +} + Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths, const std::vector<StorePath>& spill_store_paths, const std::set<std::string>& broken_paths) { @@ -235,6 +260,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, .set_max_threads(1) .set_max_queue_size(1000000) .build(&_lazy_release_obj_pool)); + static_cast<void>(ThreadPoolBuilder("NonBlockCloseThreadPool") + .set_min_threads(config::min_nonblock_close_thread_num) + .set_max_threads(config::max_nonblock_close_thread_num) + .build(&_non_block_close_thread_pool)); // NOTE: runtime query statistics mgr could be visited by query and daemon thread // so it should be created before all query begin and deleted after all query and daemon thread stoppped @@ -639,6 +668,7 @@ void ExecEnv::destroy() { SAFE_SHUTDOWN(_s3_file_upload_thread_pool); SAFE_SHUTDOWN(_join_node_thread_pool); SAFE_SHUTDOWN(_lazy_release_obj_pool); + SAFE_SHUTDOWN(_non_block_close_thread_pool); SAFE_SHUTDOWN(_send_report_thread_pool); SAFE_SHUTDOWN(_send_batch_thread_pool); @@ -684,6 +714,7 @@ void ExecEnv::destroy() { // TODO(zhiqiang): Maybe we should call shutdown before release thread pool? _join_node_thread_pool.reset(nullptr); _lazy_release_obj_pool.reset(nullptr); + _non_block_close_thread_pool.reset(nullptr); _send_report_thread_pool.reset(nullptr); _send_table_stats_thread_pool.reset(nullptr); _buffered_reader_prefetch_thread_pool.reset(nullptr); diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 535fbf772c9..32503bbfdb7 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -176,7 +176,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st if (file_writer == nullptr) { return Status::Corruption("add_segment failed, file writer {} is destoryed", segid); } - if (!file_writer->closed()) { + if (file_writer->closed() != io::FileWriterState::CLOSED) { return Status::Corruption("add_segment failed, segment {} is not closed", file_writer->path().native()); } @@ -209,7 +209,7 @@ Status LoadStreamWriter::close() { } for (const auto& writer : _segment_file_writers) { - if (!writer->closed()) { + if (writer->closed() != io::FileWriterState::CLOSED) { return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed", writer->path().native()); } diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 362df33aa28..919680f1c16 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -136,7 +136,6 @@ public: } std::string data(1, '0'); ASSERT_TRUE(writer->append(Slice(data.data(), data.size())).ok()); - ASSERT_TRUE(writer->finalize().ok()); ASSERT_TRUE(writer->close().ok()); } ExecEnv::GetInstance()->_file_cache_factory = factory.get(); diff --git a/be/test/io/fs/hdfs_file_system_test.cpp b/be/test/io/fs/hdfs_file_system_test.cpp index 87fe5d52dd8..41c26ccedf7 100644 --- a/be/test/io/fs/hdfs_file_system_test.cpp +++ b/be/test/io/fs/hdfs_file_system_test.cpp @@ -110,9 +110,6 @@ TEST(HdfsFileSystemTest, Write) { st = hdfs_file_writer->append(content_2M); ASSERT_TRUE(st.ok()) << st; - st = hdfs_file_writer->finalize(); - ASSERT_TRUE(st.ok()) << st; - st = hdfs_file_writer->close(); ASSERT_TRUE(st.ok()) << st; diff --git a/be/test/io/fs/local_file_system_test.cpp b/be/test/io/fs/local_file_system_test.cpp index 556fb1adecc..a264ca799dd 100644 --- a/be/test/io/fs/local_file_system_test.cpp +++ b/be/test/io/fs/local_file_system_test.cpp @@ -91,8 +91,6 @@ TEST_F(LocalFileSystemTest, WriteRead) { Slice slices[2] {abc, bcd}; st = file_writer->appendv(slices, 2); ASSERT_TRUE(st.ok()) << st; - st = file_writer->finalize(); - ASSERT_TRUE(st.ok()) << st; st = file_writer->close(); ASSERT_TRUE(st.ok()) << st; ASSERT_EQ(file_writer->bytes_appended(), 115); @@ -143,8 +141,6 @@ TEST_F(LocalFileSystemTest, Exist) { io::FileWriterPtr file_writer; auto st = io::global_local_filesystem()->create_file(fname, &file_writer); ASSERT_TRUE(st.ok()) << st; - st = file_writer->finalize(); - ASSERT_TRUE(st.ok()) << st; st = file_writer->close(); ASSERT_TRUE(st.ok()) << st; ASSERT_TRUE(check_exist(fname)); @@ -155,8 +151,6 @@ TEST_F(LocalFileSystemTest, List) { auto fname = fmt::format("{}/abc", test_dir); auto st = io::global_local_filesystem()->create_file(fname, &file_writer); ASSERT_TRUE(st.ok()) << st; - st = file_writer->finalize(); - ASSERT_TRUE(st.ok()) << st; st = file_writer->close(); ASSERT_TRUE(st.ok()) << st; ASSERT_TRUE(check_exist(fname)); diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp index 7d7d71ab318..9e0923d7d53 100644 --- a/be/test/io/fs/s3_file_writer_test.cpp +++ b/be/test/io/fs/s3_file_writer_test.cpp @@ -123,7 +123,6 @@ TEST_F(S3FileWriterTest, multi_part_io_error) { offset += bytes_read; } ASSERT_EQ(s3_file_writer->bytes_appended(), file_size); - ASSERT_TRUE(!s3_file_writer->finalize().ok()); // The second part would fail uploading itself to s3 // so the result of close should be not ok ASSERT_TRUE(!s3_file_writer->close().ok()); @@ -166,7 +165,6 @@ TEST_F(S3FileWriterTest, put_object_io_error) { offset += bytes_read; } ASSERT_EQ(s3_file_writer->bytes_appended(), file_size); - ASSERT_TRUE(!s3_file_writer->finalize().ok()); // The object might be timeout but still succeed in loading ASSERT_TRUE(!s3_file_writer->close().ok()); } @@ -266,7 +264,6 @@ TEST_F(S3FileWriterTest, normal) { offset += bytes_read; } ASSERT_EQ(s3_file_writer->bytes_appended(), file_size); - ASSERT_TRUE(s3_file_writer->finalize().ok()); ASSERT_EQ(Status::OK(), s3_file_writer->close()); int64_t s3_file_size = 0; ASSERT_EQ(Status::OK(), s3_fs->file_size("normal", &s3_file_size)); @@ -299,7 +296,6 @@ TEST_F(S3FileWriterTest, smallFile) { offset += bytes_read; } ASSERT_EQ(s3_file_writer->bytes_appended(), file_size); - ASSERT_TRUE(s3_file_writer->finalize().ok()); ASSERT_EQ(Status::OK(), s3_file_writer->close()); int64_t s3_file_size = 0; ASSERT_EQ(Status::OK(), s3_fs->file_size("small", &s3_file_size)); @@ -359,7 +355,6 @@ TEST_F(S3FileWriterTest, finalize_error) { offset += bytes_read; } ASSERT_EQ(s3_file_writer->bytes_appended(), file_size); - ASSERT_TRUE(!s3_file_writer->finalize().ok()); bool exits = false; static_cast<void>(s3_fs->exists("finalize_error", &exits)); ASSERT_TRUE(!exits); @@ -396,7 +391,6 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_2) { offset += bytes_read; } ASSERT_EQ(s3_file_writer->bytes_appended(), file_size); - ASSERT_TRUE(s3_file_writer->finalize().ok()); // The second part would fail uploading itself to s3 // so the result of close should be not ok auto st = s3_file_writer->close(); @@ -435,7 +429,6 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_1) { offset += bytes_read; } ASSERT_EQ(s3_file_writer->bytes_appended(), file_size); - ASSERT_TRUE(s3_file_writer->finalize().ok()); // The second part would fail uploading itself to s3 // so the result of close should be not ok auto st = s3_file_writer->close(); @@ -474,7 +467,6 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_3) { offset += bytes_read; } ASSERT_EQ(s3_file_writer->bytes_appended(), file_size); - ASSERT_TRUE(s3_file_writer->finalize().ok()); // The second part would fail uploading itself to s3 // so the result of close should be not ok auto st = s3_file_writer->close(); diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp index ad6e496c56f..b9b0e0818cf 100644 --- a/be/test/io/fs/stream_sink_file_writer_test.cpp +++ b/be/test/io/fs/stream_sink_file_writer_test.cpp @@ -106,7 +106,7 @@ TEST_F(StreamSinkFileWriterTest, Test) { CHECK_STATUS_OK(writer.appendv(&(*slices.begin()), slices.size())); EXPECT_EQ(NUM_STREAM, g_num_request); - CHECK_STATUS_OK(writer.finalize()); + CHECK_STATUS_OK(writer.close()); EXPECT_EQ(NUM_STREAM * 2, g_num_request); } diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index 49de1826104..953be73862d 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -99,15 +99,13 @@ public: ~FileWriterMock() override = default; - Status close() override { return _local_file_writer->close(); } + Status close(bool /*non_block*/) override { return _local_file_writer->close(); } Status appendv(const Slice* data, size_t data_cnt) override { return _local_file_writer->appendv(data, data_cnt); } - Status finalize() override { return _local_file_writer->finalize(); } - - bool closed() const override { return _local_file_writer->closed(); } + io::FileWriterState closed() const override { return _local_file_writer->closed(); } size_t bytes_appended() const override { return _local_file_writer->bytes_appended(); } diff --git a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy index 759c409a850..4a7e3d45e90 100644 --- a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy @@ -103,13 +103,6 @@ suite("test_index_compound_directory_failure_injection", "nonConcurrent") { GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close") } qt_sql "select COUNT() from ${testTable_dup} where request match 'images'" - try { - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error") - load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error") - } - qt_sql "select COUNT() from ${testTable_dup} where request match 'images'" try { GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error") load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org