This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bfd23e30f62 [improve](load) handle EAGAIN in load stream (#29437)
bfd23e30f62 is described below
commit bfd23e30f628525a91f17dc54f605d16f5319e56
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jan 4 23:02:11 2024 +0800
[improve](load) handle EAGAIN in load stream (#29437)
---
be/src/runtime/load_stream.cpp | 35 +++++++++++++++++++++++++++--------
be/src/runtime/load_stream.h | 2 ++
2 files changed, 29 insertions(+), 8 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 682733e3bdd..a7b0f6ca8a4 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -411,10 +411,9 @@ void LoadStream::_report_result(StreamId stream, const
Status& status,
}
buf.append(response.SerializeAsString());
- int ret = brpc::StreamWrite(stream, buf);
- // TODO: handle EAGAIN
- if (ret != 0) {
- LOG(INFO) << "stream write report status " << ret << ": " <<
std::strerror(ret);
+ auto wst = _write_stream(stream, buf);
+ if (!wst.ok()) {
+ LOG(WARNING) << *this << " report result failed with " << wst;
}
}
@@ -437,13 +436,33 @@ void LoadStream::_report_schema(StreamId stream, const
PStreamHeader& hdr) {
st.to_protobuf(response.mutable_status());
buf.append(response.SerializeAsString());
- int ret = brpc::StreamWrite(stream, buf);
- // TODO: handle EAGAIN
- if (ret != 0) {
- LOG(INFO) << "stream write report schema " << ret << ": " <<
std::strerror(ret);
+ auto wst = _write_stream(stream, buf);
+ if (!wst.ok()) {
+ LOG(WARNING) << *this << " report result failed with " << wst;
}
}
+Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
+ for (;;) {
+ int ret = brpc::StreamWrite(stream, buf);
+ switch (ret) {
+ case 0:
+ return Status::OK();
+ case EAGAIN: {
+ const timespec time =
butil::seconds_from_now(config::load_stream_eagain_wait_seconds);
+ int wait_ret = brpc::StreamWait(stream, &time);
+ if (wait_ret != 0) {
+ return Status::InternalError("StreamWait failed, err={}",
wait_ret);
+ }
+ break;
+ }
+ default:
+ return Status::InternalError("StreamWrite failed, err={}", ret);
+ }
+ }
+ return Status::OK();
+}
+
void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader&
hdr) {
butil::IOBufAsZeroCopyInputStream wrapper(*message);
hdr.ParseFromZeroCopyStream(&wrapper);
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index ba6ad11c06a..6df9198504e 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -147,6 +147,8 @@ private:
_report_result(stream, status, {}, failed_tablets, false);
}
+ Status _write_stream(StreamId stream, butil::IOBuf& buf);
+
private:
PUniqueId _load_id;
std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]