This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e3c12671d9f [fix](move-memtable) fix use-after-free in
LoadStreamReplyHandler (#29791)
e3c12671d9f is described below
commit e3c12671d9f1dd2a0a1b05529169e315e62dda15
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Jan 10 21:37:57 2024 +0800
[fix](move-memtable) fix use-after-free in LoadStreamReplyHandler (#29791)
---
be/src/vec/sink/load_stream_stub.cpp | 137 ++++++++++++--------
be/src/vec/sink/load_stream_stub.h | 139 ++++++++-------------
be/src/vec/sink/load_stream_stub_pool.cpp | 18 ---
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 10 +-
.../test_load_stream_stub_failure_injection.groovy | 6 +-
5 files changed, 143 insertions(+), 167 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index fdd9762330b..c84ef3b7a0a 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -28,30 +28,35 @@
namespace doris {
-int
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId id,
- butil::IOBuf*
const messages[],
- size_t size) {
+int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id,
butil::IOBuf* const messages[],
+ size_t size) {
+ auto stub = _stub.lock();
+ if (!stub) {
+ LOG(WARNING) << "stub is not exist when on_received_messages, " <<
*this
+ << ", stream_id=" << id;
+ return 0;
+ }
for (size_t i = 0; i < size; i++) {
butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
PLoadStreamResponse response;
response.ParseFromZeroCopyStream(&wrapper);
if (response.eos()) {
- _is_eos.store(true);
+ stub->_is_eos.store(true);
}
Status st = Status::create(response.status());
std::stringstream ss;
- ss << "on_received_messages, load_id=" << _load_id << ", backend_id="
<< _dst_id;
+ ss << "on_received_messages, " << *this << ", stream_id=" << id;
if (response.success_tablet_ids_size() > 0) {
ss << ", success tablet ids:";
for (auto tablet_id : response.success_tablet_ids()) {
ss << " " << tablet_id;
}
- std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
+ std::lock_guard<bthread::Mutex> lock(stub->_success_tablets_mutex);
for (auto tablet_id : response.success_tablet_ids()) {
- _success_tablets.push_back(tablet_id);
+ stub->_success_tablets.push_back(tablet_id);
}
}
if (response.failed_tablets_size() > 0) {
@@ -60,11 +65,24 @@ int
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
Status st = Status::create(pb.status());
ss << " " << pb.id() << ":" << st;
}
- std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
+ std::lock_guard<bthread::Mutex> lock(stub->_failed_tablets_mutex);
for (auto pb : response.failed_tablets()) {
Status st = Status::create(pb.status());
- _failed_tablets.emplace(pb.id(), st);
+ stub->_failed_tablets.emplace(pb.id(), st);
+ }
+ }
+ if (response.tablet_schemas_size() > 0) {
+ ss << ", tablet schema num: " << response.tablet_schemas_size();
+ std::lock_guard<bthread::Mutex> lock(stub->_schema_mutex);
+ for (const auto& schema : response.tablet_schemas()) {
+ auto tablet_schema = std::make_unique<TabletSchema>();
+ tablet_schema->init_from_pb(schema.tablet_schema());
+ stub->_tablet_schema_for_index->emplace(schema.index_id(),
+
std::move(tablet_schema));
+ stub->_enable_unique_mow_for_index->emplace(
+ schema.index_id(),
schema.enable_unique_key_merge_on_write());
}
+ stub->_schema_cv.notify_all();
}
ss << ", status: " << st;
LOG(INFO) << ss.str();
@@ -83,21 +101,27 @@ int
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
<< status;
}
}
-
- if (response.tablet_schemas_size() > 0) {
- std::vector<PTabletSchemaWithIndex>
schemas(response.tablet_schemas().begin(),
-
response.tablet_schemas().end());
- _stub->add_schema(schemas);
- }
}
return 0;
}
-void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
- LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id;
- std::lock_guard<bthread::Mutex> lock(_mutex);
- _is_closed.store(true);
- _close_cv.notify_all();
+void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
+ Defer defer {[this]() { delete this; }};
+ LOG(INFO) << "on_closed, " << *this << ", stream_id=" << id;
+ auto stub = _stub.lock();
+ if (!stub) {
+ LOG(WARNING) << "stub is not exist when on_closed, " << *this;
+ return;
+ }
+ std::lock_guard<bthread::Mutex> lock(stub->_close_mutex);
+ stub->_is_closed.store(true);
+ stub->_close_cv.notify_all();
+}
+
+inline std::ostream& operator<<(std::ostream& ostr, const
LoadStreamReplyHandler& handler) {
+ ostr << "LoadStreamReplyHandler load_id=" << UniqueId(handler._load_id)
+ << ", dst_id=" << handler._dst_id;
+ return ostr;
}
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
@@ -114,39 +138,31 @@ LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
_tablet_schema_for_index(stub._tablet_schema_for_index),
_enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
-LoadStreamStub::~LoadStreamStub() = default;
-
-Status LoadStreamStub::close_stream() {
- if (_is_init.load() && !_handler.is_closed()) {
- LOG(INFO) << "closing stream, load_id=" << print_id(_load_id) << ",
src_id=" << _src_id
- << ", dst_id=" << _dst_id << ", stream_id=" << _stream_id;
+LoadStreamStub::~LoadStreamStub() {
+ if (_is_init.load() && !_is_closed.load()) {
auto ret = brpc::StreamClose(_stream_id);
- if (ret != 0) {
- return Status::InternalError("StreamClose failed, err={}", ret);
- }
+ LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ?
"success" : "failed");
}
- return Status::OK();
}
// open_load_stream
-Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>*
client_cache,
+Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
+ BrpcClientCache<PBackendService_Stub>*
client_cache,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema,
int total_streams,
bool enable_profile) {
- std::unique_lock<bthread::Mutex> lock(_mutex);
+ std::unique_lock<bthread::Mutex> lock(_open_mutex);
if (_is_init.load()) {
return Status::OK();
}
_dst_id = node_info.id;
- _handler.set_dst_id(_dst_id);
- _handler.set_load_id(_load_id);
std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
brpc::StreamOptions opt;
opt.max_buf_size = config::load_stream_max_buf_size;
opt.idle_timeout_ms = config::load_stream_idle_timeout_ms;
opt.messages_in_batch = config::load_stream_messages_in_batch;
- opt.handler = &_handler;
+ opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self);
brpc::Controller cntl;
if (int ret = StreamCreate(&_stream_id, cntl, &opt)) {
return Status::Error<true>(ret, "Failed to create stream");
@@ -178,8 +194,7 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
return Status::InternalError("Failed to connect to backend {}: {}",
_dst_id,
cntl.ErrorText());
}
- LOG(INFO) << "open load stream " << _stream_id << " load_id=" <<
print_id(_load_id)
- << " for backend " << _dst_id << " (" << host_port << ")";
+ LOG(INFO) << "open load stream to " << host_port << ", " << *this;
_is_init.store(true);
return Status::OK();
}
@@ -263,18 +278,6 @@ Status LoadStreamStub::get_schema(const
std::vector<PTabletID>& tablets) {
return _encode_and_send(header);
}
-void LoadStreamStub::add_schema(const std::vector<PTabletSchemaWithIndex>&
schemas) {
- std::lock_guard<bthread::Mutex> lock(_mutex);
- for (const auto& schema : schemas) {
- auto tablet_schema = std::make_unique<TabletSchema>();
- tablet_schema->init_from_pb(schema.tablet_schema());
- _tablet_schema_for_index->emplace(schema.index_id(),
std::move(tablet_schema));
- _enable_unique_mow_for_index->emplace(schema.index_id(),
-
schema.enable_unique_key_merge_on_write());
- }
- _schema_cv.notify_all();
-}
-
Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
int64_t timeout_ms) {
if (_tablet_schema_for_index->contains(index_id)) {
@@ -299,6 +302,36 @@ Status LoadStreamStub::wait_for_schema(int64_t
partition_id, int64_t index_id, i
return Status::OK();
}
+Status LoadStreamStub::close_wait(int64_t timeout_ms) {
+ DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", {
+ while (true) {
+ };
+ });
+ if (!_is_init.load() || _is_closed.load()) {
+ return Status::OK();
+ }
+ if (timeout_ms <= 0) {
+ timeout_ms = config::close_load_stream_timeout_ms;
+ }
+ DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
+ std::unique_lock<bthread::Mutex> lock(_close_mutex);
+ if (_is_closed.load()) {
+ return Status::OK();
+ }
+ int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
+ if (ret != 0) {
+ return Status::InternalError(
+ "stream close_wait timeout, error={}, load_id={}, dst_id={},
stream_id={}", ret,
+ print_id(_load_id), _dst_id, _stream_id);
+ }
+ if (!_is_eos.load()) {
+ return Status::InternalError(
+ "stream closed without eos, load_id={}, dst_id={},
stream_id={}",
+ print_id(_load_id), _dst_id, _stream_id);
+ }
+ return Status::OK();
+}
+
Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const
Slice> data) {
butil::IOBuf buf;
size_t header_len = header.ByteSizeLong();
@@ -359,4 +392,10 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf)
{
}
}
+inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub&
stub) {
+ ostr << "LoadStreamStub load_id=" << print_id(stub._load_id) << ",
src_id=" << stub._src_id
+ << ", dst_id=" << stub._dst_id << ", stream_id=" << stub._stream_id;
+ return ostr;
+}
+
} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index f7d6844fe6b..81ec99fa451 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -84,70 +84,28 @@ using IndexToEnableMoW =
std::allocator<phmap::Pair<const
int64_t, bool>>, 4,
std::mutex>;
-class LoadStreamStub {
-private:
- class LoadStreamReplyHandler : public brpc::StreamInputHandler {
- public:
- LoadStreamReplyHandler(LoadStreamStub* stub) : _stub(stub) {}
-
- int on_received_messages(brpc::StreamId id, butil::IOBuf* const
messages[],
- size_t size) override;
-
- void on_idle_timeout(brpc::StreamId id) override {}
-
- void on_closed(brpc::StreamId id) override;
-
- bool is_closed() { return _is_closed.load(); }
-
- bool is_eos() { return _is_eos.load(); }
-
- Status close_wait(int64_t timeout_ms) {
- DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
- std::unique_lock<bthread::Mutex> lock(_mutex);
- if (_is_closed) {
- return Status::OK();
- }
- int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
- if (ret != 0) {
- return Status::InternalError(
- "stream close_wait timeout, load_id={}, be_id={},
error={}",
- _load_id.to_string(), _dst_id, ret);
- }
- if (!_is_eos.load()) {
- return Status::InternalError("stream closed without eos,
load_id={} be_id={}",
- _load_id.to_string(), _dst_id);
- }
- return Status::OK();
- };
-
- std::vector<int64_t> success_tablets() {
- std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
- return _success_tablets;
- }
+class LoadStreamReplyHandler : public brpc::StreamInputHandler {
+public:
+ LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id,
std::weak_ptr<LoadStreamStub> stub)
+ : _load_id(load_id), _dst_id(dst_id), _stub(stub) {}
- std::unordered_map<int64_t, Status> failed_tablets() {
- std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
- return _failed_tablets;
- }
+ int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
+ size_t size) override;
- void set_dst_id(int64_t dst_id) { _dst_id = dst_id; }
- void set_load_id(PUniqueId load_id) { _load_id = UniqueId(load_id); }
+ void on_idle_timeout(brpc::StreamId id) override {}
- private:
- UniqueId _load_id; // for logging
- int64_t _dst_id = -1; // for logging
- std::atomic<bool> _is_closed;
- std::atomic<bool> _is_eos;
- bthread::Mutex _mutex;
- bthread::ConditionVariable _close_cv;
+ void on_closed(brpc::StreamId id) override;
- bthread::Mutex _success_tablets_mutex;
- bthread::Mutex _failed_tablets_mutex;
- std::vector<int64_t> _success_tablets;
- std::unordered_map<int64_t, Status> _failed_tablets;
+ friend std::ostream& operator<<(std::ostream& ostr, const
LoadStreamReplyHandler& handler);
- LoadStreamStub* _stub = nullptr;
- };
+private:
+ PUniqueId _load_id; // for logging
+ int64_t _dst_id = -1; // for logging
+ std::weak_ptr<LoadStreamStub> _stub;
+};
+
+class LoadStreamStub {
+ friend class LoadStreamReplyHandler;
public:
// construct new stub
@@ -163,7 +121,8 @@ public:
~LoadStreamStub();
// open_load_stream
- Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const
NodeInfo& node_info,
+ Status open(std::shared_ptr<LoadStreamStub> self,
+ BrpcClientCache<PBackendService_Stub>* client_cache, const
NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int
total_streams,
bool enable_profile);
@@ -189,31 +148,16 @@ public:
// GET_SCHEMA
Status get_schema(const std::vector<PTabletID>& tablets);
- // close stream, usually close is initiated by the remote.
- // in case of remote failure, we should be able to close stream locally.
- Status close_stream();
-
// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
- Status close_wait(int64_t timeout_ms = 0) {
- DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", {
- while (true) {
- };
- });
- if (!_is_init.load() || _handler.is_closed()) {
- return Status::OK();
- }
- if (timeout_ms <= 0) {
- timeout_ms = config::close_load_stream_timeout_ms;
- }
- return _handler.close_wait(timeout_ms);
- }
+ // if timeout_ms <= 0, will fallback to
config::close_load_stream_timeout_ms
+ Status close_wait(int64_t timeout_ms = 0);
Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t
tablet_id,
int64_t timeout_ms = 60000);
Status wait_for_new_schema(int64_t timeout_ms) {
- std::unique_lock<bthread::Mutex> lock(_mutex);
+ std::unique_lock<bthread::Mutex> lock(_schema_mutex);
if (timeout_ms > 0) {
int ret = _schema_cv.wait_for(lock, timeout_ms * 1000);
return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait
schema update timeout");
@@ -222,8 +166,6 @@ public:
return Status::OK();
};
- void add_schema(const std::vector<PTabletSchemaWithIndex>& schemas);
-
std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
return (*_tablet_schema_for_index)[index_id];
}
@@ -232,9 +174,15 @@ public:
return _enable_unique_mow_for_index->at(index_id);
}
- std::vector<int64_t> success_tablets() { return
_handler.success_tablets(); }
+ std::vector<int64_t> success_tablets() {
+ std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
+ return _success_tablets;
+ }
- std::unordered_map<int64_t, Status> failed_tablets() { return
_handler.failed_tablets(); }
+ std::unordered_map<int64_t, Status> failed_tablets() {
+ std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
+ return _failed_tablets;
+ }
brpc::StreamId stream_id() const { return _stream_id; }
@@ -242,6 +190,8 @@ public:
int64_t dst_id() const { return _dst_id; }
+ friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub&
stub);
+
private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data
= {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
@@ -249,10 +199,19 @@ private:
protected:
std::atomic<bool> _is_init;
- bthread::Mutex _mutex;
-
+ std::atomic<bool> _is_closed;
+ std::atomic<bool> _is_eos;
std::atomic<int> _use_cnt;
+ PUniqueId _load_id;
+ brpc::StreamId _stream_id;
+ int64_t _src_id = -1; // source backend_id
+ int64_t _dst_id = -1; // destination backend_id
+
+ bthread::Mutex _open_mutex;
+ bthread::Mutex _close_mutex;
+ bthread::ConditionVariable _close_cv;
+
std::mutex _tablets_to_commit_mutex;
std::vector<PTabletID> _tablets_to_commit;
@@ -260,15 +219,15 @@ protected:
std::mutex _send_mutex;
butil::IOBuf _buffer;
- PUniqueId _load_id;
- brpc::StreamId _stream_id;
- int64_t _src_id = -1; // source backend_id
- int64_t _dst_id = -1; // destination backend_id
- LoadStreamReplyHandler _handler {this};
-
+ bthread::Mutex _schema_mutex;
bthread::ConditionVariable _schema_cv;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
+
+ bthread::Mutex _success_tablets_mutex;
+ bthread::Mutex _failed_tablets_mutex;
+ std::vector<int64_t> _success_tablets;
+ std::unordered_map<int64_t, Status> _failed_tablets;
};
} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp
b/be/src/vec/sink/load_stream_stub_pool.cpp
index 3cf168a131e..1baa903f2ee 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -31,24 +31,6 @@ void LoadStreams::release(Status status) {
DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
if (num_use == 0) {
LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id="
<< _dst_id;
- for (auto& stream : _streams) {
- auto st = stream->close_stream();
- DBUG_EXECUTE_IF("LoadStreams.release.close_stream_failed",
- { st = Status::InternalError("stream close
failed"); });
- if (!st.ok()) {
- LOG(WARNING) << "close stream failed " << st;
- }
- }
- if (status.ok()) {
- for (auto& stream : _streams) {
- auto st = stream->close_wait();
- DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed",
- { st = Status::InternalError("stream close
wait timeout"); });
- if (!st.ok()) {
- LOG(WARNING) << "close wait failed " << st;
- }
- }
- }
_pool->erase(_load_id, _dst_id);
} else {
LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" <<
_dst_id
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 0ac06eaa2ca..1f1ed6591d2 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -278,14 +278,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t
dst_id, LoadStreams& st
// get tablet schema from each backend only in the 1st stream
for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema =
_indexes_from_node[node_info->id];
-
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info,
- _txn_id, *_schema, tablets_for_schema,
_total_streams,
- _state->enable_profile()));
+ RETURN_IF_ERROR(stream->open(stream,
_state->exec_env()->brpc_internal_client_cache(),
+ *node_info, _txn_id, *_schema,
tablets_for_schema,
+ _total_streams,
_state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
-
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info,
- _txn_id, *_schema, {}, _total_streams,
+ RETURN_IF_ERROR(stream->open(stream,
_state->exec_env()->brpc_internal_client_cache(),
+ *node_info, _txn_id, *_schema, {},
_total_streams,
_state->enable_profile()));
}
return Status::OK();
diff --git
a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
index 0f75cb12a62..ea790577449 100644
---
a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
@@ -89,12 +89,8 @@ suite("test_stream_stub_fault_injection", "nonConcurrent") {
load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed",
"StreamWrite failed, err=32")
// LoadStreams keeping stream when release
load_with_injection("LoadStreams.release.keeping_streams", "")
- // LoadStreams close stream failed
- load_with_injection("LoadStreams.release.close_stream_failed", "")
- // LoadStreams close wait failed
- load_with_injection("LoadStreams.release.close_wait_failed", "")
sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
sql """ set enable_memtable_on_sink_node=false """
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]