This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6da36e10773 [feature](merge-cloud) Refactor write path code by
abstract base class (#26537)
6da36e10773 is described below
commit 6da36e107739905c1f2665028c5788f1a7313902
Author: plat1ko <[email protected]>
AuthorDate: Fri Dec 8 14:50:36 2023 +0800
[feature](merge-cloud) Refactor write path code by abstract base class
(#26537)
Refactor the write path code by introducing abstract base classes. Whether to use
`StorageEngine` or `CloudStorageEngine` is now determined at compile time
instead of by the runtime flag `config::cloud_mode`, to avoid unexpected null pointer or
undefined behavior issues caused by merging code.
Classes that depend on `StorageEngine` but are shared with the cloud mode need
to have an abstract base class. Common code should be extracted into the base
class, while the code that depends on `StorageEngine` should be implemented in
a `StorageEngine` mix-in class derived from the base class.
---
be/src/cloud/config.cpp | 2 +-
be/src/cloud/config.h | 2 +-
be/src/olap/delta_writer.cpp | 101 +++++-----
be/src/olap/delta_writer.h | 71 ++++---
be/src/olap/rowset/beta_rowset_writer.cpp | 215 ++++++++++++---------
be/src/olap/rowset/beta_rowset_writer.h | 118 ++++++-----
be/src/olap/rowset/beta_rowset_writer_v2.h | 4 -
be/src/olap/rowset/rowset_factory.cpp | 5 +-
be/src/olap/rowset/rowset_writer.h | 4 -
be/src/olap/rowset/segcompaction.h | 4 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 14 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 10 +-
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 3 +
be/src/olap/rowset/vertical_beta_rowset_writer.h | 7 +-
be/src/olap/rowset_builder.cpp | 151 +++++++--------
be/src/olap/rowset_builder.h | 53 +++--
be/src/runtime/exec_env.cpp | 2 +-
be/src/runtime/exec_env.h | 6 +
be/src/runtime/load_channel.cpp | 33 +++-
be/src/runtime/load_channel.h | 41 +---
be/src/runtime/load_stream.h | 12 +-
be/src/runtime/load_stream_mgr.h | 8 +-
be/src/runtime/load_stream_writer.cpp | 20 +-
be/src/runtime/load_stream_writer.h | 22 +--
be/src/runtime/tablets_channel.cpp | 202 ++++++++++---------
be/src/runtime/tablets_channel.h | 97 +++++-----
be/src/vec/olap/block_reader.cpp | 13 +-
be/src/vec/olap/vertical_block_reader.cpp | 11 +-
be/test/olap/delta_writer_test.cpp | 42 ++--
.../olap/engine_storage_migration_task_test.cpp | 9 +-
be/test/olap/memtable_memory_limiter_test.cpp | 7 +-
be/test/olap/tablet_cooldown_test.cpp | 7 +-
32 files changed, 680 insertions(+), 616 deletions(-)
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index e7e1510b2ba..4d9da1e9cfc 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -20,7 +20,7 @@
namespace doris {
namespace config {
-DEFINE_Bool(cloud_mode, "false");
+// TODO
} // namespace config
} // namespace doris
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index da7fea826cc..21a3b6052f5 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -22,7 +22,7 @@
namespace doris {
namespace config {
-DECLARE_Bool(cloud_mode);
+// TODO
} // namespace config
} // namespace doris
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 310ffacad9a..dc5973bf04d 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -28,7 +28,7 @@
#include <string>
#include <utility>
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
@@ -41,6 +41,7 @@
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
+#include "olap/rowset_builder.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
@@ -57,25 +58,29 @@
namespace doris {
using namespace ErrorCode;
-Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer,
RuntimeProfile* profile,
- const UniqueId& load_id) {
- *writer = new DeltaWriter(req, StorageEngine::instance(), profile,
load_id);
- return Status::OK();
+BaseDeltaWriter::BaseDeltaWriter(WriteRequest* req, RuntimeProfile* profile,
+ const UniqueId& load_id)
+ : _req(*req), _memtable_writer(new MemTableWriter(*req)) {
+ _init_profile(profile);
}
-DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile,
+DeltaWriter::DeltaWriter(StorageEngine& engine, WriteRequest* req,
RuntimeProfile* profile,
const UniqueId& load_id)
- : _req(*req), _rowset_builder(*req, profile), _memtable_writer(new
MemTableWriter(*req)) {
- _init_profile(profile);
+ : BaseDeltaWriter(req, profile, load_id), _engine(engine) {
+ _rowset_builder = std::make_unique<RowsetBuilder>(_engine, *req, profile);
}
-void DeltaWriter::_init_profile(RuntimeProfile* profile) {
+void BaseDeltaWriter::_init_profile(RuntimeProfile* profile) {
_profile = profile->create_child(fmt::format("DeltaWriter {}",
_req.tablet_id), true, true);
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
+}
+
+void DeltaWriter::_init_profile(RuntimeProfile* profile) {
+ BaseDeltaWriter::_init_profile(profile);
_commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime");
}
-DeltaWriter::~DeltaWriter() {
+BaseDeltaWriter::~BaseDeltaWriter() {
if (!_is_init) {
return;
}
@@ -83,33 +88,35 @@ DeltaWriter::~DeltaWriter() {
// cancel and wait all memtables in flush queue to be finished
static_cast<void>(_memtable_writer->cancel());
- if (_rowset_builder.tablet() != nullptr) {
+ if (_rowset_builder->tablet() != nullptr) {
const FlushStatistic& stat = _memtable_writer->get_flush_token_stats();
-
_rowset_builder.tablet()->flush_bytes->increment(stat.flush_size_bytes);
-
_rowset_builder.tablet()->flush_finish_count->increment(stat.flush_finish_count);
+
_rowset_builder->tablet()->flush_bytes->increment(stat.flush_size_bytes);
+
_rowset_builder->tablet()->flush_finish_count->increment(stat.flush_finish_count);
}
}
-Status DeltaWriter::init() {
+DeltaWriter::~DeltaWriter() = default;
+
+Status BaseDeltaWriter::init() {
if (_is_init) {
return Status::OK();
}
- RETURN_IF_ERROR(_rowset_builder.init());
- RETURN_IF_ERROR(
- _memtable_writer->init(_rowset_builder.rowset_writer(),
_rowset_builder.tablet_schema(),
- _rowset_builder.get_partial_update_info(),
-
_rowset_builder.tablet()->enable_unique_key_merge_on_write()));
+ RETURN_IF_ERROR(_rowset_builder->init());
+ RETURN_IF_ERROR(_memtable_writer->init(
+ _rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(),
+ _rowset_builder->get_partial_update_info(),
+ _rowset_builder->tablet()->enable_unique_key_merge_on_write()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
return Status::OK();
}
-Status DeltaWriter::append(const vectorized::Block* block) {
+Status BaseDeltaWriter::append(const vectorized::Block* block) {
return write(block, {}, true);
}
-Status DeltaWriter::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs,
- bool is_append) {
+Status BaseDeltaWriter::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs,
+ bool is_append) {
if (UNLIKELY(row_idxs.empty() && !is_append)) {
return Status::OK();
}
@@ -121,11 +128,11 @@ Status DeltaWriter::write(const vectorized::Block* block,
const std::vector<uint
}
return _memtable_writer->write(block, row_idxs, is_append);
}
-Status DeltaWriter::wait_flush() {
+Status BaseDeltaWriter::wait_flush() {
return _memtable_writer->wait_flush();
}
-Status DeltaWriter::close() {
+Status BaseDeltaWriter::close() {
_lock_watch.start();
std::lock_guard<std::mutex> l(_lock);
_lock_watch.stop();
@@ -140,37 +147,31 @@ Status DeltaWriter::close() {
return _memtable_writer->close();
}
-Status DeltaWriter::build_rowset() {
+Status BaseDeltaWriter::build_rowset() {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before
build_rowset() being called";
SCOPED_TIMER(_close_wait_timer);
RETURN_IF_ERROR(_memtable_writer->close_wait(_profile));
- return _rowset_builder.build_rowset();
+ return _rowset_builder->build_rowset();
}
-Status DeltaWriter::submit_calc_delete_bitmap_task() {
- return _rowset_builder.submit_calc_delete_bitmap_task();
+Status BaseDeltaWriter::submit_calc_delete_bitmap_task() {
+ return _rowset_builder->submit_calc_delete_bitmap_task();
}
-Status DeltaWriter::wait_calc_delete_bitmap() {
- return _rowset_builder.wait_calc_delete_bitmap();
+Status BaseDeltaWriter::wait_calc_delete_bitmap() {
+ return _rowset_builder->wait_calc_delete_bitmap();
}
-Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes,
- const bool write_single_replica) {
- if (config::cloud_mode) {
- return Status::OK();
- }
+Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes) {
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_commit_txn_timer);
- RETURN_IF_ERROR(_rowset_builder.commit_txn());
+ RETURN_IF_ERROR(_rowset_builder->commit_txn());
- if (write_single_replica) {
- for (auto node_info : slave_tablet_nodes.slave_nodes()) {
- _request_slave_tablet_pull_rowset(node_info);
- }
+ for (auto&& node_info : slave_tablet_nodes.slave_nodes()) {
+ _request_slave_tablet_pull_rowset(node_info);
}
return Status::OK();
}
@@ -191,11 +192,11 @@ void DeltaWriter::add_finished_slave_replicas(
success_slave_tablet_node_ids->insert({_req.tablet_id,
_success_slave_node_ids});
}
-Status DeltaWriter::cancel() {
+Status BaseDeltaWriter::cancel() {
return cancel_with_status(Status::Cancelled("already cancelled"));
}
-Status DeltaWriter::cancel_with_status(const Status& st) {
+Status BaseDeltaWriter::cancel_with_status(const Status& st) {
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::OK();
@@ -205,14 +206,11 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
return Status::OK();
}
-int64_t DeltaWriter::mem_consumption(MemType mem) {
+int64_t BaseDeltaWriter::mem_consumption(MemType mem) {
return _memtable_writer->mem_consumption(mem);
}
void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
- if (config::cloud_mode) [[unlikely]] {
- return;
- }
std::shared_ptr<PBackendService_Stub> stub =
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
node_info.host(), node_info.async_internal_port());
@@ -224,15 +222,14 @@ void
DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
return;
}
-
StorageEngine::instance()->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id,
-
_req.tablet_id, this);
+ _engine.txn_manager()->add_txn_tablet_delta_writer(_req.txn_id,
_req.tablet_id, this);
{
std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
_unfinished_slave_node.insert(node_info.id());
}
std::vector<std::pair<int64_t, std::string>> indices_ids;
- auto cur_rowset = _rowset_builder.rowset();
+ auto cur_rowset = _rowset_builder->rowset();
auto tablet_schema = cur_rowset->rowset_meta()->tablet_schema();
if (!tablet_schema->skip_write_index_on_load()) {
for (auto& column : tablet_schema->columns()) {
@@ -247,7 +244,7 @@ void
DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
*(request->mutable_rowset_meta()) =
cur_rowset->rowset_meta()->get_rowset_pb();
request->set_host(BackendOptions::get_localhost());
request->set_http_port(config::webserver_port);
- string tablet_path = _rowset_builder.tablet()->tablet_path();
+ string tablet_path = _rowset_builder->tablet()->tablet_path();
request->set_rowset_path(tablet_path);
request->set_token(ExecEnv::GetInstance()->token());
request->set_brpc_port(config::brpc_port);
@@ -314,8 +311,8 @@ void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t
node_id, bool is_succe
_unfinished_slave_node.erase(node_id);
}
-int64_t DeltaWriter::num_rows_filtered() const {
- auto rowset_writer = _rowset_builder.rowset_writer();
+int64_t BaseDeltaWriter::num_rows_filtered() const {
+ auto rowset_writer = _rowset_builder->rowset_writer();
return rowset_writer == nullptr ? 0 : rowset_writer->num_rows_filtered();
}
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index d7e351a168e..c782c5ef3b6 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -20,7 +20,6 @@
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
-#include <stdint.h>
#include <atomic>
#include <memory>
@@ -34,7 +33,6 @@
#include "olap/memtable_writer.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
-#include "olap/rowset_builder.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
@@ -56,14 +54,15 @@ namespace vectorized {
class Block;
} // namespace vectorized
+class BaseRowsetBuilder;
+
// Writer for a particular (load, index, tablet).
// This class is NOT thread-safe, external synchronization is required.
-class DeltaWriter {
+class BaseDeltaWriter {
public:
- static Status open(WriteRequest* req, DeltaWriter** writer,
RuntimeProfile* profile,
- const UniqueId& load_id = TUniqueId());
+ BaseDeltaWriter(WriteRequest* req, RuntimeProfile* profile, const
UniqueId& load_id);
- ~DeltaWriter();
+ virtual ~BaseDeltaWriter();
Status init();
@@ -79,15 +78,6 @@ public:
Status build_rowset();
Status submit_calc_delete_bitmap_task();
Status wait_calc_delete_bitmap();
- Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, const bool
write_single_replica);
-
- bool check_slave_replicas_done(google::protobuf::Map<int64_t,
PSuccessSlaveTabletNodeIds>*
- success_slave_tablet_node_ids);
-
- void add_finished_slave_replicas(google::protobuf::Map<int64_t,
PSuccessSlaveTabletNodeIds>*
- success_slave_tablet_node_ids);
-
- void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
// abandon current memtable and wait for all pending-flushing memtables to
be destructed.
// mem_consumption() should be 0 after this function returns.
@@ -109,34 +99,55 @@ public:
int64_t num_rows_filtered() const;
- // For UT
- DeleteBitmapPtr get_delete_bitmap() { return
_rowset_builder.get_delete_bitmap(); }
-
-private:
- DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile,
- const UniqueId& load_id);
-
- void _request_slave_tablet_pull_rowset(PNodeInfo node_info);
-
- void _init_profile(RuntimeProfile* profile);
+protected:
+ virtual void _init_profile(RuntimeProfile* profile);
bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
- RowsetBuilder _rowset_builder;
+ std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
std::shared_ptr<MemTableWriter> _memtable_writer;
std::mutex _lock;
- std::unordered_set<int64_t> _unfinished_slave_node;
- PSuccessSlaveTabletNodeIds _success_slave_node_ids;
- std::shared_mutex _slave_node_lock;
+ // total rows num written by DeltaWriter
+ std::atomic<int64_t> _total_received_rows = 0;
RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
- RuntimeProfile::Counter* _commit_txn_timer = nullptr;
MonotonicStopWatch _lock_watch;
};
+// `StorageEngine` mixin for `BaseDeltaWriter`
+class DeltaWriter final : public BaseDeltaWriter {
+public:
+ DeltaWriter(StorageEngine& engine, WriteRequest* req, RuntimeProfile*
profile,
+ const UniqueId& load_id);
+
+ ~DeltaWriter() override;
+
+ Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes);
+
+ bool check_slave_replicas_done(google::protobuf::Map<int64_t,
PSuccessSlaveTabletNodeIds>*
+ success_slave_tablet_node_ids);
+
+ void add_finished_slave_replicas(google::protobuf::Map<int64_t,
PSuccessSlaveTabletNodeIds>*
+ success_slave_tablet_node_ids);
+
+ void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
+
+private:
+ void _init_profile(RuntimeProfile* profile) override;
+
+ void _request_slave_tablet_pull_rowset(PNodeInfo node_info);
+
+ StorageEngine& _engine;
+ std::unordered_set<int64_t> _unfinished_slave_node;
+ PSuccessSlaveTabletNodeIds _success_slave_node_ids;
+ std::shared_mutex _slave_node_lock;
+
+ RuntimeProfile::Counter* _commit_txn_timer = nullptr;
+};
+
} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index ab9e97a3d3b..6850ce0f43c 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -25,10 +25,11 @@
#include <ctime> // time
#include <filesystem>
+#include <memory>
#include <sstream>
#include <utility>
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
@@ -40,6 +41,7 @@
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segcompaction.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
@@ -59,27 +61,53 @@
namespace doris {
using namespace ErrorCode;
-BetaRowsetWriter::BetaRowsetWriter()
+namespace {
+
+bool is_segment_overlapping(const std::vector<KeyBoundsPB>&
segments_encoded_key_bounds) {
+ std::string_view last;
+ for (auto&& segment_encode_key : segments_encoded_key_bounds) {
+ auto&& cur_min = segment_encode_key.min_key();
+ auto&& cur_max = segment_encode_key.max_key();
+ if (cur_min <= last) {
+ return true;
+ }
+ last = cur_max;
+ }
+ return false;
+}
+
+void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
+ const RowsetMeta& spec_rowset_meta) {
+ rowset_meta.set_num_rows(spec_rowset_meta.num_rows());
+ rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size());
+ rowset_meta.set_data_disk_size(spec_rowset_meta.total_disk_size());
+ rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size());
+ // TODO write zonemap to meta
+ rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0);
+ rowset_meta.set_creation_time(time(nullptr));
+ rowset_meta.set_num_segments(spec_rowset_meta.num_segments());
+ rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap());
+ rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state());
+
+ std::vector<KeyBoundsPB> segments_key_bounds;
+ spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
+ rowset_meta.set_segments_key_bounds(segments_key_bounds);
+}
+
+} // namespace
+
+BaseBetaRowsetWriter::BaseBetaRowsetWriter()
: _rowset_meta(nullptr),
_num_segment(0),
_segment_start_id(0),
- _segcompacted_point(0),
- _num_segcompacted(0),
_num_rows_written(0),
_total_data_size(0),
- _total_index_size(0),
- _segcompaction_worker(this),
- _is_doing_segcompaction(false) {
- _segcompaction_status.store(OK);
-}
+ _total_index_size(0) {}
-BetaRowsetWriter::~BetaRowsetWriter() {
- /* Note that segcompaction is async and in parallel with load job. So we
should handle carefully
- * when the job is cancelled. Although it is meaningless to continue
segcompaction when the job
- * is cancelled, the objects involved in the job should be preserved
during segcompaction to
- * avoid crashs for memory issues. */
- WARN_IF_ERROR(wait_flying_segcompaction(), "segment compaction failed");
+BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
+ : _engine(engine),
_segcompaction_worker(std::make_unique<SegcompactionWorker>(this)) {}
+BaseBetaRowsetWriter::~BaseBetaRowsetWriter() {
// TODO(lingbin): Should wrapper exception logic, no need to know file ops
directly.
if (!_already_built) { // abnormal exit, remove all files generated
WARN_IF_ERROR(_segment_creator.close(),
@@ -100,7 +128,15 @@ BetaRowsetWriter::~BetaRowsetWriter() {
}
}
-Status BetaRowsetWriter::init(const RowsetWriterContext&
rowset_writer_context) {
+BetaRowsetWriter::~BetaRowsetWriter() {
+ /* Note that segcompaction is async and in parallel with load job. So we
should handle carefully
+ * when the job is cancelled. Although it is meaningless to continue
segcompaction when the job
+ * is cancelled, the objects involved in the job should be preserved
during segcompaction to
+ * avoid crashs for memory issues. */
+ WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed");
+}
+
+Status BaseBetaRowsetWriter::init(const RowsetWriterContext&
rowset_writer_context) {
_context = rowset_writer_context;
_rowset_meta.reset(new RowsetMeta);
_rowset_meta->set_fs(_context.fs);
@@ -121,21 +157,17 @@ Status BetaRowsetWriter::init(const RowsetWriterContext&
rowset_writer_context)
}
_rowset_meta->set_tablet_uid(_context.tablet_uid);
_rowset_meta->set_tablet_schema(_context.tablet_schema);
- _context.segment_collector =
std::make_shared<SegmentCollectorT<BetaRowsetWriter>>(this);
- _context.file_writer_creator =
std::make_shared<FileWriterCreatorT<BetaRowsetWriter>>(this);
+ _context.segment_collector =
std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
+ _context.file_writer_creator =
std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
RETURN_IF_ERROR(_segment_creator.init(_context));
return Status::OK();
}
-Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
+Status BaseBetaRowsetWriter::add_block(const vectorized::Block* block) {
return _segment_creator.add_block(block);
}
Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
- if (config::cloud_mode) {
- // TODO(plat1ko)
- return Status::NotSupported("_generate_delete_bitmap");
- }
SCOPED_RAW_TIMER(&_delete_bitmap_ns);
if (!_context.tablet->enable_unique_key_merge_on_write() ||
(_context.partial_update_info &&
_context.partial_update_info->is_partial_update)) {
@@ -374,12 +406,11 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
} else if ((_num_segment - _segcompacted_point) >=
config::segcompaction_batch_size) {
SegCompactionCandidatesSharedPtr segments;
status = _find_longest_consecutive_small_segment(segments);
- if (LIKELY(status.ok()) && (segments->size() > 0)) {
+ if (LIKELY(status.ok()) && (!segments->empty())) {
LOG(INFO) << "submit segcompaction task, tablet_id:" <<
_context.tablet_id
<< " rowset_id:" << _context.rowset_id << " segment
num:" << _num_segment
<< ", segcompacted_point:" << _segcompacted_point;
- status =
StorageEngine::instance()->submit_seg_compaction_task(&_segcompaction_worker,
-
segments);
+ status =
_engine.submit_seg_compaction_task(_segcompaction_worker.get(), segments);
if (status.ok()) {
return status;
}
@@ -415,7 +446,7 @@ Status
BetaRowsetWriter::_segcompaction_rename_last_segments() {
return Status::OK();
}
-Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
+Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
RETURN_IF_ERROR(rowset->link_files_to(_context.rowset_dir,
_context.rowset_id));
_num_rows_written += rowset->num_rows();
@@ -438,17 +469,17 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr
rowset) {
return Status::OK();
}
-Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr
rowset) {
+Status
BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr
rowset) {
// TODO use schema_mapping to transfer zonemap
return add_rowset(rowset);
}
-Status BetaRowsetWriter::flush() {
+Status BaseBetaRowsetWriter::flush() {
return _segment_creator.flush();
}
-Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t
segment_id,
- int64_t* flush_size) {
+Status BaseBetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t
segment_id,
+ int64_t* flush_size) {
if (block->rows() == 0) {
return Status::OK();
}
@@ -460,11 +491,11 @@ Status
BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segmen
return Status::OK();
}
-Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block) {
+Status BaseBetaRowsetWriter::flush_single_block(const vectorized::Block*
block) {
return _segment_creator.flush_single_block(block);
}
-Status BetaRowsetWriter::wait_flying_segcompaction() {
+Status BetaRowsetWriter::_wait_flying_segcompaction() {
std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock);
uint64_t begin_wait = GetCurrentTimeMicros();
while (_is_doing_segcompaction) {
@@ -481,12 +512,12 @@ Status BetaRowsetWriter::wait_flying_segcompaction() {
return Status::OK();
}
-RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr&
spec_rowset_meta) {
+RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr&
spec_rowset_meta) {
if (_rowset_meta->newest_write_timestamp() == -1) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
}
- _build_rowset_meta_with_spec_field(_rowset_meta, spec_rowset_meta);
+ build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta);
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(_context.tablet_schema,
_context.rowset_dir,
_rowset_meta, &rowset);
@@ -498,7 +529,7 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const
RowsetMetaSharedPtr& spec_r
return rowset;
}
-Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
+Status BaseBetaRowsetWriter::_close_file_writers() {
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& file_writer : _file_writers) {
RETURN_NOT_OK_STATUS_WITH_WARN(
@@ -506,20 +537,31 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
fmt::format("failed to close file writer, path={}",
file_writer->path().string()));
}
RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
- "failed to close segment creator when build
new rowset")
+ "failed to close segment creator when build
new rowset");
+ return Status::OK();
+}
+
+Status BetaRowsetWriter::_close_file_writers() {
+ RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers());
// if _segment_start_id is not zero, that means it's a transient rowset
writer for
// MoW partial update, don't need to do segment compaction.
if (_segment_start_id == 0) {
- _segcompaction_worker.cancel();
- RETURN_NOT_OK_STATUS_WITH_WARN(wait_flying_segcompaction(),
+ _segcompaction_worker->cancel();
+ RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
"segcompaction failed when build new
rowset");
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
"rename last segments failed when build
new rowset");
- if (_segcompaction_worker.get_file_writer()) {
-
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker.get_file_writer()->close(),
+ if (_segcompaction_worker->get_file_writer()) {
+
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker->get_file_writer()->close(),
"close segment compaction worker
failed");
}
}
+ return Status::OK();
+}
+
+Status BaseBetaRowsetWriter::build(RowsetSharedPtr& rowset) {
+ RETURN_IF_ERROR(_close_file_writers());
+
RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(),
"too many segments when build new rowset");
_build_rowset_meta(_rowset_meta);
@@ -541,25 +583,19 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
return Status::OK();
}
-bool BetaRowsetWriter::_is_segment_overlapping(
- const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) {
- std::string last;
- for (auto segment_encode_key : segments_encoded_key_bounds) {
- auto cur_min = segment_encode_key.min_key();
- auto cur_max = segment_encode_key.max_key();
- if (cur_min <= last) {
- return true;
- }
- last = cur_max;
- }
- return false;
+int64_t BaseBetaRowsetWriter::_num_seg() const {
+ return _num_segment;
+}
+
+int64_t BetaRowsetWriter::_num_seg() const {
+ return _is_segcompacted() ? _num_segcompacted : _num_segment;
}
// update tablet schema when meet variant columns, before commit_txn
// Eg. rowset schema: A(int), B(float), C(int), D(int)
// _tabelt->tablet_schema: A(bigint), B(double)
// => update_schema: A(bigint), B(double), C(int), D(int)
-void BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
+void BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema)
{
std::lock_guard<std::mutex> lock(*(_context.schema_lock));
TabletSchemaSPtr update_schema;
static_cast<void>(vectorized::schema_util::get_least_common_schema(
@@ -573,26 +609,7 @@ void
BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
VLOG_DEBUG << "dump rs schema: " <<
_context.tablet_schema->dump_structure();
}
-void BetaRowsetWriter::_build_rowset_meta_with_spec_field(
- RowsetMetaSharedPtr rowset_meta, const RowsetMetaSharedPtr&
spec_rowset_meta) {
- rowset_meta->set_num_rows(spec_rowset_meta->num_rows());
- rowset_meta->set_total_disk_size(spec_rowset_meta->total_disk_size());
- rowset_meta->set_data_disk_size(spec_rowset_meta->total_disk_size());
- rowset_meta->set_index_disk_size(spec_rowset_meta->index_disk_size());
- // TODO write zonemap to meta
- rowset_meta->set_empty(spec_rowset_meta->num_rows() == 0);
- rowset_meta->set_creation_time(time(nullptr));
- rowset_meta->set_num_segments(spec_rowset_meta->num_segments());
- rowset_meta->set_segments_overlap(spec_rowset_meta->segments_overlap());
- rowset_meta->set_rowset_state(spec_rowset_meta->rowset_state());
-
- std::vector<KeyBoundsPB> segments_key_bounds;
- spec_rowset_meta->get_segments_key_bounds(&segments_key_bounds);
- rowset_meta->set_segments_key_bounds(segments_key_bounds);
-}
-
-void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta>
rowset_meta) {
- int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment;
+void BaseBetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta>
rowset_meta) {
int64_t num_rows_written = 0;
int64_t total_data_size = 0;
int64_t total_index_size = 0;
@@ -613,11 +630,11 @@ void
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
// segment key bounds are empty in old version(before version 1.2.x). So
we should not modify
// the overlap property when key bounds are empty.
if (!segments_encoded_key_bounds.empty() &&
- !_is_segment_overlapping(segments_encoded_key_bounds)) {
+ !is_segment_overlapping(segments_encoded_key_bounds)) {
rowset_meta->set_segments_overlap(NONOVERLAPPING);
}
- rowset_meta->set_num_segments(num_seg);
+ rowset_meta->set_num_segments(_num_seg());
// TODO(zhangzhengyu): key_bounds.size() should equal num_seg, but
currently not always
rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
rowset_meta->set_total_disk_size(total_data_size + _total_data_size);
@@ -635,7 +652,7 @@ void
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
}
}
-RowsetSharedPtr BetaRowsetWriter::_build_tmp() {
+RowsetSharedPtr BaseBetaRowsetWriter::_build_tmp() {
std::shared_ptr<RowsetMeta> rowset_meta_ = std::make_shared<RowsetMeta>();
*rowset_meta_ = *_rowset_meta;
_build_rowset_meta(rowset_meta_);
@@ -650,7 +667,7 @@ RowsetSharedPtr BetaRowsetWriter::_build_tmp() {
return rowset;
}
-Status BetaRowsetWriter::_create_file_writer(std::string path,
io::FileWriterPtr& file_writer) {
+Status BaseBetaRowsetWriter::_create_file_writer(std::string path,
io::FileWriterPtr& file_writer) {
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>("get fs failed");
@@ -673,7 +690,8 @@ Status BetaRowsetWriter::_create_file_writer(std::string
path, io::FileWriterPtr
return Status::OK();
}
-Status BetaRowsetWriter::create_file_writer(uint32_t segment_id,
io::FileWriterPtr& file_writer) {
+Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id,
+ io::FileWriterPtr&
file_writer) {
std::string path;
path = BetaRowset::segment_file_path(_context.rowset_dir,
_context.rowset_id, segment_id);
return _create_file_writer(path, file_writer);
@@ -693,15 +711,28 @@ Status
BetaRowsetWriter::_create_segment_writer_for_segcompaction(
writer_options.write_type = _context.write_type;
writer_options.write_type = DataWriteType::TYPE_COMPACTION;
- writer->reset(new segment_v2::SegmentWriter(file_writer.get(),
_num_segcompacted,
- _context.tablet_schema,
_context.tablet,
- _context.data_dir,
_context.max_rows_per_segment,
- writer_options,
_context.mow_context));
- if (_segcompaction_worker.get_file_writer() != nullptr) {
- RETURN_IF_ERROR(_segcompaction_worker.get_file_writer()->close());
+ *writer = std::make_unique<segment_v2::SegmentWriter>(
+ file_writer.get(), _num_segcompacted, _context.tablet_schema,
_context.tablet,
+ _context.data_dir, _context.max_rows_per_segment, writer_options,
_context.mow_context);
+ if (_segcompaction_worker->get_file_writer() != nullptr) {
+ RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close());
}
- _segcompaction_worker.get_file_writer().reset(file_writer.release());
+ _segcompaction_worker->get_file_writer().reset(file_writer.release());
+
+ return Status::OK();
+}
+Status BaseBetaRowsetWriter::_check_segment_number_limit() {
+ size_t total_segment_num = _num_segment + 1;
+
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
+ { total_segment_num = dp->param("segnum", 1024); });
+ if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
+ return Status::Error<TOO_MANY_SEGMENTS>(
+ "too many segments in rowset. tablet_id:{}, rowset_id:{},
max:{}, "
+ "_num_segment:{}, ",
+ _context.tablet_id, _context.rowset_id.to_string(),
+ config::max_segment_num_per_rowset, _num_segment);
+ }
return Status::OK();
}
@@ -720,8 +751,8 @@ Status BetaRowsetWriter::_check_segment_number_limit() {
return Status::OK();
}
-Status BetaRowsetWriter::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat,
- TabletSchemaSPtr flush_schema) {
+Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat,
+ TabletSchemaSPtr flush_schema) {
uint32_t segid_offset = segment_id - _segment_start_id;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
@@ -750,10 +781,16 @@ Status BetaRowsetWriter::add_segment(uint32_t segment_id,
const SegmentStatistic
if (_context.mow_context != nullptr) {
RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
}
- RETURN_IF_ERROR(_segcompaction_if_necessary());
return Status::OK();
}
+Status BetaRowsetWriter::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat,
+ TabletSchemaSPtr flush_schema) {
+ RETURN_IF_ERROR(
+ BaseBetaRowsetWriter::add_segment(segment_id, segstat,
std::move(flush_schema)));
+ return _segcompaction_if_necessary();
+}
+
Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t
index_size,
KeyBoundsPB& key_bounds) {
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 68932c5ef77..347994e243e 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -19,8 +19,6 @@
#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
-#include <stddef.h>
-#include <stdint.h>
#include <algorithm>
#include <atomic>
@@ -43,7 +41,6 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_creator.h"
-#include "segcompaction.h"
#include "segment_v2/segment.h"
#include "util/spinlock.h"
@@ -59,21 +56,19 @@ class SegmentWriter;
using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
using SegCompactionCandidatesSharedPtr =
std::shared_ptr<SegCompactionCandidates>;
-class BetaRowsetWriter : public RowsetWriter {
- friend class SegcompactionWorker;
-
+class BaseBetaRowsetWriter : public RowsetWriter {
public:
- BetaRowsetWriter();
+ BaseBetaRowsetWriter();
- ~BetaRowsetWriter() override;
+ ~BaseBetaRowsetWriter() override;
Status init(const RowsetWriterContext& rowset_writer_context) override;
Status add_block(const vectorized::Block* block) override;
+ // Declare these interfaces in `BaseBetaRowsetWriter`
// add rowset by create hard link
Status add_rowset(RowsetSharedPtr rowset) override;
-
Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset)
override;
Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer)
override;
@@ -114,14 +109,6 @@ public:
int32_t allocate_segment_id() override { return
_segment_creator.allocate_segment_id(); };
- Status flush_segment_writer_for_segcompaction(
- std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t
index_size,
- KeyBoundsPB& key_bounds);
-
- bool is_doing_segcompaction() const override { return
_is_doing_segcompaction; }
-
- Status wait_flying_segcompaction() override;
-
void set_segment_start_id(int32_t start_id) override {
_segment_creator.set_segment_start_id(start_id);
_segment_start_id = start_id;
@@ -142,36 +129,20 @@ public:
const RowsetWriterContext& context() const override { return _context; }
private:
- Status _create_file_writer(std::string path, io::FileWriterPtr&
file_writer);
- Status _check_segment_number_limit();
- Status _generate_delete_bitmap(int32_t segment_id);
+ virtual Status _generate_delete_bitmap(int32_t segment_id) = 0;
void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
- // segment compaction
- Status _create_segment_writer_for_segcompaction(
- std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin,
int64_t end);
- Status _segcompaction_if_necessary();
- Status _segcompaction_rename_last_segments();
- Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
int32_t segment_id);
- Status
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr&
segments);
- bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
-
- bool _check_and_set_is_doing_segcompaction();
-
- void _build_rowset_meta_with_spec_field(RowsetMetaSharedPtr rowset_meta,
- const RowsetMetaSharedPtr&
spec_rowset_meta);
- bool _is_segment_overlapping(const std::vector<KeyBoundsPB>&
segments_encoded_key_bounds);
- void _clear_statistics_for_deleting_segments_unsafe(uint64_t begin,
uint64_t end);
- Status _rename_compacted_segments(int64_t begin, int64_t end);
- Status _rename_compacted_segment_plain(uint64_t seg_id);
- Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t
seg_id);
-
void update_rowset_schema(TabletSchemaSPtr flush_schema);
// build a tmp rowset for load segment to calc delete_bitmap
// for this segment
+protected:
+ Status _create_file_writer(std::string path, io::FileWriterPtr&
file_writer);
+ virtual Status _close_file_writers();
+ virtual Status _check_segment_number_limit();
+ virtual int64_t _num_seg() const;
+ // build a tmp rowset for loading the segment to calc delete_bitmap for this segment
RowsetSharedPtr _build_tmp();
-protected:
RowsetWriterContext _context;
std::shared_ptr<RowsetMeta> _rowset_meta;
@@ -179,9 +150,6 @@ protected:
roaring::Roaring _segment_set; // bitmap set to record flushed segment
id
std::mutex _segment_set_mutex; // mutex for _segment_set
int32_t _segment_start_id; // basic write start from 0, partial
update may be different
- std::atomic<int32_t> _segcompacted_point; // segemnts before this point
have
- // already been segment compacted
- std::atomic<int32_t> _num_segcompacted; // index for segment compaction
mutable SpinLock _lock; // protect following vectors.
// record rows number of every segment already written, using for rowid
@@ -204,15 +172,6 @@ protected:
bool _already_built = false;
SegmentCreator _segment_creator;
- SegcompactionWorker _segcompaction_worker;
-
- // ensure only one inflight segcompaction task for each rowset
- std::atomic<bool> _is_doing_segcompaction;
- // enforce compare-and-swap on _is_doing_segcompaction
- std::mutex _is_doing_segcompaction_lock;
- std::condition_variable _segcompacting_cond;
-
- std::atomic<int> _segcompaction_status;
fmt::memory_buffer vlog_buffer;
@@ -222,4 +181,59 @@ protected:
int64_t _segment_writer_ns = 0;
};
+class SegcompactionWorker;
+
+// `StorageEngine` mixin for `BaseBetaRowsetWriter`
+class BetaRowsetWriter : public BaseBetaRowsetWriter {
+public:
+ BetaRowsetWriter(StorageEngine& engine);
+
+ ~BetaRowsetWriter() override;
+
+ Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
+ TabletSchemaSPtr flush_schema) override;
+
+ Status flush_segment_writer_for_segcompaction(
+ std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t
index_size,
+ KeyBoundsPB& key_bounds);
+
+private:
+ Status _generate_delete_bitmap(int32_t segment_id) override;
+
+ // segment compaction
+ friend class SegcompactionWorker;
+ Status _close_file_writers() override;
+ Status _check_segment_number_limit() override;
+ int64_t _num_seg() const override;
+ Status _wait_flying_segcompaction();
+ Status _create_segment_writer_for_segcompaction(
+ std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin,
int64_t end);
+ Status _segcompaction_if_necessary();
+ Status _segcompaction_rename_last_segments();
+ Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
int32_t segment_id);
+ Status
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr&
segments);
+ bool _is_segcompacted() const { return _num_segcompacted > 0; }
+ bool _check_and_set_is_doing_segcompaction();
+ Status _rename_compacted_segments(int64_t begin, int64_t end);
+ Status _rename_compacted_segment_plain(uint64_t seg_id);
+ Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t
seg_id);
+ void _clear_statistics_for_deleting_segments_unsafe(uint64_t begin,
uint64_t end);
+
+ StorageEngine& _engine;
+
+ std::atomic<int32_t> _segcompacted_point {0}; // segments before this point have
+ // already been segment compacted
+ std::atomic<int32_t> _num_segcompacted {0}; // index for segment
compaction
+
+ std::unique_ptr<SegcompactionWorker> _segcompaction_worker;
+
+ // ensure only one inflight segcompaction task for each rowset
+ std::atomic<bool> _is_doing_segcompaction {false};
+ // enforce compare-and-swap on _is_doing_segcompaction
+ std::mutex _is_doing_segcompaction_lock;
+ std::condition_variable _segcompacting_cond;
+
+ std::atomic<int> _segcompaction_status {ErrorCode::OK};
+};
+
} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index 4a99acdaba6..bdcd8a47a98 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -127,10 +127,6 @@ public:
int32_t allocate_segment_id() override { return
_segment_creator.allocate_segment_id(); };
- bool is_doing_segcompaction() const override { return false; }
-
- Status wait_flying_segcompaction() override { return Status::OK(); }
-
int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
int64_t segment_writer_ns() override { return _segment_writer_ns; }
diff --git a/be/src/olap/rowset/rowset_factory.cpp
b/be/src/olap/rowset/rowset_factory.cpp
index 8447154483c..9758a09e7ea 100644
--- a/be/src/olap/rowset/rowset_factory.cpp
+++ b/be/src/olap/rowset/rowset_factory.cpp
@@ -27,6 +27,7 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/vertical_beta_rowset_writer.h"
+#include "olap/storage_engine.h"
namespace doris {
using namespace ErrorCode;
@@ -51,10 +52,10 @@ Status RowsetFactory::create_rowset_writer(const
RowsetWriterContext& context, b
}
if (context.rowset_type == BETA_ROWSET) {
if (is_vertical) {
- output->reset(new VerticalBetaRowsetWriter);
+ output->reset(new
VerticalBetaRowsetWriter(*StorageEngine::instance()));
return (*output)->init(context);
}
- output->reset(new BetaRowsetWriter);
+ output->reset(new BetaRowsetWriter(*StorageEngine::instance()));
return (*output)->init(context);
}
return Status::Error<ROWSET_TYPE_NOT_FOUND>("invalid rowset_type");
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index 542528b1acb..d7ec494f0d6 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -146,10 +146,6 @@ public:
virtual int32_t allocate_segment_id() = 0;
- virtual bool is_doing_segcompaction() const = 0;
-
- virtual Status wait_flying_segcompaction() = 0;
-
virtual void set_segment_start_id(int num_segment) { LOG(FATAL) << "not
supported!"; }
virtual int64_t delete_bitmap_ns() { return 0; }
diff --git a/be/src/olap/rowset/segcompaction.h
b/be/src/olap/rowset/segcompaction.h
index a0f81e59c77..e2b5812ad8c 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -16,7 +16,6 @@
// under the License.
#pragma once
-#include <stdint.h>
#include <memory>
#include <vector>
@@ -48,7 +47,7 @@ class SegcompactionWorker {
friend class BetaRowsetWriter;
public:
- SegcompactionWorker(BetaRowsetWriter* writer);
+ explicit SegcompactionWorker(BetaRowsetWriter* writer);
void compact_segments(SegCompactionCandidatesSharedPtr segments);
@@ -75,6 +74,7 @@ private:
private:
//TODO(zhengyu): current impl depends heavily on the access to feilds of
BetaRowsetWriter
+ // Currently cloud storage engine doesn't need segcompaction
BetaRowsetWriter* _writer = nullptr;
io::FileWriterPtr _file_writer;
std::atomic<bool> _cancelled = false;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index ac70ef8c1f4..2b8375da598 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -26,7 +26,7 @@
#include <unordered_map>
#include <utility>
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h" // LOG
@@ -44,8 +44,10 @@
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/segment_loader.h"
#include "olap/short_key_index.h"
+#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
+#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/point_query_executor.h"
#include "util/coding.h"
@@ -336,11 +338,12 @@ void
SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
// 3. set columns to data convertor and then write all columns
Status SegmentWriter::append_block_with_partial_content(const
vectorized::Block* block,
size_t row_pos, size_t
num_rows) {
- if (config::cloud_mode) {
- // TODO(plat1ko)
+ if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+ // TODO(plat1ko): cloud mode
return Status::NotSupported("append_block_with_partial_content");
}
- auto tablet = static_cast<Tablet*>(_tablet.get());
+
+ auto* tablet = static_cast<Tablet*>(_tablet.get());
if (block->columns() <= _tablet_schema->num_key_columns() ||
block->columns() >= _tablet_schema->num_columns()) {
return Status::InternalError(
@@ -559,7 +562,8 @@ Status
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
const std::vector<bool>&
use_default_or_null_flag,
bool has_default_or_nullable,
const size_t& segment_start_pos) {
- if (config::cloud_mode) [[unlikely]] {
+ if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+ // TODO(plat1ko): cloud mode
return Status::NotSupported("fill_missing_columns");
}
auto tablet = static_cast<Tablet*>(_tablet.get());
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 91f890b7f96..842eecf3e10 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -26,7 +26,6 @@
#include <unordered_map>
#include <utility>
-#include "cloud/config.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h" // LOG
@@ -45,6 +44,7 @@
#include "olap/short_key_index.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
+#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/point_query_executor.h"
#include "util/coding.h"
@@ -283,10 +283,11 @@ void
VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl
// 2.3 fill block
// 3. set columns to data convertor and then write all columns
Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock&
data) {
- if (config::cloud_mode) {
- // TODO(plat1ko)
+ if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+ // TODO(plat1ko): CloudStorageEngine
return Status::NotSupported("append_block_with_partial_content");
}
+
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
@@ -495,7 +496,8 @@ Status VerticalSegmentWriter::_fill_missing_columns(
vectorized::MutableColumns& mutable_full_columns,
const std::vector<bool>& use_default_or_null_flag, bool
has_default_or_nullable,
const size_t& segment_start_pos) {
- if (config::cloud_mode) [[unlikely]] {
+ if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+ // TODO(plat1ko): CloudStorageEngine
return Status::NotSupported("fill_missing_columns");
}
auto tablet = static_cast<Tablet*>(_tablet.get());
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index cd9fee5fb4e..35f878dd17c 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -41,6 +41,9 @@
namespace doris {
using namespace ErrorCode;
+VerticalBetaRowsetWriter::VerticalBetaRowsetWriter(StorageEngine& engine)
+ : BetaRowsetWriter(engine) {}
+
VerticalBetaRowsetWriter::~VerticalBetaRowsetWriter() {
if (!_already_built) {
const auto& fs = _rowset_meta->fs();
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.h
b/be/src/olap/rowset/vertical_beta_rowset_writer.h
index 8251ad0a07e..a1477dc2a71 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.h
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.h
@@ -33,10 +33,11 @@ class Block;
} // namespace vectorized
// for vertical compaction
-class VerticalBetaRowsetWriter : public BetaRowsetWriter {
+// TODO(plat1ko): Inherit from a template type `T`, where `T` is `BetaRowsetWriter` or `CloudBetaRowsetWriter`
+class VerticalBetaRowsetWriter final : public BetaRowsetWriter {
public:
- VerticalBetaRowsetWriter() : BetaRowsetWriter() {}
- ~VerticalBetaRowsetWriter();
+ VerticalBetaRowsetWriter(StorageEngine& engine);
+ ~VerticalBetaRowsetWriter() override;
Status add_columns(const vectorized::Block* block, const
std::vector<uint32_t>& col_ids,
bool is_key, uint32_t max_rows_per_segment) override;
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 21fbed78022..4675d668f41 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -26,7 +26,7 @@
#include <string>
#include <utility>
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/status.h"
@@ -60,12 +60,16 @@
namespace doris {
using namespace ErrorCode;
-RowsetBuilder::RowsetBuilder(const WriteRequest& req, RuntimeProfile* profile)
+BaseRowsetBuilder::BaseRowsetBuilder(const WriteRequest& req, RuntimeProfile*
profile)
: _req(req), _tablet_schema(std::make_shared<TabletSchema>()) {
_init_profile(profile);
}
-void RowsetBuilder::_init_profile(RuntimeProfile* profile) {
+RowsetBuilder::RowsetBuilder(StorageEngine& engine, const WriteRequest& req,
+ RuntimeProfile* profile)
+ : BaseRowsetBuilder(req, profile), _engine(engine) {}
+
+void BaseRowsetBuilder::_init_profile(RuntimeProfile* profile) {
_profile = profile->create_child(fmt::format("RowsetBuilder {}",
_req.tablet_id), true, true);
_build_rowset_timer = ADD_TIMER(_profile, "BuildRowsetTime");
_submit_delete_bitmap_timer = ADD_TIMER(_profile,
"DeleteBitmapSubmitTime");
@@ -73,11 +77,7 @@ void RowsetBuilder::_init_profile(RuntimeProfile* profile) {
_commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime");
}
-RowsetBuilder::~RowsetBuilder() {
- if (_is_init && !_is_committed) {
- _garbage_collection();
- }
-
+BaseRowsetBuilder::~BaseRowsetBuilder() {
if (!_is_init) {
return;
}
@@ -87,34 +87,39 @@ RowsetBuilder::~RowsetBuilder() {
}
}
-void RowsetBuilder::_garbage_collection() {
- if (config::cloud_mode) {
- return;
+RowsetBuilder::~RowsetBuilder() {
+ if (_is_init && !_is_committed) {
+ _garbage_collection();
}
+}
+
+Tablet* RowsetBuilder::tablet() {
+ return static_cast<Tablet*>(_tablet.get());
+}
+
+TabletSharedPtr RowsetBuilder::tablet_sptr() {
+ return std::static_pointer_cast<Tablet>(_tablet);
+}
+
+void RowsetBuilder::_garbage_collection() {
Status rollback_status;
- TxnManager* txn_mgr = StorageEngine::instance()->txn_manager();
- auto tablet = static_cast<Tablet*>(_tablet.get());
- if (tablet != nullptr) {
- rollback_status = txn_mgr->rollback_txn(_req.partition_id, *tablet,
_req.txn_id);
+ TxnManager* txn_mgr = _engine.txn_manager();
+ if (tablet() != nullptr) {
+ rollback_status = txn_mgr->rollback_txn(_req.partition_id, *tablet(),
_req.txn_id);
}
// has to check rollback status, because the rowset maybe committed in
this thread and
// published in another thread, then rollback will fail.
// when rollback failed should not delete rowset
if (rollback_status.ok()) {
- StorageEngine::instance()->add_unused_rowset(_rowset);
+ _engine.add_unused_rowset(_rowset);
}
}
Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>&
mow_context) {
- if (config::cloud_mode) {
- // TODO(plat1ko)
- return Status::NotSupported("init_mow_context");
- }
- auto tablet = static_cast<Tablet*>(_tablet.get());
- std::lock_guard<std::shared_mutex> lck(tablet->get_header_lock());
- int64_t cur_max_version = tablet->max_version_unlocked().second;
+ std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock());
+ int64_t cur_max_version = tablet()->max_version_unlocked().second;
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (tablet->tablet_state() == TABLET_NOTREADY) {
+ if (tablet()->tablet_state() == TABLET_NOTREADY) {
// Disable 'partial_update' when the tablet is undergoing a 'schema
changing process'
if (_req.table_schema_param->is_partial_update()) {
return Status::InternalError(
@@ -123,9 +128,9 @@ Status
RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
}
_rowset_ids.clear();
} else {
- RETURN_IF_ERROR(tablet->all_rs_id(cur_max_version, &_rowset_ids));
+ RETURN_IF_ERROR(tablet()->all_rs_id(cur_max_version, &_rowset_ids));
}
- _delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
+ _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id());
mow_context =
std::make_shared<MowContext>(cur_max_version, _req.txn_id,
_rowset_ids, _delete_bitmap);
return Status::OK();
@@ -136,41 +141,31 @@ Status RowsetBuilder::check_tablet_version_count() {
MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
return Status::OK();
}
- if (config::cloud_mode) {
- // TODO(plat1ko)
- return Status::OK();
- }
- auto tablet = std::static_pointer_cast<Tablet>(_tablet);
//trigger compaction
- auto st = StorageEngine::instance()->submit_compaction_task(
- tablet, CompactionType::CUMULATIVE_COMPACTION, true);
+ auto st = _engine.submit_compaction_task(tablet_sptr(),
CompactionType::CUMULATIVE_COMPACTION,
+ true);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << "failed to trigger compaction, tablet_id=" <<
_tablet->tablet_id() << " : "
<< st;
}
- int version_count = tablet->version_count();
+ int version_count = tablet()->version_count();
if (version_count > config::max_tablet_version_num) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. version count: {}, exceed
limit: {}, "
"tablet: {}",
- version_count, config::max_tablet_version_num,
tablet->tablet_id());
+ version_count, config::max_tablet_version_num,
_tablet->tablet_id());
}
return Status::OK();
}
Status RowsetBuilder::prepare_txn() {
- if (config::cloud_mode) {
- // TODO(plat1ko)
- return Status::OK();
- }
- auto tablet = static_cast<Tablet*>(_tablet.get());
- std::shared_lock base_migration_lock(tablet->get_migration_lock(),
std::try_to_lock);
+ std::shared_lock base_migration_lock(tablet()->get_migration_lock(),
std::try_to_lock);
if (!base_migration_lock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>("try migration lock failed");
}
- std::lock_guard<std::mutex> push_lock(tablet->get_push_lock());
- return
StorageEngine::instance()->txn_manager()->prepare_txn(_req.partition_id,
*tablet,
- _req.txn_id,
_req.load_id);
+ std::lock_guard<std::mutex> push_lock(tablet()->get_push_lock());
+ return _engine.txn_manager()->prepare_txn(_req.partition_id, *tablet(),
_req.txn_id,
+ _req.load_id);
}
Status RowsetBuilder::init() {
@@ -205,18 +200,13 @@ Status RowsetBuilder::init() {
_rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false));
_pending_rs_guard =
StorageEngine::instance()->pending_local_rowsets().add(context.rowset_id);
- if (config::cloud_mode) {
- // TODO(plat1ko)
- } else {
- _calc_delete_bitmap_token =
-
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
- }
+ _calc_delete_bitmap_token =
_engine.calc_delete_bitmap_executor()->create_token();
_is_init = true;
return Status::OK();
}
-Status RowsetBuilder::build_rowset() {
+Status BaseRowsetBuilder::build_rowset() {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init) << "rowset builder is supposed be to initialized before "
"build_rowset() being called";
@@ -231,27 +221,22 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
if (!_tablet->enable_unique_key_merge_on_write()) {
return Status::OK();
}
- if (config::cloud_mode) {
- // TODO(plat1ko)
- return Status::OK();
- }
- auto tablet = static_cast<Tablet*>(_tablet.get());
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_submit_delete_bitmap_timer);
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (tablet->tablet_state() == TABLET_NOTREADY) {
+ if (tablet()->tablet_state() == TABLET_NOTREADY) {
LOG(INFO) << "tablet is under alter process, delete bitmap will be
calculated later, "
"tablet_id: "
- << tablet->tablet_id() << " txn_id: " << _req.txn_id;
+ << tablet()->tablet_id() << " txn_id: " << _req.txn_id;
return Status::OK();
}
- auto beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
+ auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
if (segments.size() > 1) {
// calculate delete bitmap between segments
RETURN_IF_ERROR(
- tablet->calc_delete_bitmap_between_segments(_rowset, segments,
_delete_bitmap));
+ tablet()->calc_delete_bitmap_between_segments(_rowset,
segments, _delete_bitmap));
}
// For partial update, we need to fill in the entire row of data, during
the calculation
@@ -261,14 +246,14 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
return Status::OK();
}
- LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " <<
tablet->tablet_id()
+ LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " <<
tablet()->tablet_id()
<< ", txn_id: " << _req.txn_id;
- return tablet->commit_phase_update_delete_bitmap(_rowset, _rowset_ids,
_delete_bitmap, segments,
- _req.txn_id,
_calc_delete_bitmap_token.get(),
- nullptr);
+ return tablet()->commit_phase_update_delete_bitmap(_rowset, _rowset_ids,
_delete_bitmap,
+ segments, _req.txn_id,
+
_calc_delete_bitmap_token.get(), nullptr);
}
-Status RowsetBuilder::wait_calc_delete_bitmap() {
+Status BaseRowsetBuilder::wait_calc_delete_bitmap() {
if (!_tablet->enable_unique_key_merge_on_write() ||
_partial_update_info->is_partial_update) {
return Status::OK();
}
@@ -281,15 +266,10 @@ Status RowsetBuilder::wait_calc_delete_bitmap() {
}
Status RowsetBuilder::commit_txn() {
- if (config::cloud_mode) {
- // TODO(plat1ko)
- return Status::OK();
- }
- auto tablet = static_cast<Tablet*>(_tablet.get());
- if (tablet->enable_unique_key_merge_on_write() &&
+ if (tablet()->enable_unique_key_merge_on_write() &&
config::enable_merge_on_write_correctness_check && _rowset->num_rows()
!= 0 &&
- tablet->tablet_state() != TABLET_NOTREADY) {
- auto st = tablet->check_delete_bitmap_correctness(
+ tablet()->tablet_state() != TABLET_NOTREADY) {
+ auto st = tablet()->check_delete_bitmap_correctness(
_delete_bitmap, _rowset->end_version() - 1, _req.txn_id,
_rowset_ids);
if (!st.ok()) {
LOG(WARNING) << fmt::format(
@@ -300,21 +280,20 @@ Status RowsetBuilder::commit_txn() {
return st;
}
}
- auto storage_engine = StorageEngine::instance();
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_commit_txn_timer);
- if (_tablet->tablet_schema()->num_variant_columns() > 0) {
+ if (tablet()->tablet_schema()->num_variant_columns() > 0) {
// update tablet schema when meet variant columns, before commit_txn
// Eg. rowset schema: A(int), B(float), C(int), D(int)
// _tabelt->tablet_schema: A(bigint), B(double)
// => update_schema: A(bigint), B(double), C(int), D(int)
const RowsetWriterContext& rw_ctx = _rowset_writer->context();
-
RETURN_IF_ERROR(_tablet->update_by_least_common_schema(rw_ctx.tablet_schema));
+
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
}
// Transfer ownership of `PendingRowsetGuard` to `TxnManager`
- Status res = storage_engine->txn_manager()->commit_txn(_req.partition_id,
*tablet, _req.txn_id,
- _req.load_id,
_rowset,
-
std::move(_pending_rs_guard), false);
+ Status res = _engine.txn_manager()->commit_txn(_req.partition_id,
*tablet(), _req.txn_id,
+ _req.load_id, _rowset,
+
std::move(_pending_rs_guard), false);
if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
@@ -322,8 +301,8 @@ Status RowsetBuilder::commit_txn() {
return res;
}
if (_tablet->enable_unique_key_merge_on_write()) {
- storage_engine->txn_manager()->set_txn_related_delete_bitmap(
- _req.partition_id, _req.txn_id, tablet->tablet_id(),
tablet->tablet_uid(), true,
+ _engine.txn_manager()->set_txn_related_delete_bitmap(
+ _req.partition_id, _req.txn_id, tablet()->tablet_id(),
tablet()->tablet_uid(), true,
_delete_bitmap, _rowset_ids, _partial_update_info);
}
@@ -331,7 +310,7 @@ Status RowsetBuilder::commit_txn() {
return Status::OK();
}
-Status RowsetBuilder::cancel() {
+Status BaseRowsetBuilder::cancel() {
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::OK();
@@ -343,9 +322,9 @@ Status RowsetBuilder::cancel() {
return Status::OK();
}
-void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,
- const OlapTableSchemaParam*
table_schema_param,
- const TabletSchema&
ori_tablet_schema) {
+void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
+ const
OlapTableSchemaParam* table_schema_param,
+ const TabletSchema&
ori_tablet_schema) {
_tablet_schema->copy_from(ori_tablet_schema);
// find the right index id
int i = 0;
@@ -356,7 +335,7 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t
index_id,
}
}
- if (indexes.size() > 0 && indexes[i]->columns.size() != 0 &&
+ if (!indexes.empty() && !indexes[i]->columns.empty() &&
indexes[i]->columns[0]->unique_id() >= 0) {
_tablet_schema->build_current_tablet_schema(index_id,
table_schema_param->version(),
indexes[i],
ori_tablet_schema);
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index ea849f80e61..8f254074c37 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -17,8 +17,6 @@
#pragma once
-#include <stdint.h>
-
#include <atomic>
#include <memory>
#include <mutex>
@@ -53,21 +51,21 @@ class Block;
// Writer for a particular (load, index, tablet).
// This class is NOT thread-safe, external synchronization is required.
-class RowsetBuilder {
+class BaseRowsetBuilder {
public:
- RowsetBuilder(const WriteRequest& req, RuntimeProfile* profile);
+ BaseRowsetBuilder(const WriteRequest& req, RuntimeProfile* profile);
- ~RowsetBuilder();
+ virtual ~BaseRowsetBuilder();
- Status init();
+ virtual Status init() = 0;
Status build_rowset();
- Status submit_calc_delete_bitmap_task();
+ virtual Status submit_calc_delete_bitmap_task() = 0;
Status wait_calc_delete_bitmap();
- Status commit_txn();
+ virtual Status commit_txn() = 0;
Status cancel();
@@ -86,21 +84,13 @@ public:
return _partial_update_info;
}
-private:
- void _garbage_collection();
-
+protected:
void _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam*
table_schema_param,
const TabletSchema& ori_tablet_schema);
void _init_profile(RuntimeProfile* profile);
- Status init_mow_context(std::shared_ptr<MowContext>& mow_context);
-
- Status check_tablet_version_count();
-
- Status prepare_txn();
-
bool _is_init = false;
bool _is_cancelled = false;
bool _is_committed = false;
@@ -127,4 +117,33 @@ private:
RuntimeProfile::Counter* _commit_txn_timer = nullptr;
};
+// `StorageEngine` mixin for `BaseRowsetBuilder`
+class RowsetBuilder final : public BaseRowsetBuilder {
+public:
+ RowsetBuilder(StorageEngine& engine, const WriteRequest& req,
RuntimeProfile* profile);
+
+ ~RowsetBuilder() override;
+
+ Status init() override;
+
+ Status commit_txn() override;
+
+ Status submit_calc_delete_bitmap_task() override;
+
+private:
+ Status check_tablet_version_count();
+
+ Status prepare_txn();
+
+ void _garbage_collection();
+
+ Status init_mow_context(std::shared_ptr<MowContext>& mow_context);
+
+ // Cast `BaseTablet` to `Tablet`
+ Tablet* tablet();
+ TabletSharedPtr tablet_sptr();
+
+ StorageEngine& _engine;
+};
+
} // namespace doris
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 6ff06bc0189..7115dd76f13 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -42,9 +42,9 @@ ExecEnv::~ExecEnv() {
destroy();
}
+// TODO(plat1ko): template <class Engine>
Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id) {
BaseTabletSPtr tablet;
- // TODO(plat1ko): config::cloud_mode
std::string err;
tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
if (tablet == nullptr) {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index c405953d6c1..94ba0720fb8 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -112,6 +112,12 @@ inline bool k_doris_exit = false;
// once to properly initialise service state.
class ExecEnv {
public:
+#ifdef CLOUD_MODE
+ using Engine = CloudStorageEngine; // TODO(plat1ko)
+#else
+ using Engine = StorageEngine;
+#endif
+
// Empty destructor because the compiler-generated one requires full
// declarations for classes in scoped_ptrs.
~ExecEnv();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 7c6549d692f..0dc0ac344b3 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -21,6 +21,7 @@
#include <glog/logging.h>
#include "bvar/bvar.h"
+#include "olap/storage_engine.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/tablets_channel.h"
@@ -68,7 +69,7 @@ void LoadChannel::_init_profile() {
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
int64_t index_id = params.index_id();
- std::shared_ptr<TabletsChannel> channel;
+ std::shared_ptr<BaseTabletsChannel> channel;
{
std::lock_guard<std::mutex> l(_lock);
auto it = _tablets_channels.find(index_id);
@@ -77,8 +78,9 @@ Status LoadChannel::open(const PTabletWriterOpenRequest&
params) {
} else {
// create a new tablets channel
TabletsChannelKey key(params.id(), index_id);
- channel = std::make_shared<TabletsChannel>(key, _load_id,
_is_high_priority,
- _self_profile);
+ // TODO(plat1ko): CloudTabletsChannel
+ channel =
std::make_shared<TabletsChannel>(*StorageEngine::instance(), key, _load_id,
+ _is_high_priority,
_self_profile);
{
std::lock_guard<SpinLock> l(_tablets_channels_lock);
_tablets_channels.insert({index_id, channel});
@@ -98,7 +100,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest&
params) {
return Status::OK();
}
-Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>&
channel,
+Status LoadChannel::_get_tablets_channel(std::shared_ptr<BaseTabletsChannel>&
channel,
bool& is_finished, const int64_t
index_id) {
std::lock_guard<std::mutex> l(_lock);
auto it = _tablets_channels.find(index_id);
@@ -124,7 +126,7 @@ Status LoadChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
COUNTER_UPDATE(_add_batch_times, 1);
int64_t index_id = request.index_id();
// 1. get tablets channel
- std::shared_ptr<TabletsChannel> channel;
+ std::shared_ptr<BaseTabletsChannel> channel;
bool is_finished = false;
Status st = _get_tablets_channel(channel, is_finished, index_id);
if (!st.ok() || is_finished) {
@@ -139,7 +141,7 @@ Status LoadChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
// 3. handle eos
if (request.has_eos() && request.eos()) {
- st = _handle_eos(channel, request, response);
+ st = _handle_eos(channel.get(), request, response);
_report_profile(response);
if (!st.ok()) {
return st;
@@ -151,6 +153,25 @@ Status LoadChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
return st;
}
+Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
+ const PTabletWriterAddBlockRequest& request,
+ PTabletWriterAddBlockResult* response) {
+ _self_profile->add_info_string("EosHost", fmt::format("{}",
request.backend_id()));
+ bool finished = false;
+ auto index_id = request.index_id();
+
+ RETURN_IF_ERROR(channel->close(this, request, response, &finished));
+ if (finished) {
+ std::lock_guard<std::mutex> l(_lock);
+ {
+ std::lock_guard<SpinLock> l(_tablets_channels_lock);
+ _tablets_channels.erase(index_id);
+ }
+ _finished_channel_ids.emplace(index_id);
+ }
+ return Status::OK();
+}
+
void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
if (!_enable_profile) {
return;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index ea0fa0f2486..bdeedbd9eae 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,10 +17,6 @@
#pragma once
-#include <gen_cpp/internal_service.pb.h>
-#include <stdint.h>
-#include <time.h>
-
#include <algorithm>
#include <atomic>
#include <functional>
@@ -38,17 +34,18 @@
#include "olap/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
-#include "runtime/tablets_channel.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
-//#include <gen_cpp/internal_service.pb.h>
namespace doris {
class PTabletWriterOpenRequest;
+class PTabletWriterAddBlockRequest;
+class PTabletWriterAddBlockResult;
class OpenPartitionRequest;
+class BaseTabletsChannel;
// A LoadChannel manages tablets channels for all indexes
// corresponding to a certain load job
@@ -82,31 +79,11 @@ public:
RuntimeProfile::Counter* get_handle_mem_limit_timer() { return
_handle_mem_limit_timer; }
protected:
- Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel,
bool& is_finished,
- const int64_t index_id);
-
- Status _handle_eos(std::shared_ptr<TabletsChannel>& channel,
- const PTabletWriterAddBlockRequest& request,
- PTabletWriterAddBlockResult* response) {
- _self_profile->add_info_string("EosHost", fmt::format("{}",
request.backend_id()));
- bool finished = false;
- auto index_id = request.index_id();
-
- RETURN_IF_ERROR(channel->close(
- this, request.sender_id(), request.backend_id(), &finished,
request.partition_ids(),
- response->mutable_tablet_vec(),
response->mutable_tablet_errors(),
- request.slave_tablet_nodes(),
response->mutable_success_slave_tablet_node_ids(),
- request.write_single_replica()));
- if (finished) {
- std::lock_guard<std::mutex> l(_lock);
- {
- std::lock_guard<SpinLock> l(_tablets_channels_lock);
- _tablets_channels.erase(index_id);
- }
- _finished_channel_ids.emplace(index_id);
- }
- return Status::OK();
- }
+ Status _get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel,
bool& is_finished,
+ int64_t index_id);
+
+ Status _handle_eos(BaseTabletsChannel* channel, const
PTabletWriterAddBlockRequest& request,
+ PTabletWriterAddBlockResult* response);
void _init_profile();
// thread safety
@@ -129,7 +106,7 @@ private:
// lock protect the tablets channel map
std::mutex _lock;
// index id -> tablets channel
- std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>>
_tablets_channels;
+ std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>>
_tablets_channels;
SpinLock _tablets_channels_lock;
// This is to save finished channels id, to handle the retry request.
std::unordered_set<int64_t> _finished_channel_ids;
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index cc06f2a2f7e..a4d359dc0ea 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -17,9 +17,8 @@
#pragma once
+#include <bthread/mutex.h>
#include <gen_cpp/internal_service.pb.h>
-#include <runtime/load_stream_writer.h>
-#include <stdint.h>
#include <condition_variable>
#include <memory>
@@ -31,10 +30,15 @@
#include "butil/iobuf.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
+#include "runtime/load_stream_writer.h"
#include "util/runtime_profile.h"
namespace doris {
+class LoadStreamMgr;
+class ThreadPoolToken;
+class OlapTableSchemaParam;
+
// origin_segid(index) -> new_segid(value in vector)
using SegIdMapping = std::vector<uint32_t>;
class TabletStream {
@@ -47,7 +51,7 @@ public:
Status append_data(const PStreamHeader& header, butil::IOBuf* data);
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
Status close();
- int64_t id() { return _id; }
+ int64_t id() const { return _id; }
friend std::ostream& operator<<(std::ostream& ostr, const TabletStream&
tablet_stream);
@@ -103,7 +107,7 @@ using StreamId = brpc::StreamId;
class LoadStream : public brpc::StreamInputHandler {
public:
LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool
enable_profile);
- ~LoadStream();
+ ~LoadStream() override;
Status init(const POpenLoadStreamRequest* request);
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index 1228061bb40..466a23c8c5c 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -17,10 +17,6 @@
#pragma once
-#include <gen_cpp/internal_service.pb.h>
-#include <runtime/load_stream.h>
-#include <stdint.h>
-
#include <condition_variable>
#include <memory>
#include <mutex>
@@ -29,9 +25,13 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
+#include "runtime/load_stream.h"
+#include "util/threadpool.h"
namespace doris {
+class POpenStreamSinkRequest;
+
class LoadStreamMgr {
public:
LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool*
heavy_work_pool,
diff --git a/be/src/runtime/load_stream_writer.cpp
b/be/src/runtime/load_stream_writer.cpp
index 52948429e32..427ace47d00 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -48,6 +48,7 @@
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
+#include "olap/rowset_builder.h"
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
@@ -67,14 +68,17 @@
namespace doris {
using namespace ErrorCode;
-LoadStreamWriter::LoadStreamWriter(WriteRequest* req, RuntimeProfile* profile)
- : _req(*req), _rowset_builder(*req, profile), _rowset_writer(nullptr)
{}
+LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile*
profile)
+ : _req(*context), _rowset_writer(nullptr) {
+ _rowset_builder =
+ std::make_unique<RowsetBuilder>(*StorageEngine::instance(),
*context, profile);
+}
LoadStreamWriter::~LoadStreamWriter() = default;
Status LoadStreamWriter::init() {
- RETURN_IF_ERROR(_rowset_builder.init());
- _rowset_writer = _rowset_builder.rowset_writer();
+ RETURN_IF_ERROR(_rowset_builder->init());
+ _rowset_writer = _rowset_builder->rowset_writer();
_is_init = true;
return Status::OK();
}
@@ -157,10 +161,10 @@ Status LoadStreamWriter::close() {
}
}
- RETURN_IF_ERROR(_rowset_builder.build_rowset());
- RETURN_IF_ERROR(_rowset_builder.submit_calc_delete_bitmap_task());
- RETURN_IF_ERROR(_rowset_builder.wait_calc_delete_bitmap());
- RETURN_IF_ERROR(_rowset_builder.commit_txn());
+ RETURN_IF_ERROR(_rowset_builder->build_rowset());
+ RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
+ RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
+ RETURN_IF_ERROR(_rowset_builder->commit_txn());
return Status::OK();
}
diff --git a/be/src/runtime/load_stream_writer.h
b/be/src/runtime/load_stream_writer.h
index 37514377a3d..e038ceeb89b 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -17,11 +17,6 @@
#pragma once
-#include <gen_cpp/Types_types.h>
-#include <gen_cpp/internal_service.pb.h>
-#include <gen_cpp/types.pb.h>
-#include <stdint.h>
-
#include <atomic>
#include <memory>
#include <mutex>
@@ -32,16 +27,12 @@
#include "brpc/stream.h"
#include "butil/iobuf.h"
#include "common/status.h"
+#include "io/fs/file_reader_writer_fwd.h"
#include "olap/delta_writer_context.h"
#include "olap/memtable.h"
#include "olap/olap_common.h"
-#include "olap/rowset/beta_rowset_writer.h"
-#include "olap/rowset/rowset.h"
-#include "olap/rowset/rowset_writer.h"
-#include "olap/rowset_builder.h"
-#include "olap/tablet.h"
-#include "olap/tablet_meta.h"
-#include "olap/tablet_schema.h"
+#include "olap/rowset/rowset_fwd.h"
+#include "olap/tablet_fwd.h"
#include "util/spinlock.h"
#include "util/uid_util.h"
@@ -53,6 +44,9 @@ class SlotDescriptor;
class OlapTableSchemaParam;
class RowsetWriter;
class RuntimeProfile;
+struct SegmentStatistics;
+using SegmentStatisticsSharedPtr = std::shared_ptr<SegmentStatistics>;
+class BaseRowsetBuilder;
namespace vectorized {
class Block;
@@ -76,13 +70,13 @@ public:
// wait for all memtables to be flushed.
Status close();
- int64_t tablet_id() { return _req.tablet_id; }
+ int64_t tablet_id() const { return _req.tablet_id; }
private:
bool _is_init = false;
bool _is_canceled = false;
WriteRequest _req;
- RowsetBuilder _rowset_builder;
+ std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
std::shared_ptr<RowsetWriter> _rowset_writer;
std::mutex _lock;
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 5508bc3005a..d0d742e9152 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -50,10 +50,10 @@ class SlotDescriptor;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
-std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
+std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId&
load_id,
- bool is_high_priority, RuntimeProfile* profile)
+BaseTabletsChannel::BaseTabletsChannel(const TabletsChannelKey& key, const
UniqueId& load_id,
+ bool is_high_priority, RuntimeProfile*
profile)
: _key(key),
_state(kInitialized),
_load_id(load_id),
@@ -66,21 +66,41 @@ TabletsChannel::TabletsChannel(const TabletsChannelKey&
key, const UniqueId& loa
});
}
-TabletsChannel::~TabletsChannel() {
+TabletsChannel::TabletsChannel(StorageEngine& engine, const TabletsChannelKey&
key,
+ const UniqueId& load_id, bool is_high_priority,
+ RuntimeProfile* profile)
+ : BaseTabletsChannel(key, load_id, is_high_priority, profile),
_engine(engine) {}
+
+BaseTabletsChannel::~BaseTabletsChannel() {
_s_tablet_writer_count -= _tablet_writers.size();
- for (auto& it : _tablet_writers) {
- delete it.second;
+}
+
+TabletsChannel::~TabletsChannel() = default;
+
+Status BaseTabletsChannel::_get_current_seq(int64_t& cur_seq,
+ const
PTabletWriterAddBlockRequest& request) {
+ std::lock_guard<std::mutex> l(_lock);
+ if (_state != kOpened) {
+ return _state == kFinished ? _close_status
+ : Status::InternalError("TabletsChannel {}
state: {}",
+ _key.to_string(),
_state);
}
- delete _schema;
+ cur_seq = _next_seqs[request.sender_id()];
+ // check packet
+ if (request.packet_seq() > cur_seq) {
+ LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
+ << ", recept_seq=" << request.packet_seq();
+ return Status::InternalError("lost data packet");
+ }
+ return Status::OK();
}
-void TabletsChannel::_init_profile(RuntimeProfile* profile) {
+void BaseTabletsChannel::_init_profile(RuntimeProfile* profile) {
_profile =
profile->create_child(fmt::format("TabletsChannel {}",
_key.to_string()), true, true);
_add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded",
TUnit::UNIT);
auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true);
- _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
_add_batch_timer = ADD_TIMER(_profile, "AddBatchTime");
_write_block_timer = ADD_TIMER(_profile, "WriteBlockTime");
_incremental_open_timer = ADD_TIMER(_profile, "IncrementalOpenTabletTime");
@@ -95,7 +115,12 @@ void TabletsChannel::_init_profile(RuntimeProfile* profile)
{
memory_usage->AddHighWaterMarkCounter("MaxTabletFlush",
TUnit::BYTES);
}
-Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
+void TabletsChannel::_init_profile(RuntimeProfile* profile) {
+ BaseTabletsChannel::_init_profile(profile);
+ _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
+}
+
+Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
std::lock_guard<std::mutex> l(_lock);
if (_state == kOpened) {
// Normal case, already open by other sender
@@ -105,7 +130,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest&
request) {
<< ", timeout(s): " << request.load_channel_timeout_s();
_txn_id = request.txn_id();
_index_id = request.index_id();
- _schema = new OlapTableSchemaParam();
+ _schema = std::make_unique<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request.schema()));
_tuple_desc = _schema->tuple_desc();
@@ -119,7 +144,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest&
request) {
return Status::OK();
}
-Status TabletsChannel::incremental_open(const PTabletWriterOpenRequest&
params) {
+Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest&
params) {
SCOPED_TIMER(_incremental_open_timer);
if (_state == kInitialized) { // haven't opened
return open(params);
@@ -159,22 +184,15 @@ Status TabletsChannel::incremental_open(const
PTabletWriterOpenRequest& params)
wrequest.tuple_desc = _tuple_desc;
wrequest.slots = index_slots;
wrequest.is_high_priority = _is_high_priority;
- wrequest.table_schema_param = _schema;
+ wrequest.table_schema_param = _schema.get();
- DeltaWriter* writer = nullptr;
- auto st = DeltaWriter::open(&wrequest, &writer, _profile, _load_id);
- if (!st.ok()) {
- auto err_msg = fmt::format(
- "open delta writer failed, tablet_id={}"
- ", txn_id={}, partition_id={}, err={}",
- tablet.tablet_id(), _txn_id, tablet.partition_id(),
st.to_string());
- LOG(WARNING) << err_msg;
- return Status::InternalError(err_msg);
- }
+ // TODO(plat1ko): CloudDeltaWriter
+ auto delta_writer =
std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest,
+ _profile, _load_id);
ss << "[" << tablet.tablet_id() << "]";
{
std::lock_guard<SpinLock> l(_tablet_writers_lock);
- _tablet_writers.emplace(tablet.tablet_id(), writer);
+ _tablet_writers.emplace(tablet.tablet_id(),
std::move(delta_writer));
}
}
@@ -185,14 +203,12 @@ Status TabletsChannel::incremental_open(const
PTabletWriterOpenRequest& params)
return Status::OK();
}
-Status TabletsChannel::close(
- LoadChannel* parent, int sender_id, int64_t backend_id, bool* finished,
- const google::protobuf::RepeatedField<int64_t>& partition_ids,
- google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
- google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
- const google::protobuf::Map<int64_t, PSlaveTabletNodes>&
slave_tablet_nodes,
- google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>*
success_slave_tablet_node_ids,
- const bool write_single_replica) {
+Status TabletsChannel::close(LoadChannel* parent, const
PTabletWriterAddBlockRequest& req,
+ PTabletWriterAddBlockResult* res, bool* finished)
{
+ int sender_id = req.sender_id();
+ int64_t backend_id = req.backend_id();
+ const auto& partition_ids = req.partition_ids();
+ auto* tablet_errors = res->mutable_tablet_errors();
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
return _close_status;
@@ -215,17 +231,17 @@ Status TabletsChannel::close(
// All senders are closed
// 1. close all delta writers
std::set<DeltaWriter*> need_wait_writers;
- for (auto& it : _tablet_writers) {
- if (_partition_ids.count(it.second->partition_id()) > 0) {
- auto st = it.second->close();
+ for (auto&& [tablet_id, writer] : _tablet_writers) {
+ if (_partition_ids.contains(writer->partition_id())) {
+ auto st = writer->close();
if (!st.ok()) {
auto err_msg = fmt::format(
"close tablet writer failed, tablet_id={}, "
"transaction_id={}, err={}",
- it.first, _txn_id, st.to_string());
+ tablet_id, _txn_id, st.to_string());
LOG(WARNING) << err_msg;
PTabletError* tablet_error = tablet_errors->Add();
- tablet_error->set_tablet_id(it.first);
+ tablet_error->set_tablet_id(tablet_id);
tablet_error->set_msg(st.to_string());
// just skip this tablet(writer) and continue to close
others
continue;
@@ -233,30 +249,30 @@ Status TabletsChannel::close(
// tablet writer in `_broken_tablets` should not call
`build_rowset` and
// `commit_txn` method, after that, the publish-version task
will success,
// which can cause the replica inconsistency.
- if (_is_broken_tablet(it.second->tablet_id())) {
+ if (_is_broken_tablet(writer->tablet_id())) {
LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is
broken but not cancelled"
- << ", tablet_id=" << it.first << ",
transaction_id=" << _txn_id;
+ << ", tablet_id=" << tablet_id << ",
transaction_id=" << _txn_id;
continue;
}
- need_wait_writers.insert(it.second);
+
need_wait_writers.insert(static_cast<DeltaWriter*>(writer.get()));
} else {
- auto st = it.second->cancel();
+ auto st = writer->cancel();
if (!st.ok()) {
- LOG(WARNING) << "cancel tablet writer failed, tablet_id="
<< it.first
+ LOG(WARNING) << "cancel tablet writer failed, tablet_id="
<< tablet_id
<< ", transaction_id=" << _txn_id;
// just skip this tablet(writer) and continue to close
others
continue;
}
- VLOG_PROGRESS << "cancel tablet writer successfully,
tablet_id=" << it.first
+ VLOG_PROGRESS << "cancel tablet writer successfully,
tablet_id=" << tablet_id
<< ", transaction_id=" << _txn_id;
}
}
- _write_single_replica = write_single_replica;
+ _write_single_replica = req.write_single_replica();
// 2. wait all writer finished flush.
- for (auto writer : need_wait_writers) {
- static_cast<void>(writer->wait_flush());
+ for (auto* writer : need_wait_writers) {
+ RETURN_IF_ERROR((writer->wait_flush()));
}
// 3. build rowset
@@ -289,23 +305,23 @@ Status TabletsChannel::close(
}
// 5. commit all writers
- for (auto writer : need_wait_writers) {
+
+ for (auto* writer : need_wait_writers) {
PSlaveTabletNodes slave_nodes;
- if (write_single_replica) {
- slave_nodes = slave_tablet_nodes.at(writer->tablet_id());
- }
+
// close may return failed, but no need to handle it here.
// tablet_vec will only contains success tablet, and then let FE
judge it.
- _commit_txn(writer, tablet_vec, tablet_errors, slave_nodes,
write_single_replica);
+ _commit_txn(writer, req, res);
}
- if (write_single_replica) {
+ if (_write_single_replica) {
+ auto* success_slave_tablet_node_ids =
res->mutable_success_slave_tablet_node_ids();
// The operation waiting for all slave replicas to complete must
end before the timeout,
// so that there is enough time to collect completed replica.
Otherwise, the task may
// timeout and fail even though most of the replicas are
completed. Here we set 0.9
// times the timeout as the maximum waiting time.
SCOPED_TIMER(_slave_replica_timer);
- while (need_wait_writers.size() > 0 &&
+ while (!need_wait_writers.empty() &&
(time(nullptr) - parent->last_updated_time()) <
(parent->timeout() * 0.9)) {
std::set<DeltaWriter*>::iterator it;
for (it = need_wait_writers.begin(); it !=
need_wait_writers.end();) {
@@ -318,22 +334,22 @@ Status TabletsChannel::close(
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
- for (auto writer : need_wait_writers) {
+ for (auto* writer : need_wait_writers) {
writer->add_finished_slave_replicas(success_slave_tablet_node_ids);
}
-
StorageEngine::instance()->txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
+ _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
}
}
return Status::OK();
}
-void TabletsChannel::_commit_txn(DeltaWriter* writer,
-
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
- PSlaveTabletNodes slave_tablet_nodes,
- const bool write_single_replica) {
- Status st = writer->commit_txn(slave_tablet_nodes, write_single_replica);
- if (st.ok()) {
+void TabletsChannel::_commit_txn(DeltaWriter* writer, const
PTabletWriterAddBlockRequest& req,
+ PTabletWriterAddBlockResult* res) {
+ Status st = writer->commit_txn(_write_single_replica
+ ?
req.slave_tablet_nodes().at(writer->tablet_id())
+ : PSlaveTabletNodes {});
+ if (st.ok()) [[likely]] {
+ auto* tablet_vec = res->mutable_tablet_vec();
PTabletInfo* tablet_info = tablet_vec->Add();
tablet_info->set_tablet_id(writer->tablet_id());
// unused required field.
@@ -341,11 +357,11 @@ void TabletsChannel::_commit_txn(DeltaWriter* writer,
tablet_info->set_received_rows(writer->total_received_rows());
tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
} else {
- _add_error_tablet(tablet_errors, writer->tablet_id(), st);
+ _add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(),
st);
}
}
-void TabletsChannel::_add_error_tablet(
+void BaseTabletsChannel::_add_error_tablet(
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
int64_t tablet_id,
Status error) const {
PTabletError* tablet_error = tablet_errors->Add();
@@ -355,7 +371,7 @@ void TabletsChannel::_add_error_tablet(
<< "err msg " << error;
}
-void TabletsChannel::refresh_profile() {
+void BaseTabletsChannel::refresh_profile() {
int64_t write_mem_usage = 0;
int64_t flush_mem_usage = 0;
int64_t max_tablet_mem_usage = 0;
@@ -363,10 +379,10 @@ void TabletsChannel::refresh_profile() {
int64_t max_tablet_flush_mem_usage = 0;
{
std::lock_guard<SpinLock> l(_tablet_writers_lock);
- for (auto& it : _tablet_writers) {
- int64_t write_mem = it.second->mem_consumption(MemType::WRITE);
+ for (auto&& [tablet_id, writer] : _tablet_writers) {
+ int64_t write_mem = writer->mem_consumption(MemType::WRITE);
write_mem_usage += write_mem;
- int64_t flush_mem = it.second->mem_consumption(MemType::FLUSH);
+ int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
flush_mem_usage += flush_mem;
if (write_mem > max_tablet_write_mem_usage) {
max_tablet_write_mem_usage = write_mem;
@@ -387,7 +403,7 @@ void TabletsChannel::refresh_profile() {
COUNTER_SET(_max_tablet_flush_memory_usage_counter,
max_tablet_flush_mem_usage);
}
-Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest&
request) {
+Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest&
request) {
std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
for (auto& index : _schema->indexes()) {
@@ -428,24 +444,17 @@ Status TabletsChannel::_open_all_writers(const
PTabletWriterOpenRequest& request
.load_id = request.id(),
.tuple_desc = _tuple_desc,
.slots = index_slots,
- .table_schema_param = _schema,
+ .table_schema_param = _schema.get(),
.is_high_priority = _is_high_priority,
.write_file_cache = request.write_file_cache(),
};
- DeltaWriter* writer = nullptr;
- auto st = DeltaWriter::open(&wrequest, &writer, _profile, _load_id);
- if (!st.ok()) {
- auto err_msg = fmt::format(
- "open delta writer failed, tablet_id={}"
- ", txn_id={}, partition_id={}, err={}",
- tablet.tablet_id(), _txn_id, tablet.partition_id(),
st.to_string());
- LOG(WARNING) << err_msg;
- return Status::InternalError(err_msg);
- }
+ // TODO(plat1ko): CloudDeltaWriter
+ auto writer =
std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest, _profile,
+ _load_id);
{
std::lock_guard<SpinLock> l(_tablet_writers_lock);
- _tablet_writers.emplace(tablet.tablet_id(), writer);
+ _tablet_writers.emplace(tablet.tablet_id(), std::move(writer));
}
}
_s_tablet_writer_count += _tablet_writers.size();
@@ -453,7 +462,7 @@ Status TabletsChannel::_open_all_writers(const
PTabletWriterOpenRequest& request
return Status::OK();
}
-Status TabletsChannel::cancel() {
+Status BaseTabletsChannel::cancel() {
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
return _close_status;
@@ -462,8 +471,14 @@ Status TabletsChannel::cancel() {
static_cast<void>(it.second->cancel());
}
_state = kFinished;
+
+ return Status::OK();
+}
+
+Status TabletsChannel::cancel() {
+ RETURN_IF_ERROR(BaseTabletsChannel::cancel());
if (_write_single_replica) {
-
StorageEngine::instance()->txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
+ _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
}
return Status::OK();
}
@@ -479,8 +494,8 @@ std::ostream& operator<<(std::ostream& os, const
TabletsChannelKey& key) {
return os;
}
-Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
- PTabletWriterAddBlockResult* response) {
+Status BaseTabletsChannel::add_batch(const PTabletWriterAddBlockRequest&
request,
+ PTabletWriterAddBlockResult* response) {
SCOPED_TIMER(_add_batch_timer);
int64_t cur_seq = 0;
_add_batch_number_counter->update(1);
@@ -523,14 +538,14 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
<< ", tablet_ids_size: " << request.tablet_ids_size();
auto write_tablet_data = [&](uint32_t tablet_id,
- std::function<Status(DeltaWriter * writer)>
write_func) {
+ std::function<Status(BaseDeltaWriter *
writer)> write_func) {
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
response->mutable_tablet_errors();
auto tablet_writer_it = _tablet_writers.find(tablet_id);
if (tablet_writer_it == _tablet_writers.end()) {
return Status::InternalError("unknown tablet to append data,
tablet={}", tablet_id);
}
- Status st = write_func(tablet_writer_it->second);
+ Status st = write_func(tablet_writer_it->second.get());
if (!st.ok()) {
auto err_msg =
fmt::format("tablet writer write failed, tablet_id={},
txn_id={}, err={}",
@@ -549,15 +564,16 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
if (request.is_single_tablet_block()) {
SCOPED_TIMER(_write_block_timer);
- RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0),
[&](DeltaWriter* writer) {
+ RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0),
[&](BaseDeltaWriter* writer) {
return writer->append(&send_data);
}));
} else {
SCOPED_TIMER(_write_block_timer);
for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
- RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first,
[&](DeltaWriter* writer) {
- return writer->write(&send_data, tablet_to_rowidxs_it.second);
- }));
+ RETURN_IF_ERROR(
+ write_tablet_data(tablet_to_rowidxs_it.first,
[&](BaseDeltaWriter* writer) {
+ return writer->write(&send_data,
tablet_to_rowidxs_it.second);
+ }));
}
}
@@ -568,12 +584,12 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
return Status::OK();
}
-void TabletsChannel::_add_broken_tablet(int64_t tablet_id) {
+void BaseTabletsChannel::_add_broken_tablet(int64_t tablet_id) {
std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock);
_broken_tablets.insert(tablet_id);
}
-bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) {
+bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) {
std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
return _broken_tablets.find(tablet_id) != _broken_tablets.end();
}
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index b8e3de0584b..2f9ec9d51a9 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -17,7 +17,6 @@
#pragma once
-#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>
#include <atomic>
@@ -38,16 +37,14 @@
#include "util/spinlock.h"
#include "util/uid_util.h"
-namespace google {
-namespace protobuf {
+namespace google::protobuf {
template <typename Element>
class RepeatedField;
template <typename Key, typename T>
class Map;
template <typename T>
class RepeatedPtrField;
-} // namespace protobuf
-} // namespace google
+} // namespace google::protobuf
namespace doris {
class PSlaveTabletNodes;
@@ -55,9 +52,12 @@ class PSuccessSlaveTabletNodeIds;
class PTabletError;
class PTabletInfo;
class PTabletWriterOpenRequest;
+class PTabletWriterAddBlockRequest;
+class PTabletWriterAddBlockResult;
class PUniqueId;
class TupleDescriptor;
class OpenPartitionRequest;
+class StorageEngine;
struct TabletsChannelKey {
UniqueId id;
@@ -65,7 +65,7 @@ struct TabletsChannelKey {
TabletsChannelKey(const PUniqueId& pid, int64_t index_id_) : id(pid),
index_id(index_id_) {}
- ~TabletsChannelKey() noexcept {}
+ ~TabletsChannelKey() noexcept = default;
bool operator==(const TabletsChannelKey& rhs) const noexcept {
return index_id == rhs.index_id && id == rhs.id;
@@ -76,18 +76,18 @@ struct TabletsChannelKey {
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key);
-class DeltaWriter;
+class BaseDeltaWriter;
class MemTableWriter;
class OlapTableSchemaParam;
class LoadChannel;
// Write channel for a particular (load, index).
-class TabletsChannel {
+class BaseTabletsChannel {
public:
- TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool
is_high_priority,
- RuntimeProfile* profile);
+ BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id,
bool is_high_priority,
+ RuntimeProfile* profile);
- ~TabletsChannel();
+ virtual ~BaseTabletsChannel();
Status open(const PTabletWriterOpenRequest& request);
// open + open writers
@@ -101,38 +101,25 @@ public:
// If all senders are closed, close this channel, set '*finished' to true,
update 'tablet_vec'
// to include all tablets written in this channel.
// no-op when this channel has been closed or cancelled
- Status
- close(LoadChannel* parent, int sender_id, int64_t backend_id, bool*
finished,
- const google::protobuf::RepeatedField<int64_t>& partition_ids,
- google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
- google::protobuf::RepeatedPtrField<PTabletError>* tablet_error,
- const google::protobuf::Map<int64_t, PSlaveTabletNodes>&
slave_tablet_nodes,
- google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>*
success_slave_tablet_node_ids,
- const bool write_single_replica);
+ virtual Status close(LoadChannel* parent, const
PTabletWriterAddBlockRequest& req,
+ PTabletWriterAddBlockResult* res, bool* finished) = 0;
// no-op when this channel has been closed or cancelled
- Status cancel();
+ virtual Status cancel();
void refresh_profile();
-private:
- template <typename Request>
- Status _get_current_seq(int64_t& cur_seq, const Request& request);
+protected:
+ Status _get_current_seq(int64_t& cur_seq, const
PTabletWriterAddBlockRequest& request);
// open all writer
Status _open_all_writers(const PTabletWriterOpenRequest& request);
- // deal with DeltaWriter commit_txn(), add tablet to list for return.
- void _commit_txn(DeltaWriter* writer,
- google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec,
- google::protobuf::RepeatedPtrField<PTabletError>*
tablet_errors,
- PSlaveTabletNodes slave_tablet_nodes, const bool
write_single_replica);
-
void _add_broken_tablet(int64_t tablet_id);
void _add_error_tablet(google::protobuf::RepeatedPtrField<PTabletError>*
tablet_errors,
int64_t tablet_id, Status error) const;
bool _is_broken_tablet(int64_t tablet_id);
- void _init_profile(RuntimeProfile* profile);
+ virtual void _init_profile(RuntimeProfile* profile);
// id of this load channel
TabletsChannelKey _key;
@@ -154,7 +141,7 @@ private:
// initialized in open function
int64_t _txn_id = -1;
int64_t _index_id = -1;
- OlapTableSchemaParam* _schema = nullptr;
+ std::unique_ptr<OlapTableSchemaParam> _schema;
TupleDescriptor* _tuple_desc = nullptr;
@@ -167,7 +154,7 @@ private:
Status _close_status;
// tablet_id -> TabletChannel
- std::unordered_map<int64_t, DeltaWriter*> _tablet_writers;
+ std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>>
_tablet_writers;
// broken tablet ids.
// If a tablet write fails, it's id will be added to this set.
// So that following batch will not handle this tablet anymore.
@@ -183,8 +170,6 @@ private:
bool _is_high_priority = false;
- bool _write_single_replica = false;
-
RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _memory_usage_counter = nullptr;
@@ -193,28 +178,36 @@ private:
RuntimeProfile::HighWaterMarkCounter* _max_tablet_memory_usage_counter =
nullptr;
RuntimeProfile::HighWaterMarkCounter*
_max_tablet_write_memory_usage_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter*
_max_tablet_flush_memory_usage_counter = nullptr;
- RuntimeProfile::Counter* _slave_replica_timer = nullptr;
RuntimeProfile::Counter* _add_batch_timer = nullptr;
RuntimeProfile::Counter* _write_block_timer = nullptr;
RuntimeProfile::Counter* _incremental_open_timer = nullptr;
};
-template <typename Request>
-Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request&
request) {
- std::lock_guard<std::mutex> l(_lock);
- if (_state != kOpened) {
- return _state == kFinished ? _close_status
- : Status::InternalError("TabletsChannel {}
state: {}",
- _key.to_string(),
_state);
- }
- cur_seq = _next_seqs[request.sender_id()];
- // check packet
- if (request.packet_seq() > cur_seq) {
- LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
- << ", recept_seq=" << request.packet_seq();
- return Status::InternalError("lost data packet");
- }
- return Status::OK();
-}
+class DeltaWriter;
+
+// `StorageEngine` mixin for `BaseTabletsChannel`
+class TabletsChannel final : public BaseTabletsChannel {
+public:
+ TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key, const
UniqueId& load_id,
+ bool is_high_priority, RuntimeProfile* profile);
+
+ ~TabletsChannel() override;
+
+ Status close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
+ PTabletWriterAddBlockResult* res, bool* finished) override;
+
+ Status cancel() override;
+
+private:
+ void _init_profile(RuntimeProfile* profile) override;
+
+ // deal with DeltaWriter commit_txn(), add tablet to list for return.
+ void _commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest&
req,
+ PTabletWriterAddBlockResult* res);
+
+ StorageEngine& _engine;
+ bool _write_single_replica = false;
+ RuntimeProfile::Counter* _slave_replica_timer = nullptr;
+};
} // namespace doris
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 8602af091a0..9c15302af6c 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -27,7 +27,7 @@
#include <ostream>
#include <string>
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "exprs/function_filter.h"
@@ -62,8 +62,10 @@ BlockReader::~BlockReader() {
Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
auto res = (this->*_next_block_func)(block, eof);
- if (!res.ok() && !res.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode)
[[unlikely]] {
- static_cast<Tablet*>(_tablet.get())->report_error(res);
+ if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+ if (!res.ok()) [[unlikely]] {
+ static_cast<Tablet*>(_tablet.get())->report_error(res);
+ }
}
return res;
}
@@ -230,11 +232,10 @@ Status BlockReader::init(const ReaderParams& read_params)
{
}
auto status = _init_collect_iter(read_params);
- if (!status.ok()) {
- if (!status.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode)
[[unlikely]] {
+ if (!status.ok()) [[unlikely]] {
+ if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
static_cast<Tablet*>(_tablet.get())->report_error(status);
}
-
return status;
}
diff --git a/be/src/vec/olap/vertical_block_reader.cpp
b/be/src/vec/olap/vertical_block_reader.cpp
index 52d35283b50..aa16c91cbd2 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -24,7 +24,6 @@
#include <boost/iterator/iterator_facade.hpp>
#include <ostream>
-#include "cloud/config.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
@@ -53,8 +52,10 @@ VerticalBlockReader::~VerticalBlockReader() {
Status VerticalBlockReader::next_block_with_aggregation(Block* block, bool*
eof) {
auto res = (this->*_next_block_func)(block, eof);
- if (!res.ok() && !res.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode)
[[unlikely]] {
- static_cast<Tablet*>(_tablet.get())->report_error(res);
+ if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+ if (!res.ok()) [[unlikely]] {
+ static_cast<Tablet*>(_tablet.get())->report_error(res);
+ }
}
return res;
}
@@ -212,8 +213,8 @@ Status VerticalBlockReader::init(const ReaderParams&
read_params) {
RETURN_IF_ERROR(TabletReader::init(read_params));
auto status = _init_collect_iter(read_params);
- if (!status.ok()) {
- if (!status.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode)
[[unlikely]] {
+ if (!status.ok()) [[unlikely]] {
+ if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
static_cast<Tablet*>(_tablet.get())->report_error(status);
}
return status;
diff --git a/be/test/olap/delta_writer_test.cpp
b/be/test/olap/delta_writer_test.cpp
index 7db9e129e58..f2274a9a76a 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -43,6 +43,7 @@
#include "olap/options.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/segment_v2/segment.h"
+#include "olap/rowset_builder.h"
#include "olap/schema.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
@@ -500,19 +501,18 @@ TEST_F(TestDeltaWriter, open) {
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = true;
write_req.table_schema_param = &param;
- DeltaWriter* delta_writer = nullptr;
// test vec delta writer
profile = std::make_unique<RuntimeProfile>("LoadChannels");
- static_cast<void>(DeltaWriter::open(&write_req, &delta_writer,
profile.get(), TUniqueId()));
+ auto delta_writer =
+ std::make_unique<DeltaWriter>(*k_engine, &write_req,
profile.get(), TUniqueId {});
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->build_rowset();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+ res = delta_writer->commit_txn(PSlaveTabletNodes());
EXPECT_EQ(Status::OK(), res);
- SAFE_DELETE(delta_writer);
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
@@ -547,10 +547,9 @@ TEST_F(TestDeltaWriter, vec_write) {
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
- DeltaWriter* delta_writer = nullptr;
profile = std::make_unique<RuntimeProfile>("LoadChannels");
- static_cast<void>(DeltaWriter::open(&write_req, &delta_writer,
profile.get(), TUniqueId()));
- ASSERT_NE(delta_writer, nullptr);
+ auto delta_writer =
+ std::make_unique<DeltaWriter>(*k_engine, &write_req,
profile.get(), TUniqueId {});
vectorized::Block block;
for (const auto& slot_desc : tuple_desc->slots()) {
@@ -647,7 +646,7 @@ TEST_F(TestDeltaWriter, vec_write) {
ASSERT_TRUE(res.ok());
res = delta_writer->wait_calc_delete_bitmap();
ASSERT_TRUE(res.ok());
- res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+ res = delta_writer->commit_txn(PSlaveTabletNodes());
ASSERT_TRUE(res.ok());
// publish version success
@@ -680,7 +679,6 @@ TEST_F(TestDeltaWriter, vec_write) {
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
ASSERT_TRUE(res.ok());
- delete delta_writer;
}
TEST_F(TestDeltaWriter, vec_sequence_col) {
@@ -712,10 +710,9 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
- DeltaWriter* delta_writer = nullptr;
profile = std::make_unique<RuntimeProfile>("LoadChannels");
- static_cast<void>(DeltaWriter::open(&write_req, &delta_writer,
profile.get(), TUniqueId()));
- ASSERT_NE(delta_writer, nullptr);
+ auto delta_writer =
+ std::make_unique<DeltaWriter>(*k_engine, &write_req,
profile.get(), TUniqueId {});
vectorized::Block block;
for (const auto& slot_desc : tuple_desc->slots()) {
@@ -742,7 +739,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
ASSERT_TRUE(res.ok());
res = delta_writer->wait_calc_delete_bitmap();
ASSERT_TRUE(res.ok());
- res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+ res = delta_writer->commit_txn(PSlaveTabletNodes());
ASSERT_TRUE(res.ok());
// publish version success
@@ -798,7 +795,6 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
ASSERT_TRUE(res.ok());
- delete delta_writer;
}
TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
@@ -829,16 +825,14 @@ TEST_F(TestDeltaWriter,
vec_sequence_col_concurrent_write) {
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
- DeltaWriter* delta_writer1 = nullptr;
- DeltaWriter* delta_writer2 = nullptr;
std::unique_ptr<RuntimeProfile> profile1;
profile1 = std::make_unique<RuntimeProfile>("LoadChannels1");
std::unique_ptr<RuntimeProfile> profile2;
profile2 = std::make_unique<RuntimeProfile>("LoadChannels2");
- static_cast<void>(DeltaWriter::open(&write_req, &delta_writer1,
profile1.get(), TUniqueId()));
- static_cast<void>(DeltaWriter::open(&write_req, &delta_writer2,
profile2.get(), TUniqueId()));
- ASSERT_NE(delta_writer1, nullptr);
- ASSERT_NE(delta_writer2, nullptr);
+ auto delta_writer1 =
+ std::make_unique<DeltaWriter>(*k_engine, &write_req,
profile1.get(), TUniqueId {});
+ auto delta_writer2 =
+ std::make_unique<DeltaWriter>(*k_engine, &write_req,
profile2.get(), TUniqueId {});
// write data in delta writer 1
{
@@ -867,7 +861,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
ASSERT_TRUE(res.ok());
res = delta_writer1->wait_calc_delete_bitmap();
ASSERT_TRUE(res.ok());
- res = delta_writer1->commit_txn(PSlaveTabletNodes(), false);
+ res = delta_writer1->commit_txn(PSlaveTabletNodes());
ASSERT_TRUE(res.ok());
}
// write data in delta writer 2
@@ -943,12 +937,12 @@ TEST_F(TestDeltaWriter,
vec_sequence_col_concurrent_write) {
// verify that delete bitmap calculated correctly
// since the delete bitmap not published, versions are 0
- auto delete_bitmap = delta_writer2->get_delete_bitmap();
+ auto delete_bitmap =
delta_writer2->_rowset_builder->get_delete_bitmap();
ASSERT_TRUE(delete_bitmap->contains({rowset1->rowset_id(), 0, 0}, 0));
// We can't get the rowset id of rowset2 now, will check the delete
bitmap
// contains row 0 of rowset2 at L929.
- res = delta_writer2->commit_txn(PSlaveTabletNodes(), false);
+ res = delta_writer2->commit_txn(PSlaveTabletNodes());
ASSERT_TRUE(res.ok());
Version version;
@@ -1043,7 +1037,5 @@ TEST_F(TestDeltaWriter,
vec_sequence_col_concurrent_write) {
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
ASSERT_TRUE(res.ok());
- delete delta_writer1;
- delete delta_writer2;
}
} // namespace doris
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp
b/be/test/olap/engine_storage_migration_task_test.cpp
index 5107aafb552..87b30ae4da3 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -196,17 +196,15 @@ TEST_F(TestEngineStorageMigrationTask,
write_and_migration) {
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
- DeltaWriter* delta_writer = nullptr;
-
profile = std::make_unique<RuntimeProfile>("LoadChannels");
- static_cast<void>(DeltaWriter::open(&write_req, &delta_writer,
profile.get()));
- EXPECT_NE(delta_writer, nullptr);
+ auto delta_writer =
+ std::make_unique<DeltaWriter>(*k_engine, &write_req,
profile.get(), TUniqueId {});
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->build_rowset();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+ res = delta_writer->commit_txn(PSlaveTabletNodes());
EXPECT_EQ(Status::OK(), res);
// publish version success
@@ -276,7 +274,6 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration)
{
res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
- delete delta_writer;
}
} // namespace doris
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp
b/be/test/olap/memtable_memory_limiter_test.cpp
index 9771fb324c6..11e791551b1 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -140,9 +140,9 @@ TEST_F(MemTableMemoryLimiterTest,
handle_memtable_flush_test) {
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
- DeltaWriter* delta_writer = nullptr;
profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest");
- static_cast<void>(DeltaWriter::open(&write_req, &delta_writer,
profile.get(), TUniqueId()));
+ auto delta_writer =
+ std::make_unique<DeltaWriter>(*_engine, &write_req, profile.get(),
TUniqueId {});
ASSERT_NE(delta_writer, nullptr);
auto mem_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
@@ -174,10 +174,9 @@ TEST_F(MemTableMemoryLimiterTest,
handle_memtable_flush_test) {
EXPECT_EQ(Status::OK(), res);
res = delta_writer->build_rowset();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+ res = delta_writer->commit_txn(PSlaveTabletNodes());
EXPECT_EQ(Status::OK(), res);
res = _engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
- delete delta_writer;
}
} // namespace doris
diff --git a/be/test/olap/tablet_cooldown_test.cpp
b/be/test/olap/tablet_cooldown_test.cpp
index 124380220f8..e19ba3bd32c 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -361,9 +361,9 @@ void createTablet(TabletSharedPtr* tablet, int64_t
replica_id, int32_t schema_ha
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
- DeltaWriter* delta_writer = nullptr;
profile = std::make_unique<RuntimeProfile>("LoadChannels");
- static_cast<void>(DeltaWriter::open(&write_req, &delta_writer,
profile.get()));
+ auto delta_writer =
+ std::make_unique<DeltaWriter>(*k_engine, &write_req,
profile.get(), TUniqueId {});
ASSERT_NE(delta_writer, nullptr);
vectorized::Block block;
@@ -397,9 +397,8 @@ void createTablet(TabletSharedPtr* tablet, int64_t
replica_id, int32_t schema_ha
ASSERT_EQ(Status::OK(), st);
st = delta_writer->build_rowset();
ASSERT_EQ(Status::OK(), st);
- st = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+ st = delta_writer->commit_txn(PSlaveTabletNodes());
ASSERT_EQ(Status::OK(), st);
- delete delta_writer;
// publish version success
*tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id,
write_req.schema_hash);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]