This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cd0bdf3ba8 [feature](IO) Replace file writer's finalize function with 
async close (#34679)
3cd0bdf3ba8 is described below

commit 3cd0bdf3ba8c002ae092b3d87bd766cb18e85d21
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Mon May 13 16:44:58 2024 +0800

    [feature](IO) Replace file writer's finalize function with async close 
(#34679)
---
 be/src/common/config.cpp                           |   5 +
 be/src/common/config.h                             |   5 +
 be/src/io/fs/broker_file_writer.cpp                |  30 ++++--
 be/src/io/fs/broker_file_writer.h                  |   8 +-
 be/src/io/fs/file_writer.h                         |  20 ++--
 be/src/io/fs/hdfs_file_writer.cpp                  |  81 +++++++++--------
 be/src/io/fs/hdfs_file_writer.h                    |  14 ++-
 be/src/io/fs/local_file_writer.cpp                 |  59 ++++++++----
 be/src/io/fs/local_file_writer.h                   |  10 +-
 be/src/io/fs/s3_file_writer.cpp                    | 101 ++++++++-------------
 be/src/io/fs/s3_file_writer.h                      |  12 ++-
 be/src/io/fs/stream_sink_file_writer.cpp           |  29 ++++--
 be/src/io/fs/stream_sink_file_writer.h             |  11 +--
 be/src/olap/rowset/beta_rowset_writer.cpp          |   4 +-
 .../segment_v2/inverted_index_fs_directory.cpp     |  10 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |   2 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |   2 +-
 be/src/olap/storage_engine.h                       |   1 +
 be/src/pipeline/pipeline_tracing.cpp               |   2 -
 be/src/runtime/exec_env.h                          |   2 +
 be/src/runtime/exec_env_init.cpp                   |  31 +++++++
 be/src/runtime/load_stream_writer.cpp              |   4 +-
 be/test/io/cache/block_file_cache_test.cpp         |   1 -
 be/test/io/fs/hdfs_file_system_test.cpp            |   3 -
 be/test/io/fs/local_file_system_test.cpp           |   6 --
 be/test/io/fs/s3_file_writer_test.cpp              |   8 --
 be/test/io/fs/stream_sink_file_writer_test.cpp     |   2 +-
 be/test/olap/tablet_cooldown_test.cpp              |   6 +-
 ...index_compound_directory_fault_injection.groovy |   7 --
 29 files changed, 268 insertions(+), 208 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8055e3fe456..4ec90cfa886 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1263,6 +1263,11 @@ DEFINE_mInt64(hdfs_jni_write_sleep_milliseconds, "300");
 // The max retry times when hdfs write failed
 DEFINE_mInt64(hdfs_jni_write_max_retry_time, "3");
 
+// The min thread num for NonBlockCloseThreadPool
+DEFINE_Int64(min_nonblock_close_thread_num, "12");
+// The max thread num for NonBlockCloseThreadPool
+DEFINE_Int64(max_nonblock_close_thread_num, "64");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 46dcdb8658b..7ff833513eb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1337,6 +1337,11 @@ DECLARE_mInt64(hdfs_jni_write_sleep_milliseconds);
 // The max retry times when hdfs write failed
 DECLARE_mInt64(hdfs_jni_write_max_retry_time);
 
+// The min thread num for NonBlockCloseThreadPool
+DECLARE_Int64(min_nonblock_close_thread_num);
+// The max thread num for NonBlockCloseThreadPool
+DECLARE_Int64(max_nonblock_close_thread_num);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/io/fs/broker_file_writer.cpp 
b/be/src/io/fs/broker_file_writer.cpp
index f162b02b19e..2e5af74090a 100644
--- a/be/src/io/fs/broker_file_writer.cpp
+++ b/be/src/io/fs/broker_file_writer.cpp
@@ -28,6 +28,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "io/fs/file_writer.h"
 #include "runtime/broker_mgr.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
@@ -60,12 +61,29 @@ inline const std::string& client_id(ExecEnv* env, const 
TNetworkAddress& addr) {
 }
 #endif
 
-Status BrokerFileWriter::close() {
-    if (_closed) {
+Status BrokerFileWriter::close(bool non_block) {
+    if (_close_state == FileWriterState::CLOSED) {
+        return Status::InternalError("BrokerFileWriter already closed, file 
path {}",
+                                     _path.native());
+    }
+    if (_close_state == FileWriterState::ASYNC_CLOSING) {
+        if (non_block) {
+            return Status::InternalError("Don't submit async close multi 
times");
+        }
+        // Actually the first call to close(true) would return the value of
+        // _finalize; if it returned an error status then the code would never
+        // call the second close(true)
+        _close_state = FileWriterState::CLOSED;
         return Status::OK();
     }
-    _closed = true;
+    if (non_block) {
+        _close_state = FileWriterState::ASYNC_CLOSING;
+    } else {
+        _close_state = FileWriterState::CLOSED;
+    }
+    return _close_impl();
+}
 
+Status BrokerFileWriter::_close_impl() {
     TBrokerCloseWriterRequest request;
     request.__set_version(TBrokerVersion::VERSION_ONE);
     request.__set_fd(_fd);
@@ -117,7 +135,7 @@ Status BrokerFileWriter::close() {
 }
 
 Status BrokerFileWriter::appendv(const Slice* data, size_t data_cnt) {
-    if (_closed) [[unlikely]] {
+    if (_close_state != FileWriterState::OPEN) [[unlikely]] {
         return Status::InternalError("append to closed file: {}", 
_path.native());
     }
 
@@ -135,10 +153,6 @@ Status BrokerFileWriter::appendv(const Slice* data, size_t 
data_cnt) {
     return Status::OK();
 }
 
-Status BrokerFileWriter::finalize() {
-    return Status::OK();
-}
-
 Result<FileWriterPtr> BrokerFileWriter::create(ExecEnv* env, const 
TNetworkAddress& broker_address,
                                                const std::map<std::string, 
std::string>& properties,
                                                Path path) {
diff --git a/be/src/io/fs/broker_file_writer.h 
b/be/src/io/fs/broker_file_writer.h
index d6fce52a05c..ad065a47327 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -43,17 +43,17 @@ public:
 
     BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, Path 
path, TBrokerFD fd);
     ~BrokerFileWriter() override;
+    Status close(bool non_block = false) override;
 
-    Status close() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
-    Status finalize() override;
     const Path& path() const override { return _path; }
     size_t bytes_appended() const override { return _cur_offset; }
-    bool closed() const override { return _closed; }
+    FileWriterState closed() const override { return _close_state; }
     FileCacheAllocatorBuilder* cache_builder() const override { return 
nullptr; }
 
 private:
     Status _write(const uint8_t* buf, size_t buf_len, size_t* written_bytes);
+    Status _close_impl();
 
 private:
     ExecEnv* _env = nullptr;
@@ -61,7 +61,7 @@ private:
     Path _path;
     size_t _cur_offset = 0;
     TBrokerFD _fd;
-    bool _closed = false;
+    FileWriterState _close_state {FileWriterState::OPEN};
 };
 
 } // end namespace io
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index ad13f7f0095..56b7f731c51 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <future>
 #include <memory>
 
 #include "common/status.h"
@@ -46,6 +47,17 @@ struct FileWriterOptions {
     uint64_t file_cache_expiration = 0; // Absolute time
 };
 
+struct AsyncCloseStatusPack {
+    std::promise<Status> promise;
+    std::future<Status> future;
+};
+
+enum class FileWriterState : uint8_t {
+    OPEN = 0,
+    ASYNC_CLOSING,
+    CLOSED,
+};
+
 class FileWriter {
 public:
     FileWriter() = default;
@@ -56,21 +68,17 @@ public:
 
     // Normal close. Wait for all data to persist before returning.
     // If there is no data appended, an empty file will be persisted.
-    virtual Status close() = 0;
+    virtual Status close(bool non_block = false) = 0;
 
     Status append(const Slice& data) { return appendv(&data, 1); }
 
     virtual Status appendv(const Slice* data, size_t data_cnt) = 0;
 
-    // Call this method when there is no more data to write.
-    // FIXME(cyx): Does not seem to be an appropriate interface for file 
system?
-    virtual Status finalize() = 0;
-
     virtual const Path& path() const = 0;
 
     virtual size_t bytes_appended() const = 0;
 
-    virtual bool closed() const = 0;
+    virtual FileWriterState closed() const = 0;
 
     virtual FileCacheAllocatorBuilder* cache_builder() const = 0;
 };
diff --git a/be/src/io/fs/hdfs_file_writer.cpp 
b/be/src/io/fs/hdfs_file_writer.cpp
index a37bd323984..7a499239bd0 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -36,8 +36,10 @@
 #include "io/cache/block_file_cache_factory.h"
 #include "io/cache/file_cache_common.h"
 #include "io/fs/err_utils.h"
+#include "io/fs/file_writer.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/hdfs_util.h"
+#include "runtime/exec_env.h"
 #include "service/backend_options.h"
 #include "util/bvar_helper.h"
 #include "util/jni-util.h"
@@ -142,6 +144,11 @@ HdfsFileWriter::HdfsFileWriter(Path path, 
std::shared_ptr<HdfsHandler> handler,
 }
 
 HdfsFileWriter::~HdfsFileWriter() {
+    if (_async_close_pack != nullptr) {
+        // For thread safety
+        std::ignore = _async_close_pack->promise.get_future();
+        _async_close_pack = nullptr;
+    }
     if (_hdfs_file) {
         SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
         hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
@@ -195,13 +202,41 @@ Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
 #endif
 }
 
-Status HdfsFileWriter::close() {
-    if (_closed) {
-        return Status::OK();
+Status HdfsFileWriter::close(bool non_block) {
+    if (closed() == FileWriterState::CLOSED) {
+        return Status::InternalError("HdfsFileWriter already closed, file path 
{}, fs name {}",
+                                     _path.native(), _fs_name);
+    }
+    if (closed() == FileWriterState::ASYNC_CLOSING) {
+        if (non_block) {
+            return Status::InternalError("Don't submit async close multi 
times");
+        }
+        CHECK(_async_close_pack != nullptr);
+        _st = _async_close_pack->future.get();
+        _async_close_pack = nullptr;
+        // We should wait for all the previous async tasks to be finished
+        _close_state = FileWriterState::CLOSED;
+        // The next time we call close(), no matter whether non_block is true
+        // or false, it would always return the '_st' value because this
+        // writer is already closed.
+        return _st;
+    }
+    if (non_block) {
+        _close_state = FileWriterState::ASYNC_CLOSING;
+        _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
+        _async_close_pack->future = _async_close_pack->promise.get_future();
+        return 
ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func(
+                [&]() { _async_close_pack->promise.set_value(_close_impl()); 
});
     }
-    _closed = true;
+    _st = _close_impl();
+    _close_state = FileWriterState::CLOSED;
+    return _st;
+}
+
+Status HdfsFileWriter::_close_impl() {
     if (_batch_buffer.size() != 0) {
-        RETURN_IF_ERROR(_flush_buffer());
+        if (_st = _flush_buffer(); !_st.ok()) {
+            return _st;
+        }
     }
     int ret;
     if (_sync_file_data) {
@@ -220,9 +255,10 @@ Status HdfsFileWriter::close() {
                                                Status::InternalError("failed 
to sync hdfs file"));
 
         if (ret != 0) {
-            return Status::InternalError(
+            _st = Status::InternalError(
                     "failed to sync hdfs file. fs_name={} path={} : {}, 
file_size={}", _fs_name,
                     _path.native(), hdfs_error(), bytes_appended());
+            return _st;
         }
     }
 
@@ -238,10 +274,11 @@ Status HdfsFileWriter::close() {
     TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile",
                                            Status::InternalError("failed to 
close hdfs file"));
     if (ret != 0) {
-        return Status::InternalError(
+        _st = Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}, file_size={}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), 
hdfs_error(),
                 bytes_appended());
+        return _st;
     }
     hdfs_file_created_total << 1;
     return Status::OK();
@@ -368,7 +405,7 @@ Status HdfsFileWriter::_append(std::string_view content) {
 }
 
 Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
-    if (_closed) [[unlikely]] {
+    if (_close_state != FileWriterState::OPEN) [[unlikely]] {
         return Status::InternalError("append to closed file: {}", 
_path.native());
     }
 
@@ -378,34 +415,6 @@ Status HdfsFileWriter::appendv(const Slice* data, size_t 
data_cnt) {
     return Status::OK();
 }
 
-// Call this method when there is no more data to write.
-Status HdfsFileWriter::finalize() {
-    if (_closed) [[unlikely]] {
-        return Status::InternalError("finalize closed file: {}, file_size={}", 
_path.native(),
-                                     bytes_appended());
-    }
-    if (_batch_buffer.size() != 0) {
-        RETURN_IF_ERROR(_flush_buffer());
-    }
-
-    // Flush buffered data to HDFS without waiting for HDFS response
-    int ret;
-    {
-        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
-        ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsFlush(_hdfs_handler->hdfs_fs, 
_hdfs_file),
-                                           
"HdfsFileWriter::finalize::hdfsFlush");
-    }
-    TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsFlush",
-                                           Status::InternalError("failed to 
flush hdfs file"));
-    if (ret != 0) {
-        return Status::InternalError(
-                "failed to flush hdfs file. fs_name={} path={} : {}, 
file_size={}", _fs_name,
-                _path.native(), hdfs_error(), bytes_appended());
-    }
-
-    return Status::OK();
-}
-
 Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, 
std::shared_ptr<HdfsHandler> handler,
                                              const std::string& fs_name,
                                              const FileWriterOptions* opts) {
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 4ed30c5a856..c5393042185 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -29,6 +29,7 @@ namespace io {
 class HdfsHandler;
 class BlockFileCache;
 struct FileCacheAllocatorBuilder;
+struct AsyncCloseStatusPack;
 
 class HdfsFileWriter final : public FileWriter {
 public:
@@ -44,18 +45,19 @@ public:
                    std::string fs_name, const FileWriterOptions* opts = 
nullptr);
     ~HdfsFileWriter() override;
 
-    Status close() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
-    Status finalize() override;
     const Path& path() const override { return _path; }
     size_t bytes_appended() const override { return _bytes_appended; }
-    bool closed() const override { return _closed; }
+    FileWriterState closed() const override { return _close_state; }
+
+    Status close(bool non_block = false) override;
 
     FileCacheAllocatorBuilder* cache_builder() const override {
         return _cache_builder == nullptr ? nullptr : _cache_builder.get();
     }
 
 private:
+    Status _close_impl();
     // Flush buffered data into HDFS client and write local file cache if 
enabled
     // **Notice**: this would clear the underlying buffer
     Status _flush_buffer();
@@ -70,7 +72,6 @@ private:
     hdfsFile _hdfs_file = nullptr;
     std::string _fs_name;
     size_t _bytes_appended = 0;
-    bool _closed = false;
     bool _sync_file_data;
     std::unique_ptr<FileCacheAllocatorBuilder>
             _cache_builder; // nullptr if disable write file cache
@@ -90,6 +91,11 @@ private:
     };
     BatchBuffer _batch_buffer;
     size_t _approximate_jni_buffer_size = 0;
+    std::unique_ptr<AsyncCloseStatusPack> _async_close_pack;
+    // We should make sure that close_impl's return value is consistent,
+    // so we need to add one field to store the value returned by the
+    // first call to close_impl
+    Status _st;
+    FileWriterState _close_state {FileWriterState::OPEN};
 };
 
 } // namespace io
diff --git a/be/src/io/fs/local_file_writer.cpp 
b/be/src/io/fs/local_file_writer.cpp
index 2e4a970d6c2..4a5da18e5de 100644
--- a/be/src/io/fs/local_file_writer.cpp
+++ b/be/src/io/fs/local_file_writer.cpp
@@ -36,6 +36,7 @@
 #include "common/sync_point.h"
 #include "gutil/macros.h"
 #include "io/fs/err_utils.h"
+#include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
 #include "io/fs/path.h"
 #include "olap/data_dir.h"
@@ -78,7 +79,7 @@ size_t LocalFileWriter::bytes_appended() const {
 }
 
 LocalFileWriter::~LocalFileWriter() {
-    if (!_closed) {
+    if (_close_state == FileWriterState::OPEN) {
         _abort();
     }
     DorisMetrics::instance()->local_file_open_writing->increment(-1);
@@ -86,7 +87,25 @@ LocalFileWriter::~LocalFileWriter() {
     
DorisMetrics::instance()->local_bytes_written_total->increment(_bytes_appended);
 }
 
-Status LocalFileWriter::close() {
+Status LocalFileWriter::close(bool non_block) {
+    if (_close_state == FileWriterState::CLOSED) {
+        return Status::InternalError("LocalFileWriter already closed, file 
path {}",
+                                     _path.native());
+    }
+    if (_close_state == FileWriterState::ASYNC_CLOSING) {
+        if (non_block) {
+            return Status::InternalError("Don't submit async close multi 
times");
+        }
+        // Actually the first call to close(true) would return the value of
+        // _finalize; if it returned an error status then the code would never
+        // call the second close(true)
+        _close_state = FileWriterState::CLOSED;
+        return Status::OK();
+    }
+    if (non_block) {
+        _close_state = FileWriterState::ASYNC_CLOSING;
+    } else {
+        _close_state = FileWriterState::CLOSED;
+    }
     return _close(_sync_data);
 }
 
@@ -104,7 +123,7 @@ void LocalFileWriter::_abort() {
 Status LocalFileWriter::appendv(const Slice* data, size_t data_cnt) {
     TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::appendv",
                                       Status::IOError("inject io error"));
-    if (_closed) [[unlikely]] {
+    if (_close_state != FileWriterState::OPEN) [[unlikely]] {
         return Status::InternalError("append to closed file: {}", 
_path.native());
     }
     _dirty = true;
@@ -159,10 +178,11 @@ Status LocalFileWriter::appendv(const Slice* data, size_t 
data_cnt) {
     return Status::OK();
 }
 
-Status LocalFileWriter::finalize() {
+// TODO(ByteYue): Refactor this function as FileWriter::flush()
+Status LocalFileWriter::_finalize() {
     TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::finalize",
                                       Status::IOError("inject io error"));
-    if (_closed) [[unlikely]] {
+    if (_close_state == FileWriterState::OPEN) [[unlikely]] {
         return Status::InternalError("finalize closed file: {}", 
_path.native());
     }
 
@@ -178,39 +198,42 @@ Status LocalFileWriter::finalize() {
 }
 
 Status LocalFileWriter::_close(bool sync) {
-    if (_closed) {
-        return Status::OK();
-    }
+    auto fd_reclaim_func = [&](Status st) {
+        if (_fd > 9 && 0 != ::close(_fd)) {
+            return localfs_error(errno, fmt::format("failed to {}, along with 
failed to close {}",
+                                                    st, _path.native()));
+        }
+        _fd = -1;
+        return st;
+    };
     if (sync) {
         if (_dirty) {
 #ifdef __APPLE__
             if (fcntl(_fd, F_FULLFSYNC) < 0) [[unlikely]] {
-                return localfs_error(errno, fmt::format("failed to sync {}", 
_path.native()));
+                return fd_reclaim_func(
+                        localfs_error(errno, fmt::format("failed to sync {}", 
_path.native())));
             }
 #else
             if (0 != ::fdatasync(_fd)) [[unlikely]] {
-                return localfs_error(errno, fmt::format("failed to sync {}", 
_path.native()));
+                return fd_reclaim_func(
+                        localfs_error(errno, fmt::format("failed to sync {}", 
_path.native())));
             }
 #endif
             _dirty = false;
         }
-        RETURN_IF_ERROR(sync_dir(_path.parent_path()));
+        RETURN_IF_ERROR(fd_reclaim_func(sync_dir(_path.parent_path())));
     }
 
-    if (0 != ::close(_fd)) {
-        return localfs_error(errno, fmt::format("failed to close {}", 
_path.native()));
-    }
-    _closed = true;
-
     DBUG_EXECUTE_IF("LocalFileWriter.close.failed", {
         // spare '.testfile' to make bad disk checker happy
         if (_path.filename().compare(kTestFilePath)) {
-            return Status::IOError("cannot close {}: {}", _path.native(), 
std::strerror(errno));
+            return fd_reclaim_func(
+                    Status::IOError("cannot close {}: {}", _path.native(), 
std::strerror(errno)));
         }
     });
 
     TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileWriter::close", 
Status::IOError("inject io error"));
-    return Status::OK();
+    return fd_reclaim_func(Status::OK());
 }
 
 } // namespace doris::io
diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h
index 81ebb0ebd1f..d80065825d2 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/io/fs/local_file_writer.h
@@ -31,26 +31,26 @@ public:
     LocalFileWriter(Path path, int fd, bool sync_data = true);
     ~LocalFileWriter() override;
 
-    Status close() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
-    Status finalize() override;
     const Path& path() const override { return _path; }
     size_t bytes_appended() const override;
-    bool closed() const override { return _closed; }
+    FileWriterState closed() const override { return _close_state; }
 
     FileCacheAllocatorBuilder* cache_builder() const override { return 
nullptr; }
 
+    Status close(bool non_block = false) override;
+
 private:
+    Status _finalize();
     void _abort();
     Status _close(bool sync);
 
-private:
     Path _path;
     int _fd; // owned
     bool _dirty = false;
-    bool _closed = false;
     const bool _sync_data = true;
     size_t _bytes_appended = 0;
+    FileWriterState _close_state {FileWriterState::OPEN};
 };
 
 } // namespace doris::io
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 952377314a1..7c3af1fa6fb 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -41,6 +41,7 @@
 #include <glog/logging.h>
 
 #include <sstream>
+#include <tuple>
 #include <utility>
 
 #include "common/config.h"
@@ -54,6 +55,7 @@
 #include "io/fs/path.h"
 #include "io/fs/s3_file_bufferpool.h"
 #include "io/fs/s3_file_system.h"
+#include "runtime/exec_env.h"
 #include "util/bvar_helper.h"
 #include "util/debug_points.h"
 #include "util/defer_op.h"
@@ -65,7 +67,6 @@ namespace Aws::S3::Model {
 class DeleteObjectRequest;
 } // namespace Aws::S3::Model
 
-using Aws::S3::Model::AbortMultipartUploadRequest;
 using Aws::S3::Model::CompletedPart;
 using Aws::S3::Model::CompletedMultipartUpload;
 using Aws::S3::Model::CompleteMultipartUploadRequest;
@@ -102,11 +103,13 @@ 
S3FileWriter::S3FileWriter(std::shared_ptr<Aws::S3::S3Client> client, std::strin
 }
 
 S3FileWriter::~S3FileWriter() {
-    if (!closed() || _failed) {
-        // if we don't abort multi part upload, the uploaded part in object
-        // store will not automatically reclaim itself, it would cost more 
money
-        static_cast<void>(_abort());
-    } else {
+    if (_async_close_pack != nullptr) {
+        // For thread safety
+        std::ignore = _async_close_pack->promise.get_future();
+        _async_close_pack = nullptr;
+    }
+    // We won't do the S3 abort operation in BE; we let the S3 service do it on its own.
+    if (closed() == FileWriterState::OPEN && !_failed) {
         s3_bytes_written_total << _bytes_appended;
     }
     s3_file_being_written << -1;
@@ -153,48 +156,39 @@ void S3FileWriter::_wait_until_finish(std::string_view 
task_name) {
     }
 }
 
-Status S3FileWriter::_abort() {
-    // make all pending work early quits
-    _failed = true;
-
-    // we need to reclaim the memory
-    if (_pending_buf) {
-        _pending_buf = nullptr;
+Status S3FileWriter::close(bool non_block) {
+    if (closed() == FileWriterState::CLOSED) {
+        return Status::InternalError("S3FileWriter already closed, file path 
{}, file key {}",
+                                     _path.native(), _key);
     }
-    LOG(INFO) << "S3FileWriter::abort, path: " << _path.native();
-    // upload id is empty means there was no create multi upload
-    if (_upload_id.empty()) {
-        return Status::OK();
+    if (closed() == FileWriterState::ASYNC_CLOSING) {
+        if (non_block) {
+            return Status::InternalError("Don't submit async close multi 
times");
+        }
+        CHECK(_async_close_pack != nullptr);
+        _st = _async_close_pack->future.get();
+        _async_close_pack = nullptr;
+        // We should wait for all the previous async tasks to be finished
+        _close_state = FileWriterState::CLOSED;
+        // The next time we call close(), no matter whether non_block is true
+        // or false, it would always return the '_st' value because this
+        // writer is already closed.
+        return _st;
     }
-    _wait_until_finish("Abort");
-    AbortMultipartUploadRequest request;
-    request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
-    SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
-    auto outcome = _client->AbortMultipartUpload(request);
-    if (outcome.IsSuccess() ||
-        outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD 
||
-        outcome.GetError().GetResponseCode() == 
Aws::Http::HttpResponseCode::NOT_FOUND) {
-        LOG(INFO) << "Abort multipart upload successfully"
-                  << "bucket=" << _bucket << ", key=" << _path.native()
-                  << ", upload_id=" << _upload_id << ", whole parts=" << 
_dump_completed_part();
-        return Status::OK();
+    if (non_block) {
+        _close_state = FileWriterState::ASYNC_CLOSING;
+        _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
+        _async_close_pack->future = _async_close_pack->promise.get_future();
+        return 
ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func(
+                [&]() { _async_close_pack->promise.set_value(_close_impl()); 
});
     }
-    return s3fs_error(
-            outcome.GetError(),
-            fmt::format("failed to abort multipart upload {} upload_id={}, 
whole parts={}",
-                        _path.native(), _upload_id, _dump_completed_part()));
+    _st = _close_impl();
+    _close_state = FileWriterState::CLOSED;
+    return _st;
 }
 
-Status S3FileWriter::close() {
-    if (closed()) {
-        _wait_until_finish("close");
-        return _st;
-    }
-
+Status S3FileWriter::_close_impl() {
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
 
-    Defer defer {[this] { _closed = true; }};
-
     if (_upload_id.empty() && _pending_buf) {
         RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
     }
@@ -248,7 +242,7 @@ Status S3FileWriter::close() {
 }
 
 Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
-    if (closed()) [[unlikely]] {
+    if (closed() != FileWriterState::OPEN) [[unlikely]] {
         return Status::InternalError("append to closed file: {}", 
_path.native());
     }
 
@@ -457,27 +451,6 @@ Status S3FileWriter::_complete() {
     return Status::OK();
 }
 
-Status S3FileWriter::finalize() {
-    if (closed()) [[unlikely]] {
-        return Status::InternalError("finalize closed file: {}", 
_path.native());
-    }
-
-    DBUG_EXECUTE_IF("s3_file_writer::finalize",
-                    { return Status::IOError("failed to finalize due to 
injected error"); });
-    // submit pending buf if it's not nullptr
-    // it's the last buf, we can submit it right now
-    if (_pending_buf != nullptr) {
-        if (_upload_id.empty()) {
-            RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
-        }
-        _countdown_event.add_count();
-        RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
-        _pending_buf = nullptr;
-    }
-    _wait_until_finish("finalize");
-    return _st;
-}
-
 Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
     auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
     DCHECK(buf != nullptr);
@@ -497,7 +470,7 @@ Status 
S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
 }
 
 void S3FileWriter::_put_object(UploadFileBuffer& buf) {
-    DCHECK(!closed());
+    DCHECK(closed() != FileWriterState::CLOSED) << fmt::format("state is {}", 
closed());
     Aws::S3::Model::PutObjectRequest request;
     request.WithBucket(_bucket).WithKey(_key);
     Aws::Utils::ByteBuffer 
part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 9f72b02da60..b7d39a48245 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -42,6 +42,7 @@ struct S3Conf;
 namespace io {
 struct S3FileBuffer;
 class S3FileSystem;
+struct AsyncCloseStatusPack;
 
 class S3FileWriter final : public FileWriter {
 public:
@@ -49,14 +50,11 @@ public:
                  const FileWriterOptions* opts);
     ~S3FileWriter() override;
 
-    Status close() override;
-
     Status appendv(const Slice* data, size_t data_cnt) override;
-    Status finalize() override;
 
     const Path& path() const override { return _path; }
     size_t bytes_appended() const override { return _bytes_appended; }
-    bool closed() const override { return _closed; }
+    FileWriterState closed() const override { return _close_state; }
 
     FileCacheAllocatorBuilder* cache_builder() const override {
         return _cache_builder == nullptr ? nullptr : _cache_builder.get();
@@ -70,7 +68,10 @@ public:
     const std::string& bucket() const { return _bucket; }
     const std::string& upload_id() const { return _upload_id; }
 
+    Status close(bool non_block = false) override;
+
 private:
+    Status _close_impl();
     Status _abort();
     [[nodiscard]] std::string _dump_completed_part() const;
     void _wait_until_finish(std::string_view task_name);
@@ -97,7 +98,6 @@ private:
 
     std::atomic_bool _failed = false;
 
-    bool _closed = false;
     Status _st;
     size_t _bytes_appended = 0;
 
@@ -113,6 +113,8 @@ private:
     // Because hive committers have best-effort semantics,
     // this shortens the inconsistent time window.
     bool _used_by_s3_committer;
+    std::unique_ptr<AsyncCloseStatusPack> _async_close_pack;
+    FileWriterState _close_state {FileWriterState::OPEN};
 };
 
 } // namespace io
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp 
b/be/src/io/fs/stream_sink_file_writer.cpp
index faac2bf55d9..4555a9c8c4b 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -111,7 +111,29 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
     return Status::OK();
 }
 
-Status StreamSinkFileWriter::finalize() {
+Status StreamSinkFileWriter::close(bool non_block) {
+    if (_close_state == FileWriterState::CLOSED) {
+        return Status::InternalError("StreamSinkFileWriter already closed, 
load id {}",
+                                     print_id(_load_id));
+    }
+    if (_close_state == FileWriterState::ASYNC_CLOSING) {
+        if (non_block) {
+            return Status::InternalError("Don't submit async close multi 
times");
+        }
+        // Actually the first time call to close(true) would return the value 
of _finalize, if it returned one
+        // error status then the code would never call the second close(true)
+        _close_state = FileWriterState::CLOSED;
+        return Status::OK();
+    }
+    if (non_block) {
+        _close_state = FileWriterState::ASYNC_CLOSING;
+    } else {
+        _close_state = FileWriterState::CLOSED;
+    }
+    return _finalize();
+}
+
+Status StreamSinkFileWriter::_finalize() {
     VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", 
index_id: " << _index_id
                << ", tablet_id: " << _tablet_id << ", segment_id: " << 
_segment_id;
     // TODO(zhengyu): update get_inverted_index_file_size into stat
@@ -144,9 +166,4 @@ Status StreamSinkFileWriter::finalize() {
     return Status::OK();
 }
 
-Status StreamSinkFileWriter::close() {
-    _closed = true;
-    return Status::OK();
-}
-
 } // namespace doris::io
diff --git a/be/src/io/fs/stream_sink_file_writer.h 
b/be/src/io/fs/stream_sink_file_writer.h
index 4a0eb955c26..9769f96331b 100644
--- a/be/src/io/fs/stream_sink_file_writer.h
+++ b/be/src/io/fs/stream_sink_file_writer.h
@@ -44,13 +44,9 @@ public:
 
     Status appendv(const Slice* data, size_t data_cnt) override;
 
-    Status finalize() override;
-
-    Status close() override;
-
     size_t bytes_appended() const override { return _bytes_appended; }
 
-    bool closed() const override { return _closed; }
+    FileWriterState closed() const override { return _close_state; }
 
     // FIXME(plat1ko): Maybe it's an inappropriate abstraction?
     const Path& path() const override {
@@ -60,7 +56,10 @@ public:
 
     FileCacheAllocatorBuilder* cache_builder() const override { return 
nullptr; }
 
+    Status close(bool non_block = false) override;
+
 private:
+    Status _finalize();
     std::vector<std::shared_ptr<LoadStreamStub>> _streams;
 
     PUniqueId _load_id;
@@ -68,8 +67,8 @@ private:
     int64_t _index_id;
     int64_t _tablet_id;
     int32_t _segment_id;
-    bool _closed = false;
     size_t _bytes_appended = 0;
+    FileWriterState _close_state {FileWriterState::OPEN};
 };
 
 } // namespace io
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 8085fc70ebc..ef3539fe0c5 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -130,7 +130,9 @@ Status SegmentFileCollection::close() {
     }
 
     for (auto&& [_, writer] : _file_writers) {
-        RETURN_IF_ERROR(writer->close());
+        if (writer->closed() != io::FileWriterState::CLOSED) {
+            RETURN_IF_ERROR(writer->close());
+        }
     }
 
     return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index c4936e9c453..1c6e75aa97e 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -317,15 +317,7 @@ void DorisFSDirectory::FSIndexOutput::close() {
         _CLTHROWA(err.number(), err.what());
     }
     if (_writer) {
-        Status ret = _writer->finalize();
-        
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error",
-                        { ret = Status::Error<INTERNAL_ERROR>("writer finalize 
status error"); })
-        if (!ret.ok()) {
-            LOG(WARNING) << "FSIndexOutput close, file writer finalize error: 
" << ret.to_string();
-            _writer.reset(nullptr);
-            _CLTHROWA(CL_ERR_IO, ret.to_string().c_str());
-        }
-        ret = _writer->close();
+        auto ret = _writer->close();
         
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error",
                         { ret = Status::Error<INTERNAL_ERROR>("writer close 
status error"); })
         if (!ret.ok()) {
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 10ce2137b9c..939c504580f 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -1076,7 +1076,7 @@ Status SegmentWriter::finalize_columns_index(uint64_t* 
index_size) {
 Status SegmentWriter::finalize_footer(uint64_t* segment_file_size) {
     RETURN_IF_ERROR(_write_footer());
     // finish
-    RETURN_IF_ERROR(_file_writer->finalize());
+    RETURN_IF_ERROR(_file_writer->close(true));
     *segment_file_size = _file_writer->bytes_appended();
     if (*segment_file_size == 0) {
         return Status::Corruption("Bad segment, file size = 0");
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index b0b24a79c0a..7466f7861c5 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -938,7 +938,7 @@ Status 
VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) {
 Status VerticalSegmentWriter::finalize_footer(uint64_t* segment_file_size) {
     RETURN_IF_ERROR(_write_footer());
     // finish
-    RETURN_IF_ERROR(_file_writer->finalize());
+    RETURN_IF_ERROR(_file_writer->close(true));
     *segment_file_size = _file_writer->bytes_appended();
     if (*segment_file_size == 0) {
         return Status::Corruption("Bad segment, file size = 0");
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 860886bc9b7..2907c4b7a9b 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -434,6 +434,7 @@ private:
     std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
     std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
     std::unique_ptr<ThreadPool> _single_replica_compaction_thread_pool;
+
     std::unique_ptr<ThreadPool> _seg_compaction_thread_pool;
     std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool;
 
diff --git a/be/src/pipeline/pipeline_tracing.cpp 
b/be/src/pipeline/pipeline_tracing.cpp
index 047e3c3a01d..d9029951682 100644
--- a/be/src/pipeline/pipeline_tracing.cpp
+++ b/be/src/pipeline/pipeline_tracing.cpp
@@ -116,7 +116,6 @@ void PipelineTracerContext::_dump_query(TUniqueId query_id) 
{
         THROW_IF_ERROR(writer.appendv(&text, 1));
     }
 
-    THROW_IF_ERROR(writer.finalize());
     THROW_IF_ERROR(writer.close());
 
     _last_dump_time = MonotonicSeconds();
@@ -156,7 +155,6 @@ void PipelineTracerContext::_dump_timeslice() {
             THROW_IF_ERROR(writer.appendv(&text, 1));
         }
     }
-    THROW_IF_ERROR(writer.finalize());
     THROW_IF_ERROR(writer.close());
 
     _last_dump_time = MonotonicSeconds();
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0f6aa71c974..24877103384 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -197,6 +197,7 @@ public:
     ThreadPool* send_report_thread_pool() { return 
_send_report_thread_pool.get(); }
     ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); 
}
     ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); 
}
+    ThreadPool* non_block_close_thread_pool();
 
     Status init_pipeline_task_scheduler();
     void init_file_cache_factory();
@@ -368,6 +369,7 @@ private:
     std::unique_ptr<ThreadPool> _join_node_thread_pool;
     // Pool to use a new thread to release object
     std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
+    std::unique_ptr<ThreadPool> _non_block_close_thread_pool;
 
     FragmentMgr* _fragment_mgr = nullptr;
     pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index ba0473ee10c..8d654c8d09b 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -111,6 +111,23 @@
 #include "runtime/memory/tcmalloc_hook.h"
 #endif
 
+// Used for unit test
+namespace {
+std::once_flag flag;
+std::unique_ptr<doris::ThreadPool> non_block_close_thread_pool;
+void init_threadpool_for_test() {
+    static_cast<void>(doris::ThreadPoolBuilder("NonBlockCloseThreadPool")
+                              .set_min_threads(12)
+                              .set_max_threads(48)
+                              .build(&non_block_close_thread_pool));
+}
+
+[[maybe_unused]] doris::ThreadPool* get_non_block_close_thread_pool() {
+    std::call_once(flag, init_threadpool_for_test);
+    return non_block_close_thread_pool.get();
+}
+} // namespace
+
 namespace doris {
 class PBackendService_Stub;
 class PFunctionService_Stub;
@@ -153,6 +170,14 @@ static pair<size_t, size_t> get_num_threads(size_t 
min_num, size_t max_num) {
     return {min_num, max_num};
 }
 
+ThreadPool* ExecEnv::non_block_close_thread_pool() {
+#ifdef BE_TEST
+    return get_non_block_close_thread_pool();
+#else
+    return _non_block_close_thread_pool.get();
+#endif
+}
+
 Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
                      const std::vector<StorePath>& spill_store_paths,
                      const std::set<std::string>& broken_paths) {
@@ -235,6 +260,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
                               .set_max_threads(1)
                               .set_max_queue_size(1000000)
                               .build(&_lazy_release_obj_pool));
+    static_cast<void>(ThreadPoolBuilder("NonBlockCloseThreadPool")
+                              
.set_min_threads(config::min_nonblock_close_thread_num)
+                              
.set_max_threads(config::max_nonblock_close_thread_num)
+                              .build(&_non_block_close_thread_pool));
 
     // NOTE: runtime query statistics mgr could be visited by query and daemon 
thread
     // so it should be created before all query begin and deleted after all 
query and daemon thread stoppped
@@ -639,6 +668,7 @@ void ExecEnv::destroy() {
     SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
     SAFE_SHUTDOWN(_join_node_thread_pool);
     SAFE_SHUTDOWN(_lazy_release_obj_pool);
+    SAFE_SHUTDOWN(_non_block_close_thread_pool);
     SAFE_SHUTDOWN(_send_report_thread_pool);
     SAFE_SHUTDOWN(_send_batch_thread_pool);
 
@@ -684,6 +714,7 @@ void ExecEnv::destroy() {
     // TODO(zhiqiang): Maybe we should call shutdown before release thread 
pool?
     _join_node_thread_pool.reset(nullptr);
     _lazy_release_obj_pool.reset(nullptr);
+    _non_block_close_thread_pool.reset(nullptr);
     _send_report_thread_pool.reset(nullptr);
     _send_table_stats_thread_pool.reset(nullptr);
     _buffered_reader_prefetch_thread_pool.reset(nullptr);
diff --git a/be/src/runtime/load_stream_writer.cpp 
b/be/src/runtime/load_stream_writer.cpp
index 535fbf772c9..32503bbfdb7 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -176,7 +176,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const 
SegmentStatistics& st
     if (file_writer == nullptr) {
         return Status::Corruption("add_segment failed, file writer {} is 
destoryed", segid);
     }
-    if (!file_writer->closed()) {
+    if (file_writer->closed() != io::FileWriterState::CLOSED) {
         return Status::Corruption("add_segment failed, segment {} is not 
closed",
                                   file_writer->path().native());
     }
@@ -209,7 +209,7 @@ Status LoadStreamWriter::close() {
     }
 
     for (const auto& writer : _segment_file_writers) {
-        if (!writer->closed()) {
+        if (writer->closed() != io::FileWriterState::CLOSED) {
             return Status::Corruption("LoadStreamWriter close failed, segment 
{} is not closed",
                                       writer->path().native());
         }
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index 362df33aa28..919680f1c16 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -136,7 +136,6 @@ public:
             }
             std::string data(1, '0');
             ASSERT_TRUE(writer->append(Slice(data.data(), data.size())).ok());
-            ASSERT_TRUE(writer->finalize().ok());
             ASSERT_TRUE(writer->close().ok());
         }
         ExecEnv::GetInstance()->_file_cache_factory = factory.get();
diff --git a/be/test/io/fs/hdfs_file_system_test.cpp 
b/be/test/io/fs/hdfs_file_system_test.cpp
index 87fe5d52dd8..41c26ccedf7 100644
--- a/be/test/io/fs/hdfs_file_system_test.cpp
+++ b/be/test/io/fs/hdfs_file_system_test.cpp
@@ -110,9 +110,6 @@ TEST(HdfsFileSystemTest, Write) {
     st = hdfs_file_writer->append(content_2M);
     ASSERT_TRUE(st.ok()) << st;
 
-    st = hdfs_file_writer->finalize();
-    ASSERT_TRUE(st.ok()) << st;
-
     st = hdfs_file_writer->close();
     ASSERT_TRUE(st.ok()) << st;
 
diff --git a/be/test/io/fs/local_file_system_test.cpp 
b/be/test/io/fs/local_file_system_test.cpp
index 556fb1adecc..a264ca799dd 100644
--- a/be/test/io/fs/local_file_system_test.cpp
+++ b/be/test/io/fs/local_file_system_test.cpp
@@ -91,8 +91,6 @@ TEST_F(LocalFileSystemTest, WriteRead) {
     Slice slices[2] {abc, bcd};
     st = file_writer->appendv(slices, 2);
     ASSERT_TRUE(st.ok()) << st;
-    st = file_writer->finalize();
-    ASSERT_TRUE(st.ok()) << st;
     st = file_writer->close();
     ASSERT_TRUE(st.ok()) << st;
     ASSERT_EQ(file_writer->bytes_appended(), 115);
@@ -143,8 +141,6 @@ TEST_F(LocalFileSystemTest, Exist) {
     io::FileWriterPtr file_writer;
     auto st = io::global_local_filesystem()->create_file(fname, &file_writer);
     ASSERT_TRUE(st.ok()) << st;
-    st = file_writer->finalize();
-    ASSERT_TRUE(st.ok()) << st;
     st = file_writer->close();
     ASSERT_TRUE(st.ok()) << st;
     ASSERT_TRUE(check_exist(fname));
@@ -155,8 +151,6 @@ TEST_F(LocalFileSystemTest, List) {
     auto fname = fmt::format("{}/abc", test_dir);
     auto st = io::global_local_filesystem()->create_file(fname, &file_writer);
     ASSERT_TRUE(st.ok()) << st;
-    st = file_writer->finalize();
-    ASSERT_TRUE(st.ok()) << st;
     st = file_writer->close();
     ASSERT_TRUE(st.ok()) << st;
     ASSERT_TRUE(check_exist(fname));
diff --git a/be/test/io/fs/s3_file_writer_test.cpp 
b/be/test/io/fs/s3_file_writer_test.cpp
index 7d7d71ab318..9e0923d7d53 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -123,7 +123,6 @@ TEST_F(S3FileWriterTest, multi_part_io_error) {
             offset += bytes_read;
         }
         ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
-        ASSERT_TRUE(!s3_file_writer->finalize().ok());
         // The second part would fail uploading itself to s3
         // so the result of close should be not ok
         ASSERT_TRUE(!s3_file_writer->close().ok());
@@ -166,7 +165,6 @@ TEST_F(S3FileWriterTest, put_object_io_error) {
             offset += bytes_read;
         }
         ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
-        ASSERT_TRUE(!s3_file_writer->finalize().ok());
         // The object might be timeout but still succeed in loading
         ASSERT_TRUE(!s3_file_writer->close().ok());
     }
@@ -266,7 +264,6 @@ TEST_F(S3FileWriterTest, normal) {
             offset += bytes_read;
         }
         ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
-        ASSERT_TRUE(s3_file_writer->finalize().ok());
         ASSERT_EQ(Status::OK(), s3_file_writer->close());
         int64_t s3_file_size = 0;
         ASSERT_EQ(Status::OK(), s3_fs->file_size("normal", &s3_file_size));
@@ -299,7 +296,6 @@ TEST_F(S3FileWriterTest, smallFile) {
             offset += bytes_read;
         }
         ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
-        ASSERT_TRUE(s3_file_writer->finalize().ok());
         ASSERT_EQ(Status::OK(), s3_file_writer->close());
         int64_t s3_file_size = 0;
         ASSERT_EQ(Status::OK(), s3_fs->file_size("small", &s3_file_size));
@@ -359,7 +355,6 @@ TEST_F(S3FileWriterTest, finalize_error) {
             offset += bytes_read;
         }
         ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
-        ASSERT_TRUE(!s3_file_writer->finalize().ok());
         bool exits = false;
         static_cast<void>(s3_fs->exists("finalize_error", &exits));
         ASSERT_TRUE(!exits);
@@ -396,7 +391,6 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_2) {
             offset += bytes_read;
         }
         ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
-        ASSERT_TRUE(s3_file_writer->finalize().ok());
         // The second part would fail uploading itself to s3
         // so the result of close should be not ok
         auto st = s3_file_writer->close();
@@ -435,7 +429,6 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_1) {
             offset += bytes_read;
         }
         ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
-        ASSERT_TRUE(s3_file_writer->finalize().ok());
         // The second part would fail uploading itself to s3
         // so the result of close should be not ok
         auto st = s3_file_writer->close();
@@ -474,7 +467,6 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_3) {
             offset += bytes_read;
         }
         ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
-        ASSERT_TRUE(s3_file_writer->finalize().ok());
         // The second part would fail uploading itself to s3
         // so the result of close should be not ok
         auto st = s3_file_writer->close();
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp 
b/be/test/io/fs/stream_sink_file_writer_test.cpp
index ad6e496c56f..b9b0e0818cf 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -106,7 +106,7 @@ TEST_F(StreamSinkFileWriterTest, Test) {
 
     CHECK_STATUS_OK(writer.appendv(&(*slices.begin()), slices.size()));
     EXPECT_EQ(NUM_STREAM, g_num_request);
-    CHECK_STATUS_OK(writer.finalize());
+    CHECK_STATUS_OK(writer.close());
     EXPECT_EQ(NUM_STREAM * 2, g_num_request);
 }
 
diff --git a/be/test/olap/tablet_cooldown_test.cpp 
b/be/test/olap/tablet_cooldown_test.cpp
index 49de1826104..953be73862d 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -99,15 +99,13 @@ public:
 
     ~FileWriterMock() override = default;
 
-    Status close() override { return _local_file_writer->close(); }
+    Status close(bool /*non_block*/) override { return 
_local_file_writer->close(); }
 
     Status appendv(const Slice* data, size_t data_cnt) override {
         return _local_file_writer->appendv(data, data_cnt);
     }
 
-    Status finalize() override { return _local_file_writer->finalize(); }
-
-    bool closed() const override { return _local_file_writer->closed(); }
+    io::FileWriterState closed() const override { return 
_local_file_writer->closed(); }
 
     size_t bytes_appended() const override { return 
_local_file_writer->bytes_appended(); }
 
diff --git 
a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
index 759c409a850..4a7e3d45e90 100644
--- 
a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
@@ -103,13 +103,6 @@ suite("test_index_compound_directory_failure_injection", 
"nonConcurrent") {
             
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
         }
         qt_sql "select COUNT() from ${testTable_dup} where request match 
'images'"
-        try {
-            
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
-            load_httplogs_data.call(testTable_dup, 
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
-        } finally {
-            
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
-        }
-        qt_sql "select COUNT() from ${testTable_dup} where request match 
'images'"
         try {
             
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
             load_httplogs_data.call(testTable_dup, 
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to