This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9a277a6f11d [fix](move-memtable) don't abort in replica write layer unless all replica fails (#29257)
9a277a6f11d is described below
commit 9a277a6f11daeb612012babd95fbc0aa73ee5720
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Dec 29 00:03:28 2023 +0800
[fix](move-memtable) don't abort in replica write layer unless all replica fails (#29257)
---
be/src/io/fs/stream_sink_file_writer.cpp | 33 ++++++++++++++++++++++++++++----
1 file changed, 29 insertions(+), 4 deletions(-)
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp
index 25a4f5b27d5..2fce7c6baa9 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -51,9 +51,22 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
<< ", data_length: " << bytes_req;
std::span<const Slice> slices {data, data_cnt};
+ bool ok = false;
for (auto& stream : _streams) {
- RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
- _bytes_appended, slices));
+ auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
+ _bytes_appended, slices);
+ ok = ok || st.ok();
+ }
+ if (!ok) {
+ std::stringstream ss;
+ for (auto& stream : _streams) {
+ ss << " " << stream->dst_id();
+ }
+ LOG(WARNING) << "failed to write any replicas, load_id: " << print_id(_load_id)
+ << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
+ << ", segment_id: " << _segment_id << ", data_length: " << bytes_req
+ << ", backends:" << ss.str();
+ return Status::InternalError("failed to write any replicas");
}
_bytes_appended += bytes_req;
return Status::OK();
@@ -63,9 +76,21 @@ Status StreamSinkFileWriter::finalize() {
VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id
<< ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id;
// TODO(zhengyu): update get_inverted_index_file_size into stat
+ bool ok = false;
for (auto& stream : _streams) {
- RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
- _bytes_appended, {}, true));
+ auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
+ _bytes_appended, {}, true);
+ ok = ok || st.ok();
+ }
+ if (!ok) {
+ std::stringstream ss;
+ for (auto& stream : _streams) {
+ ss << " " << stream->dst_id();
+ }
+ LOG(WARNING) << "failed to finalize any replicas, load_id: " << print_id(_load_id)
+ << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
+ << ", segment_id: " << _segment_id << ", backends:" << ss.str();
+ return Status::InternalError("failed to finalize any replicas");
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]