This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b07b840 [fix](load) fix bug that BE may crash when calling
`mark_as_failed` (#8501)
b07b840 is described below
commit b07b840b765bf270452cdf7b983413fe544852e3
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Mar 18 09:38:16 2022 +0800
[fix](load) fix bug that BE may crash when calling `mark_as_failed` (#8501)
1.
The methods in the IndexChannel are called back in the RpcClosure in the
NodeChannel.
However, this callback may occur after the whole task is finished (e.g. due
to network latency),
and by that time the IndexChannel may have already been destroyed, so we
should not call
the IndexChannel's methods anymore, otherwise the BE will crash.
Therefore, we use the `_is_closed` variable and `_closed_lock` to ensure
that the RPC callback function
will not call the IndexChannel's method after the NodeChannel is closed.
2.
Do not add IndexChannel to the ObjectPool.
Because when the IndexChannel is destructed, it may trigger the destruction
of its NodeChannels.
And the destruction of a NodeChannel may be time-consuming (it waits for
in-flight RPCs to finish).
But the ObjectPool holds a SpinLock while destroying its objects, so this may
cause busy-waiting on the CPU.
---
be/src/exec/tablet_sink.cpp | 32 +++++++++++++++++++++++---------
be/src/exec/tablet_sink.h | 15 +++++++++++++--
2 files changed, 36 insertions(+), 11 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index c4faf12..be547cf 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -34,6 +34,7 @@
#include "service/brpc.h"
#include "util/brpc_client_cache.h"
#include "util/debug/sanitizer_scopes.h"
+#include "util/defer_op.h"
#include "util/monotime.h"
#include "util/proto_util.h"
#include "util/threadpool.h"
@@ -183,6 +184,12 @@ Status NodeChannel::open_wait() {
// add batch closure
_add_batch_closure =
ReusableClosure<PTabletWriterAddBatchResult>::create();
_add_batch_closure->addFailedHandler([this](bool is_last_rpc) {
+ std::lock_guard<std::mutex> l(this->_closed_lock);
+ if (this->_is_closed) {
+ // if the node channel is closed, no need to call `mark_as_failed`,
+ // and notice that _index_channel may already be destroyed.
+ return;
+ }
// If rpc failed, mark all tablets on this node channel as failed
_index_channel->mark_as_failed(this->node_id(), this->host(),
_add_batch_closure->cntl.ErrorText(), -1);
Status st = _index_channel->check_intolerable_failure();
@@ -197,6 +204,12 @@ Status NodeChannel::open_wait() {
_add_batch_closure->addSuccessHandler([this](const
PTabletWriterAddBatchResult& result,
bool is_last_rpc) {
+ std::lock_guard<std::mutex> l(this->_closed_lock);
+ if (this->_is_closed) {
+ // if the node channel is closed, no need to call the following
logic,
+ // and notice that _index_channel may already be destroyed.
+ return;
+ }
Status status(result.status());
if (status.ok()) {
// if has error tablet, handle them first
@@ -333,15 +346,10 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t
tablet_id) {
return Status::OK();
}
-Status NodeChannel::mark_close() {
+void NodeChannel::mark_close() {
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
- if (_cancelled) {
- std::lock_guard<SpinLock> l(_cancel_msg_lock);
- return Status::InternalError("mark close failed. " + _cancel_msg);
- } else {
- return st.clone_and_prepend("already stopped, can't mark as
closed. cancelled/eos: ");
- }
+ return;
}
_cur_add_batch_request.set_eos(true);
@@ -358,10 +366,16 @@ Status NodeChannel::mark_close() {
}
_eos_is_produced = true;
- return Status::OK();
+ return;
}
Status NodeChannel::close_wait(RuntimeState* state) {
+ // set _is_closed to true finally
+ Defer set_closed {[&]() {
+ std::lock_guard<std::mutex> l(_closed_lock);
+ _is_closed = true;
+ }};
+
auto st = none_of({_cancelled, !_eos_is_produced});
if (!st.ok()) {
if (_cancelled) {
@@ -812,7 +826,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
tablets.emplace_back(std::move(tablet_with_partition));
}
}
- auto channel = _pool->add(new IndexChannel(this, index->index_id));
+ auto channel = std::make_shared<IndexChannel>(this, index->index_id);
RETURN_IF_ERROR(channel->init(state, tablets));
_channels.emplace_back(channel);
}
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 5c0330c..b5c10b7 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -170,7 +170,7 @@ public:
// two ways to stop channel:
// 1. mark_close()->close_wait() PS. close_wait() will block waiting for
the last AddBatch rpc response.
// 2. just cancel()
- Status mark_close();
+ void mark_close();
Status close_wait(RuntimeState* state);
void cancel(const std::string& cancel_msg);
@@ -284,6 +284,17 @@ private:
// the timestamp when this node channel be marked closed and finished
closed
uint64_t _close_time_ms = 0;
+
+ // lock to protect _is_closed.
+ // The methods in the IndexChannel are called back in the RpcClosure in
the NodeChannel.
+ // However, this rpc callback may occur after the whole task is finished
(e.g. due to network latency),
+ // and by that time the IndexChannel may have been destructured, so we
should not call the
+ // IndexChannel methods anymore, otherwise the BE will crash.
+ // Therefore, we use the _is_closed and _closed_lock to ensure that the
RPC callback
+ // function will not call the IndexChannel method after the NodeChannel is
closed.
+ // The IndexChannel is definitely accessible until the NodeChannel is
closed.
+ std::mutex _closed_lock;
+ bool _is_closed = false;
};
class IndexChannel {
@@ -425,7 +436,7 @@ protected:
Bitmap _filter_bitmap;
// index_channel
- std::vector<IndexChannel*> _channels;
+ std::vector<std::shared_ptr<IndexChannel>> _channels;
CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _sender_thread;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]