This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bfd23e30f62 [improve](load) handle EAGAIN in load stream (#29437)
bfd23e30f62 is described below

commit bfd23e30f628525a91f17dc54f605d16f5319e56
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jan 4 23:02:11 2024 +0800

    [improve](load) handle EAGAIN in load stream (#29437)
---
 be/src/runtime/load_stream.cpp | 35 +++++++++++++++++++++++++++--------
 be/src/runtime/load_stream.h   |  2 ++
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 682733e3bdd..a7b0f6ca8a4 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -411,10 +411,9 @@ void LoadStream::_report_result(StreamId stream, const 
Status& status,
     }
 
     buf.append(response.SerializeAsString());
-    int ret = brpc::StreamWrite(stream, buf);
-    // TODO: handle EAGAIN
-    if (ret != 0) {
-        LOG(INFO) << "stream write report status " << ret << ": " << 
std::strerror(ret);
+    auto wst = _write_stream(stream, buf);
+    if (!wst.ok()) {
+        LOG(WARNING) << *this << " report result failed with " << wst;
     }
 }
 
@@ -437,13 +436,33 @@ void LoadStream::_report_schema(StreamId stream, const 
PStreamHeader& hdr) {
     st.to_protobuf(response.mutable_status());
 
     buf.append(response.SerializeAsString());
-    int ret = brpc::StreamWrite(stream, buf);
-    // TODO: handle EAGAIN
-    if (ret != 0) {
-        LOG(INFO) << "stream write report schema " << ret << ": " << 
std::strerror(ret);
+    auto wst = _write_stream(stream, buf);
+    if (!wst.ok()) {
+        LOG(WARNING) << *this << " report result failed with " << wst;
     }
 }
 
+Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
+    for (;;) {
+        int ret = brpc::StreamWrite(stream, buf);
+        switch (ret) {
+        case 0:
+            return Status::OK();
+        case EAGAIN: {
+            const timespec time = 
butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
+            int wait_ret = brpc::StreamWait(stream, &time);
+            if (wait_ret != 0) {
+                return Status::InternalError("StreamWait failed, err={}", 
wait_ret);
+            }
+            break;
+        }
+        default:
+            return Status::InternalError("StreamWrite failed, err={}", ret);
+        }
+    }
+    return Status::OK();
+}
+
 void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& 
hdr) {
     butil::IOBufAsZeroCopyInputStream wrapper(*message);
     hdr.ParseFromZeroCopyStream(&wrapper);
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index ba6ad11c06a..6df9198504e 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -147,6 +147,8 @@ private:
         _report_result(stream, status, {}, failed_tablets, false);
     }
 
+    Status _write_stream(StreamId stream, butil::IOBuf& buf);
+
 private:
     PUniqueId _load_id;
     std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to