platoneko commented on code in PR #35307:
URL: https://github.com/apache/doris/pull/35307#discussion_r1618578476


##########
be/src/io/fs/s3_file_system.cpp:
##########
@@ -454,54 +345,74 @@ Status S3FileSystem::batch_upload_impl(const 
std::vector<Path>& local_files,
                                        local_files.size(), 
remote_files.size());
     }
 
-    Aws::Transfer::TransferManagerConfiguration 
transfer_config(default_executor().get());
-    transfer_config.s3Client = client;
-    auto transfer_manager = 
Aws::Transfer::TransferManager::Create(transfer_config);
+    std::vector<FileWriterPtr> obj_writers(local_files.size());
+
+    auto upload_task = [this](Path local_file, Path remote_file, 
FileWriterPtr* obj_writer) {
+        auto key = DORIS_TRY(get_key(remote_file));
+        LOG(INFO) << "Start to upload " << local_file.native() << " to " << 
full_s3_path(key);
+        RETURN_IF_ERROR(create_file_impl(key, obj_writer, nullptr));
+        FileReaderSPtr local_reader;
+        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file, 
&local_reader));
+        size_t local_buffer_size = 
config::s3_file_system_local_upload_buffer_size;
+        std::unique_ptr<char[]> write_buffer = 
std::make_unique<char[]>(local_buffer_size);
+        size_t cur_read = 0;
+        while (cur_read < local_reader->size()) {
+            size_t bytes_read = 0;
+            RETURN_IF_ERROR(local_reader->read_at(
+                    cur_read, Slice {write_buffer.get(), local_buffer_size}, 
&bytes_read));
+            RETURN_IF_ERROR((*obj_writer)->append({write_buffer.get(), 
bytes_read}));
+            cur_read += bytes_read;
+        }
+        RETURN_IF_ERROR((*obj_writer)->close());
+        return Status::OK();
+    };
 
-    std::vector<std::shared_ptr<Aws::Transfer::TransferHandle>> handles;
+    std::vector<std::future<Status>> futures;
     for (int i = 0; i < local_files.size(); ++i) {
-        auto key = DORIS_TRY(get_key(remote_files[i]));
-        LOG(INFO) << "Start to upload " << local_files[i].native() << " to " 
<< full_s3_path(key);
-        auto handle =
-                transfer_manager->UploadFile(local_files[i].native(), _bucket, 
key, "text/plain",
-                                             Aws::Map<Aws::String, 
Aws::String>());
-        handles.push_back(std::move(handle));
+        std::shared_ptr<std::packaged_task<Status(Path local_file, Path 
remote_file,
+                                                  FileWriterPtr * obj_writer)>>
+                task = std::make_shared<std::packaged_task<Status(Path 
local_file, Path remote_file,
+                                                                  
FileWriterPtr * obj_writer)>>(
+                        upload_task);
+        futures.emplace_back(task->get_future());
+        default_executor()->Submit(
+                [t = std::move(task), local = local_files[i], remote = 
remote_files[i],
+                 obj_writer = &obj_writers[i]]() mutable { (*t)(local, remote, 
obj_writer); });
     }
-    for (auto& handle : handles) {
-        handle->WaitUntilFinished();
-        if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) {
-            // TODO(cyx): Maybe we can cancel remaining handles.
-            return s3fs_error(handle->GetLastError(), fmt::format("failed to 
upload to {}",
-                                                                  
full_s3_path(handle->GetKey())));
+    Status s = Status::OK();
+    for (auto&& f : futures) {
+        auto cur_s = f.get();
+        if (!cur_s.ok()) {
+            s = std::move(cur_s);
         }
     }
-    return Status::OK();
+    return s;
 }
 
 Status S3FileSystem::download_impl(const Path& remote_file, const Path& 
local_file) {
     auto client = _client->get();
     CHECK_S3_CLIENT(client);
     auto key = DORIS_TRY(get_key(remote_file));
-    Aws::S3::Model::GetObjectRequest request;
-    request.WithBucket(_bucket).WithKey(key);
-    Aws::S3::Model::GetObjectOutcome response;
-    {
-        SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
-        response = client->GetObject(request);
+    int64_t size;
+    RETURN_IF_ERROR(file_size(remote_file, &size));
+    std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
+    size_t bytes_read = 0;
+    // clang-format off
+    auto resp = client->get_object( {.bucket = _bucket, .key = key,},
+            buf.get(), 0, size, &bytes_read);
+    // clang-format on
+    if (!resp.status.ok()) {
+        return resp.status;
     }
-    if (response.IsSuccess()) {
-        Aws::OFStream local_file_s;
-        local_file_s.open(local_file, std::ios::out | std::ios::binary);
-        if (local_file_s.good()) {
-            local_file_s << response.GetResult().GetBody().rdbuf();
-        }
-        if (!local_file_s.good()) {
-            return Status::IOError("failed to download {}: failed to write 
file: {}",
-                                   remote_file.native(), local_file.native());
-        }
+    Aws::OFStream local_file_s;
+    local_file_s.open(local_file, std::ios::out | std::ios::binary);
+    if (local_file_s.good()) {
+        local_file_s << StringViewStream(buf.get(), size).rdbuf();
     } else {
-        return localfs_error(errno, fmt::format("failed to write file {}", 
local_file.native()));
+        return Status::IOError("failed to download {}: failed to write file: 
{}",

Review Comment:
   Use `localfs_error(errno, ...)` here instead of `Status::IOError` — this branch reports a failure to write the *local* file, so it should carry the local filesystem errno, as the removed code did.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to