This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new ead1da60239 branch-4.0: [fix](be) Poll packed file async close without blocking #62938 (#63531)
ead1da60239 is described below

commit ead1da60239c63611f5dfc836530f8815e3db36b
Author: Yixuan Wang <[email protected]>
AuthorDate: Thu May 28 14:50:24 2026 +0800

    branch-4.0: [fix](be) Poll packed file async close without blocking #62938 (#63531)
    
    pick: https://github.com/apache/doris/pull/62938
---
 be/src/io/fs/file_writer.h                         | 10 +++
 be/src/io/fs/packed_file_manager.cpp               | 14 +---
 be/src/io/fs/s3_file_writer.cpp                    | 91 +++++++++++-----------
 be/src/io/fs/s3_file_writer.h                      |  2 +
 be/src/olap/rowset/beta_rowset_writer.cpp          | 12 +++
 be/src/olap/rowset/segment_creator.cpp             |  7 ++
 be/test/io/fs/packed_file_manager_test.cpp         | 72 ++++++++++++++---
 .../test_packed_file_async_close_error.groovy      | 78 +++++++++++++++++++
 8 files changed, 222 insertions(+), 64 deletions(-)

diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 0cda2b519c4..de4fc6f577a 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -72,6 +72,16 @@ public:
     // If there is no data appended, an empty file will be persisted.
     virtual Status close(bool non_block = false) = 0;
 
+    // Non-blocking probe for a previous close(true).
+    // OK means close finished successfully. NeedSendAgain means close is still running.
+    // Other errors mean close finished with error or the writer does not support this API.
+    // NOTE: This method consumes the async close result when it is ready. The caller must
+    // use it as the only completion path for that async close; mixing it with close(false)
+    // or another try_finish_close consumer is not supported.
+    virtual Status try_finish_close() {
+        return Status::NotSupported("try_finish_close is not supported");
+    }
+
     Status append(const Slice& data) { return appendv(&data, 1); }
 
     virtual Status appendv(const Slice* data, size_t data_cnt) = 0;
diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp
index 6bff9eff6f3..654f2d933fd 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -851,11 +851,6 @@ void PackedFileManager::process_uploading_packed_files() {
         Status upload_status = 
finalize_packed_file_upload(packed_file->packed_file_path,
                                                            
packed_file->writer.get());
 
-        if (upload_status.is<ErrorCode::ALREADY_CLOSED>()) {
-            record_ready_to_upload(packed_file);
-            handle_success(packed_file);
-            continue;
-        }
         if (!upload_status.ok()) {
             handle_failure(packed_file, upload_status);
             continue;
@@ -873,16 +868,13 @@ void PackedFileManager::process_uploading_packed_files() {
             continue;
         }
 
-        if (packed_file->writer->state() != FileWriter::State::CLOSED) {
+        Status status = packed_file->writer->try_finish_close();
+        if (status.is<ErrorCode::NEED_SEND_AGAIN>()) {
             continue;
         }
 
-        Status status = packed_file->writer->close(true);
-        if (status.is<ErrorCode::ALREADY_CLOSED>()) {
-            handle_success(packed_file);
-            continue;
-        }
         if (status.ok()) {
+            handle_success(packed_file);
             continue;
         }
 
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index a1b72b8dc98..eec1e4c3a60 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -41,6 +41,7 @@
 #include "io/fs/s3_file_system.h"
 #include "io/fs/s3_obj_storage_client.h"
 #include "runtime/exec_env.h"
+#include "util/debug_points.h"
 #include "util/s3_util.h"
 #include "util/stopwatch.hpp"
 
@@ -126,48 +127,10 @@ void S3FileWriter::_wait_until_finish(std::string_view task_name) {
 }
 
 Status S3FileWriter::close(bool non_block) {
-    auto record_close_latency = [this]() {
-        if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
-            return;
-        }
-        auto now = std::chrono::steady_clock::now();
-        auto latency_ms = 
std::chrono::duration_cast<std::chrono::milliseconds>(
-                                  now - *_first_append_timestamp)
-                                  .count();
-        s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
-        if (auto* sampler = 
s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
-            sampler->take_sample();
-        }
-        _close_latency_recorded = true;
-    };
-
     if (state() == State::CLOSED) {
-        if (_async_close_pack != nullptr) {
-            _st = _async_close_pack->future.get();
-            _async_close_pack = nullptr;
-            // Return the final close status so that a blocking close issued 
after
-            // an async close observes the real result just like the legacy 
behavior.
-            if (!non_block && _st.ok()) {
-                record_close_latency();
-            }
-            return _st;
-        }
-        if (non_block) {
-            if (_st.ok()) {
-                record_close_latency();
-                return Status::Error<ErrorCode::ALREADY_CLOSED>(
-                        "S3FileWriter already closed, file path {}, file key 
{}",
-                        _obj_storage_path_opts.path.native(), 
_obj_storage_path_opts.key);
-            }
-            return _st;
-        }
-        if (_st.ok()) {
-            record_close_latency();
-            return Status::Error<ErrorCode::ALREADY_CLOSED>(
-                    "S3FileWriter already closed, file path {}, file key {}",
-                    _obj_storage_path_opts.path.native(), 
_obj_storage_path_opts.key);
-        }
-        return _st;
+        return Status::InternalError("S3FileWriter already closed, file path 
{}, file key {}",
+                                     _obj_storage_path_opts.path.native(),
+                                     _obj_storage_path_opts.key);
     }
     if (state() == State::ASYNC_CLOSING) {
         if (non_block) {
@@ -181,7 +144,7 @@ Status S3FileWriter::close(bool non_block) {
         // The next time we call close() with no matter non_block true or 
false, it would always return the
         // '_st' value because this writer is already closed.
         if (!non_block && _st.ok()) {
-            record_close_latency();
+            _record_close_latency();
         }
         return _st;
     }
@@ -194,7 +157,6 @@ Status S3FileWriter::close(bool non_block) {
             s3_file_writer_async_close_queuing << -1;
             s3_file_writer_async_close_processing << 1;
             _st = _close_impl();
-            _state = State::CLOSED;
             _async_close_pack->promise.set_value(_st);
             s3_file_writer_async_close_processing << -1;
         });
@@ -202,7 +164,42 @@ Status S3FileWriter::close(bool non_block) {
     _st = _close_impl();
     _state = State::CLOSED;
     if (!non_block && _st.ok()) {
-        record_close_latency();
+        _record_close_latency();
+    }
+    return _st;
+}
+
+void S3FileWriter::_record_close_latency() {
+    if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
+        return;
+    }
+    auto now = std::chrono::steady_clock::now();
+    auto latency_ms =
+            std::chrono::duration_cast<std::chrono::milliseconds>(now - 
*_first_append_timestamp)
+                    .count();
+    s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
+    if (auto* sampler = 
s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
+        sampler->take_sample();
+    }
+    _close_latency_recorded = true;
+}
+
+Status S3FileWriter::try_finish_close() {
+    if (state() == State::CLOSED) {
+        return _st;
+    }
+    if (state() != State::ASYNC_CLOSING) {
+        return Status::NotSupported("S3FileWriter is not async closing");
+    }
+    CHECK(_async_close_pack != nullptr);
+    if (_async_close_pack->future.wait_for(std::chrono::seconds(0)) != 
std::future_status::ready) {
+        return Status::NeedSendAgain("async close is not finished");
+    }
+    _st = _async_close_pack->future.get();
+    _async_close_pack = nullptr;
+    _state = State::CLOSED;
+    if (_st.ok()) {
+        _record_close_latency();
     }
     return _st;
 }
@@ -254,6 +251,12 @@ Status S3FileWriter::_build_upload_buffer() {
 Status S3FileWriter::_close_impl() {
     VLOG_DEBUG << "S3FileWriter::close, path: " << 
_obj_storage_path_opts.path.native();
 
+    DBUG_EXECUTE_IF("S3FileWriter._close_impl.inject_error", {
+        if (_obj_storage_path_opts.key.ends_with(".dat")) {
+            return Status::IOError("S3FileWriter._close_impl.inject_error");
+        }
+    });
+
     if (_cur_part_num == 1 && _pending_buf) { // data size is less than 
config::s3_write_buffer_size
         RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
     }
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 38e68b14c4d..f31a9edef2c 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -71,6 +71,7 @@ public:
     }
 
     Status close(bool non_block = false) override;
+    Status try_finish_close() override;
 
 private:
     Status _close_impl();
@@ -84,6 +85,7 @@ private:
     void _upload_one_part(int part_num, UploadFileBuffer& buf);
     bool _complete_part_task_callback(Status s);
     Status _build_upload_buffer();
+    void _record_close_latency();
 
     ObjectStoragePathOptions _obj_storage_path_opts;
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 8e0794ce5d3..16f1419dba6 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -23,11 +23,13 @@
 #include <fmt/format.h>
 #include <stdio.h>
 
+#include <chrono>
 #include <ctime> // time
 #include <filesystem>
 #include <memory>
 #include <mutex>
 #include <sstream>
+#include <thread>
 #include <utility>
 #include <vector>
 
@@ -139,6 +141,16 @@ Status SegmentFileCollection::close() {
     }
 
     for (auto&& [_, writer] : _file_writers) {
+        DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", {
+            auto before_state = writer->state();
+            for (int i = 0; i < 3000 && writer->state() != 
io::FileWriter::State::CLOSED; ++i) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            }
+            LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path="
+                      << writer->path().native()
+                      << " before_state=" << static_cast<int>(before_state)
+                      << " after_state=" << static_cast<int>(writer->state());
+        });
         if (writer->state() != io::FileWriter::State::CLOSED) {
             RETURN_IF_ERROR(writer->close());
         }
diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index b35d4764d2f..b2982eabcf9 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -20,6 +20,7 @@
 // IWYU pragma: no_include <bthread/errno.h>
 #include <errno.h> // IWYU pragma: keep
 
+#include <chrono>
 #include <filesystem>
 #include <memory>
 #include <sstream>
@@ -236,6 +237,9 @@ Status SegmentFlusher::_flush_segment_writer(
         return Status::Error(s.code(), "failed to finalize segment: {}", 
s.to_string());
     }
 
+    
DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
+                    { 
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });
+
     MonotonicStopWatch inverted_index_timer;
     inverted_index_timer.start();
     int64_t inverted_index_file_size = 0;
@@ -311,6 +315,9 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment
         return Status::Error(s.code(), "failed to finalize segment: {}", 
s.to_string());
     }
 
+    
DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
+                    { 
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });
+
     MonotonicStopWatch inverted_index_timer;
     inverted_index_timer.start();
     int64_t inverted_index_file_size = 0;
diff --git a/be/test/io/fs/packed_file_manager_test.cpp b/be/test/io/fs/packed_file_manager_test.cpp
index 645f0896f1a..37800bf1620 100644
--- a/be/test/io/fs/packed_file_manager_test.cpp
+++ b/be/test/io/fs/packed_file_manager_test.cpp
@@ -55,24 +55,20 @@ public:
     void set_start_close_status(Status st) { _start_close_status = 
std::move(st); }
     void complete_async_close() {
         if (_state == State::ASYNC_CLOSING) {
-            _state = State::CLOSED;
+            _async_close_ready = true;
         }
     }
 
+    size_t close_calls() const { return _close_calls; }
+    size_t try_finish_close_calls() const { return _try_finish_close_calls; }
     size_t append_calls() const { return _append_calls; }
     bool closed() const { return _state == State::CLOSED; }
     size_t bytes_appended() const override { return _bytes_appended; }
     const std::string& written_data() const { return _written; }
 
     Status close(bool non_block = false) override {
+        ++_close_calls;
         if (_state == State::CLOSED) {
-            if (non_block) {
-                if (_close_status.ok()) {
-                    return Status::Error<ErrorCode::ALREADY_CLOSED>(
-                            "MockFileWriter already closed: {}", 
_path.native());
-                }
-                return _close_status;
-            }
             return Status::Error<ErrorCode::ALREADY_CLOSED>("MockFileWriter 
already closed: {}",
                                                             _path.native());
         }
@@ -82,11 +78,18 @@ public:
         }
 
         if (_state == State::ASYNC_CLOSING) {
-            return Status::InternalError("Don't submit async close multi 
times");
+            if (non_block) {
+                return Status::InternalError("Don't submit async close multi 
times");
+            }
+            _async_close_ready = true;
+            _async_close_consumed = true;
+            _state = State::CLOSED;
+            return _close_status;
         }
 
         if (non_block) {
             _state = State::ASYNC_CLOSING;
+            _async_close_ready = false;
             return Status::OK();
         }
 
@@ -94,6 +97,19 @@ public:
         return _close_status;
     }
 
+    Status try_finish_close() override {
+        ++_try_finish_close_calls;
+        if (_state == State::CLOSED) {
+            return _async_close_consumed ? _close_status : Status::OK();
+        }
+        if (_state != State::ASYNC_CLOSING || !_async_close_ready) {
+            return Status::NeedSendAgain("async close is not finished");
+        }
+        _state = State::CLOSED;
+        _async_close_consumed = true;
+        return _close_status;
+    }
+
     Status appendv(const Slice* data, size_t data_cnt) override {
         if (!_append_status.ok()) {
             return _append_status;
@@ -114,10 +130,14 @@ private:
     Path _path;
     size_t _bytes_appended = 0;
     size_t _append_calls = 0;
+    size_t _close_calls = 0;
+    size_t _try_finish_close_calls = 0;
     std::string _written;
     Status _start_close_status = Status::OK();
     Status _append_status = Status::OK();
     Status _close_status = Status::OK();
+    bool _async_close_ready = false;
+    bool _async_close_consumed = false;
     State _state = State::OPENED;
 };
 
@@ -581,6 +601,40 @@ TEST_F(PackedFileManagerTest, ProcessUploadingFilesSetsFailedWhenAsyncCloseFails
     EXPECT_NE(failed->last_error.find("async close fail"), std::string::npos);
 }
 
+TEST_F(PackedFileManagerTest, 
ProcessUploadingFilesPollsAsyncCloseWithoutBlocking) {
+    std::string payload = "abc";
+    Slice slice(payload);
+    auto info = default_append_info();
+    ASSERT_TRUE(manager->append_small_file("async_poll_fail", slice, 
info).ok());
+    
ASSERT_TRUE(manager->mark_current_packed_file_for_upload(_resource_id).ok());
+    ASSERT_EQ(manager->uploading_packed_files_for_test().size(), 1);
+
+    auto uploading = 
manager->uploading_packed_files_for_test().begin()->second;
+    auto* writer = dynamic_cast<MockFileWriter*>(uploading->writer.get());
+    ASSERT_NE(writer, nullptr);
+    uploading->state = PackedFileManager::PackedFileState::UPLOADING;
+    writer->set_close_status(Status::IOError("async close poll fail"));
+    ASSERT_TRUE(writer->close(true).ok());
+    ASSERT_EQ(writer->close_calls(), 1);
+
+    manager->process_uploading_packed_files();
+    EXPECT_EQ(uploading->state.load(), 
PackedFileManager::PackedFileState::UPLOADING);
+    EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 1);
+    EXPECT_EQ(manager->uploaded_packed_files_for_test().size(), 0);
+    EXPECT_EQ(writer->close_calls(), 1);
+    EXPECT_EQ(writer->try_finish_close_calls(), 1);
+
+    writer->complete_async_close();
+    manager->process_uploading_packed_files();
+    EXPECT_EQ(writer->close_calls(), 1);
+    EXPECT_EQ(writer->try_finish_close_calls(), 2);
+    EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 0);
+    ASSERT_EQ(manager->uploaded_packed_files_for_test().size(), 1);
+    auto failed = manager->uploaded_packed_files_for_test().begin()->second;
+    EXPECT_EQ(failed->state.load(), 
PackedFileManager::PackedFileState::FAILED);
+    EXPECT_NE(failed->last_error.find("async close poll fail"), 
std::string::npos);
+}
+
 TEST_F(PackedFileManagerTest, AppendPackedFileInfoToFileTail) {
     std::string payload = "abc";
     Slice slice(payload);
diff --git a/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy b/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy
new file mode 100644
index 00000000000..7407e5eb27d
--- /dev/null
+++ b/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_packed_file_async_close_error", "p0, nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def closeErrorPoint = "S3FileWriter._close_impl.inject_error"
+    def afterFinalizeSleepPoint = 
"SegmentFlusher._flush_segment_writer.after_finalize.sleep"
+    def waitDatClosedPoint = "SegmentFileCollection.close.wait_dat_closed"
+
+    sql """ DROP TABLE IF EXISTS test_packed_file_async_close_error """
+    sql """
+        CREATE TABLE IF NOT EXISTS test_packed_file_async_close_error (
+            `k1` int NULL,
+            `v1` varchar(32) NULL,
+            INDEX idx_v1 (`v1`) USING INVERTED PROPERTIES("parser" = "english")
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+
+    setBeConfigTemporary([
+        "enable_packed_file": "false",
+        "enable_vertical_segment_writer": "true",
+        "small_file_threshold_bytes": "1048576",
+        "packed_file_size_threshold_bytes": "1048576",
+        "packed_file_time_threshold_ms": "1"
+    ]) {
+        try {
+            sql """ SET enable_file_cache = false """
+            GetDebugPoint().disableDebugPointForAllBEs(closeErrorPoint)
+            GetDebugPoint().disableDebugPointForAllBEs(afterFinalizeSleepPoint)
+            GetDebugPoint().disableDebugPointForAllBEs(waitDatClosedPoint)
+            GetDebugPoint().enableDebugPointForAllBEs(closeErrorPoint)
+            GetDebugPoint().enableDebugPointForAllBEs(afterFinalizeSleepPoint)
+            GetDebugPoint().enableDebugPointForAllBEs(waitDatClosedPoint)
+
+            streamLoad {
+                table "test_packed_file_async_close_error"
+                set "column_separator", ","
+                inputText "1,a\n2,b\n"
+                time 120000
+
+                check { result, exception, startTime, endTime ->
+                    def msg = exception == null ? result : 
exception.getMessage()
+                    logger.info("stream load result with injected S3 close 
error: ${msg}")
+                    assertTrue(exception == null, "stream load should succeed 
before async packed file upload fails: ${msg}")
+                    def json = parseJson(result)
+                    assertEquals("fail", json.Status.toLowerCase())
+                }
+            }
+
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(closeErrorPoint)
+            GetDebugPoint().disableDebugPointForAllBEs(afterFinalizeSleepPoint)
+            GetDebugPoint().disableDebugPointForAllBEs(waitDatClosedPoint)
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to