This is an automated email from the ASF dual-hosted git repository.
lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7524c5e [Memory Engine] Add MemSubTablet, MemTablet, WriteTx,
PartialRowBatch (#3637)
7524c5e is described below
commit 7524c5ef63becb184583dae4111a19bcb0b43e22
Author: Binglin Chang <[email protected]>
AuthorDate: Sat May 30 10:33:10 2020 +0800
[Memory Engine] Add MemSubTablet, MemTablet, WriteTx, PartialRowBatch
(#3637)
---
be/src/olap/base_tablet.cpp | 13 +-
be/src/olap/base_tablet.h | 14 +-
be/src/olap/memory/CMakeLists.txt | 3 +
be/src/olap/memory/common.h | 1 +
be/src/olap/memory/mem_sub_tablet.cpp | 246 ++++++++++++++++++
be/src/olap/memory/mem_sub_tablet.h | 120 +++++++++
be/src/olap/memory/mem_tablet.cpp | 35 ++-
be/src/olap/memory/mem_tablet.h | 47 +++-
be/src/olap/memory/partial_row_batch.cpp | 274 +++++++++++++++++++++
be/src/olap/memory/partial_row_batch.h | 172 +++++++++++++
be/src/olap/memory/schema.cpp | 72 ++++++
be/src/olap/memory/schema.h | 38 ++-
.../olap/memory/{mem_tablet.cpp => write_txn.cpp} | 16 +-
be/src/olap/memory/{mem_tablet.h => write_txn.h} | 35 ++-
be/src/olap/tablet.cpp | 9 -
be/src/olap/tablet.h | 3 +-
be/src/util/time.h | 4 +
be/test/olap/CMakeLists.txt | 1 +
be/test/olap/memory/partial_row_batch_test.cpp | 111 +++++++++
be/test/olap/memory/schema_test.cpp | 22 +-
20 files changed, 1199 insertions(+), 37 deletions(-)
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 41aa93a..368f5ed 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-#include "base_tablet.h"
+#include "olap/base_tablet.h"
+#include "util/path_util.h"
+#include "olap/data_dir.h"
namespace doris {
@@ -24,6 +26,7 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta,
DataDir* data_dir) :
_tablet_meta(tablet_meta),
_schema(tablet_meta->tablet_schema()),
_data_dir(data_dir) {
+ _gen_tablet_path();
}
BaseTablet::~BaseTablet() {
@@ -40,4 +43,12 @@ OLAPStatus BaseTablet::set_tablet_state(TabletState state) {
return OLAP_SUCCESS;
}
+// Derives _tablet_path from the owning data dir and the tablet meta:
+// _data_dir->path() + DATA_PREFIX, followed by shard_id, tablet_id and
+// schema_hash, each appended with path_util::join_path_segments.
+// Called from the BaseTablet constructor. The data dir pointer can be null
+// (MemTablet::create_tablet_from_meta defaults it to nullptr); in that case
+// _tablet_path is left unchanged instead of dereferencing a null pointer.
+void BaseTablet::_gen_tablet_path() {
+    if (_data_dir == nullptr) {
+        return;
+    }
+    std::string path = _data_dir->path() + DATA_PREFIX;
+    path = path_util::join_path_segments(path, std::to_string(_tablet_meta->shard_id()));
+    path = path_util::join_path_segments(path, std::to_string(_tablet_meta->tablet_id()));
+    path = path_util::join_path_segments(path, std::to_string(_tablet_meta->schema_hash()));
+    _tablet_path = path;
+}
+
} /* namespace doris */
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 34020eb..f3b0c2d 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -19,12 +19,15 @@
#define DORIS_BE_SRC_OLAP_BASE_TABLET_H
#include <memory>
+
#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
#include "olap/utils.h"
namespace doris {
+class DataDir;
+
// Base class for all tablet classes, currently only olap/Tablet and
// olap/memory/MemTablet.
// The fields and methods in this class is not final, it will change as memory
@@ -57,10 +60,13 @@ public:
inline void set_creation_time(int64_t creation_time);
inline bool equal(int64_t tablet_id, int32_t schema_hash);
- // propreties encapsulated in TabletSchema
+ // properties encapsulated in TabletSchema
inline const TabletSchema& tablet_schema() const;
protected:
+ void _gen_tablet_path();
+
+protected:
TabletState _state;
TabletMetaSharedPtr _tablet_meta;
TabletSchema _schema;
@@ -72,7 +78,6 @@ private:
DISALLOW_COPY_AND_ASSIGN(BaseTablet);
};
-
inline DataDir* BaseTablet::data_dir() const {
return _data_dir;
}
@@ -99,9 +104,8 @@ inline int64_t BaseTablet::table_id() const {
inline const std::string BaseTablet::full_name() const {
std::stringstream ss;
- ss << _tablet_meta->tablet_id()
- << "." << _tablet_meta->schema_hash()
- << "." << _tablet_meta->tablet_uid().to_string();
+ ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() <<
"."
+ << _tablet_meta->tablet_uid().to_string();
return ss.str();
}
diff --git a/be/src/olap/memory/CMakeLists.txt
b/be/src/olap/memory/CMakeLists.txt
index 9de9095..b552dfe 100644
--- a/be/src/olap/memory/CMakeLists.txt
+++ b/be/src/olap/memory/CMakeLists.txt
@@ -29,5 +29,8 @@ add_library(Memory STATIC
delta_index.cpp
hash_index.cpp
mem_tablet.cpp
+ mem_sub_tablet.cpp
+ partial_row_batch.cpp
schema.cpp
+ write_txn.cpp
)
diff --git a/be/src/olap/memory/common.h b/be/src/olap/memory/common.h
index ab952b9..2185dd2 100644
--- a/be/src/olap/memory/common.h
+++ b/be/src/olap/memory/common.h
@@ -26,6 +26,7 @@
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/types.h"
+#include "util/time.h"
namespace doris {
namespace memory {
diff --git a/be/src/olap/memory/mem_sub_tablet.cpp
b/be/src/olap/memory/mem_sub_tablet.cpp
new file mode 100644
index 0000000..8bbdf5f
--- /dev/null
+++ b/be/src/olap/memory/mem_sub_tablet.cpp
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/mem_sub_tablet.h"
+
+#include "olap/memory/column.h"
+#include "olap/memory/column_reader.h"
+#include "olap/memory/column_writer.h"
+#include "olap/memory/hash_index.h"
+#include "olap/memory/partial_row_batch.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+// Factory for an empty MemSubTablet.
+// The new object starts with a single version entry (version, 0 rows) and one
+// Column per schema column, stored at index cid in _columns.
+// Returns NotSupported, leaving *ret untouched, as soon as a column type is
+// rejected by supported(); on success the new object is swapped into *ret.
+Status MemSubTablet::create(uint64_t version, const Schema& schema,
+                            std::unique_ptr<MemSubTablet>* ret) {
+    std::unique_ptr<MemSubTablet> sub_tablet(new MemSubTablet());
+    sub_tablet->_versions.reserve(64);
+    sub_tablet->_versions.emplace_back(version, 0);
+    sub_tablet->_columns.resize(schema.cid_size());
+    for (size_t idx = 0; idx < schema.num_columns(); ++idx) {
+        // TODO: support storage_type != c.type
+        auto& col_schema = *schema.get(idx);
+        if (supported(col_schema.type())) {
+            sub_tablet->_columns[col_schema.cid()].reset(
+                    new Column(col_schema, col_schema.type(), version));
+            continue;
+        }
+        return Status::NotSupported("column type not supported");
+    }
+    ret->swap(sub_tablet);
+    return Status::OK();
+}
+
+// Private constructor (instances are built through create()). The hash index
+// is created up front with a capacity argument of 1 << 16.
+MemSubTablet::MemSubTablet() : _index(new HashIndex(1 << 16)) {}
+
+// Nothing is released explicitly here; members are smart pointers and
+// standard containers.
+MemSubTablet::~MemSubTablet() {}
+
+// Looks up the row count recorded for `version`, under _lock.
+//  - version == uint64_t(-1): the row count of the newest version entry.
+//  - version older than the first entry: NotFound.
+//  - otherwise: the row count of the entry just before the first entry whose
+//    version is greater than `version`, or of the newest entry if none is.
+Status MemSubTablet::get_size(uint64_t version, size_t* size) const {
+    std::lock_guard<std::mutex> guard(_lock);
+    const bool want_latest = (version == static_cast<uint64_t>(-1));
+    if (!want_latest) {
+        if (_versions[0].version > version) {
+            return Status::NotFound("get_size failed, version too old");
+        }
+        for (size_t next = 1; next < _versions.size(); ++next) {
+            if (_versions[next].version > version) {
+                *size = _versions[next - 1].size;
+                return Status::OK();
+            }
+        }
+    }
+    // Either the latest was requested or no entry is newer than `version`.
+    *size = _versions.back().size;
+    return Status::OK();
+}
+
+// Creates a reader for column `cid` as of `version`.
+// The column reference is copied while holding _lock, so the reader is built
+// from a stable Column even if commit_write() replaces _columns[cid] later.
+// Returns NotFound when `cid` is out of range or no column is stored for it;
+// otherwise returns the result of Column::create_reader().
+Status MemSubTablet::read_column(uint64_t version, uint32_t cid,
+                                 std::unique_ptr<ColumnReader>* reader) {
+    scoped_refptr<Column> cl;
+    {
+        std::lock_guard<std::mutex> lg(_lock);
+        if (cid < _columns.size()) {
+            cl = _columns[cid];
+        }
+    }
+    // The missing-column case is the null case. The previous `!= nullptr` test
+    // rejected every existing column and dereferenced null for missing ones.
+    if (cl.get() == nullptr) {
+        return Status::NotFound("column not found");
+    }
+    return cl->create_reader(version, reader);
+}
+
+// Hands out a reference to the currently published hash index.
+// commit_write() replaces _index while holding _lock, so the copy is taken
+// under the same lock to avoid racing with a concurrent commit.
+Status MemSubTablet::get_index_to_read(scoped_refptr<HashIndex>* index) {
+    std::lock_guard<std::mutex> lg(_lock);
+    *index = _index;
+    return Status::OK();
+}
+
+// Starts an exclusive write: records the schema, snapshots the latest row
+// count, points _write_index at the published index and opens writers for
+// the key columns so that key lookups work during apply_partial_row_batch().
+// Returns InternalError if a previous write has not been committed (_schema
+// is still set), or the error of Column::create_writer().
+Status MemSubTablet::begin_write(scoped_refptr<Schema>* schema) {
+    if (_schema != nullptr) {
+        return Status::InternalError("Another write is in-progress or error occurred");
+    }
+    _schema = *schema;
+    _row_size = latest_size();
+    _write_index = _index;
+    _writers.clear();
+    _writers.resize(_columns.size());
+    // precache key columns
+    for (size_t i = 0; i < _schema->num_key_columns(); i++) {
+        uint32_t cid = _schema->get(i)->cid();
+        // All slots are null right after resize(); create a writer for each
+        // key column that does not have one yet. (The previous `!= nullptr`
+        // test meant no key writer was ever created.)
+        if (_writers[cid] == nullptr) {
+            RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+        }
+    }
+    _temp_hash_entries.reserve(8);
+
+    // setup stats
+    _write_start = GetMonoTimeSecondsAsDouble();
+    _num_insert = 0;
+    _num_update = 0;
+    _num_update_cell = 0;
+    return Status::OK();
+}
+
+// Applies every row of `batch` to the in-progress write.
+// For each row the key cell (cell 0) is looked up in _write_index:
+//  - key not found: the row is appended as a new row id, all cells are
+//    inserted, the key is added to the index, and the index is rebuilt with
+//    a larger capacity when it reports need_rebuild();
+//  - key found: the non-key cells are applied as updates to that row id.
+// Returns the first error reported by the batch or by a column writer.
+Status MemSubTablet::apply_partial_row_batch(PartialRowBatch* batch) {
+    // Sentinel meaning "row key not present in the index".
+    const uint32_t kInvalidRid = static_cast<uint32_t>(-1);
+    // TODO: support multi-column row key. The single key column is expected
+    // to have its writer at cid 1, opened by begin_write().
+    if (_writers.size() < 2 || _writers[1] == nullptr) {
+        return Status::InternalError("key column writer not initialized");
+    }
+    ColumnWriter* keyw = _writers[1].get();
+    while (true) {
+        bool has_row = false;
+        RETURN_IF_ERROR(batch->next_row(&has_row));
+        if (!has_row) {
+            break;
+        }
+        DCHECK_GE(batch->cur_row_cell_size(), 1);
+        const size_t num_cells = batch->cur_row_cell_size();
+        const ColumnSchema* dsc = nullptr;
+        const void* key = nullptr;
+        // get key column and find in hash index
+        RETURN_IF_ERROR(batch->cur_row_get_cell(0, &dsc, &key));
+        // find candidate rowids, and check equality
+        uint64_t hashcode = keyw->hashcode(key, 0);
+        _temp_hash_entries.clear();
+        uint32_t newslot = _write_index->find(hashcode, &_temp_hash_entries);
+        uint32_t rid = kInvalidRid;
+        for (uint32_t test_rid : _temp_hash_entries) {
+            if (keyw->equals(test_rid, key, 0)) {
+                rid = test_rid;
+                break;
+            }
+        }
+        if (rid == kInvalidRid) {
+            // rowkey not found, do insertion/append
+            rid = _row_size;
+            // add all columns
+            for (size_t i = 0; i < num_cells; i++) {
+                const void* data = nullptr;
+                RETURN_IF_ERROR(batch->cur_row_get_cell(i, &dsc, &data));
+                uint32_t cid = dsc->cid();
+                if (_writers[cid] == nullptr) {
+                    RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+                }
+                RETURN_IF_ERROR(_writers[cid]->insert(rid, data));
+            }
+            _write_index->set(newslot, hashcode, rid);
+            _row_size++;
+            if (_write_index->need_rebuild()) {
+                scoped_refptr<HashIndex> new_index;
+                // TODO: trace memory usage
+                size_t new_capacity = _row_size * 2;
+                // rebuild_hash_index() returns null when the capacity was too
+                // small; retry with a larger one until it succeeds.
+                while (true) {
+                    new_index = rebuild_hash_index(new_capacity);
+                    if (new_index.get() != nullptr) {
+                        break;
+                    } else {
+                        new_capacity += 1 << 16;
+                    }
+                }
+                _write_index = new_index;
+            }
+            _num_insert++;
+        } else {
+            // rowkey found, do update
+            // add non-key columns
+            for (size_t i = 1; i < num_cells; i++) {
+                const void* data = nullptr;
+                RETURN_IF_ERROR(batch->cur_row_get_cell(i, &dsc, &data));
+                uint32_t cid = dsc->cid();
+                if (cid > _schema->num_key_columns()) {
+                    if (_writers[cid] == nullptr) {
+                        RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+                    }
+                    RETURN_IF_ERROR(_writers[cid]->update(rid, data));
+                }
+            }
+            _num_update++;
+            _num_update_cell += num_cells - 1;
+        }
+    }
+    return Status::OK();
+}
+
+// Commits the in-progress write as `version`.
+// Finalizes every open writer, then, under _lock, publishes the write index
+// and the new columns and appends (version, row count) to _versions.
+// Afterwards the temporary write state is cleared and a summary is logged.
+// An error from a writer is returned immediately; the temporary state is not
+// cleared on that path.
+Status MemSubTablet::commit_write(uint64_t version) {
+    for (auto& writer : _writers) {
+        if (writer != nullptr) {
+            // Should not fail in normal cases, fatal error if commit failed
+            RETURN_IF_ERROR(writer->finalize(version));
+        }
+    }
+    {
+        std::lock_guard<std::mutex> guard(_lock);
+        if (_index != _write_index) {
+            _index = _write_index;
+        }
+        for (size_t cid = 0; cid < _writers.size(); ++cid) {
+            if (_writers[cid] == nullptr) {
+                continue;
+            }
+            // Should not fail in normal cases, fatal error if commit failed
+            RETURN_IF_ERROR(_writers[cid]->get_new_column(&_columns[cid]));
+        }
+        _versions.emplace_back(version, _row_size);
+    }
+    // Drop the temporary write state so that begin_write() can be called again.
+    _write_index.reset();
+    _writers.clear();
+    _schema = nullptr;
+    LOG(INFO) << StringPrintf("commit writetxn(insert=%zu update=%zu update_cell=%zu) %.3lfs",
+                              _num_insert, _num_update, _num_update_cell,
+                              GetMonoTimeSecondsAsDouble() - _write_start);
+    return Status::OK();
+}
+
+// Builds a fresh HashIndex of `new_capacity` containing the key of every row
+// in [0, _row_size), read through the key column writer at cid 1.
+// Returns a null reference if HashIndex::add() rejects an entry, which the
+// caller treats as "capacity too small, retry with a bigger one".
+scoped_refptr<HashIndex> MemSubTablet::rebuild_hash_index(size_t new_capacity) {
+    const double started = GetMonoTimeSecondsAsDouble();
+    ColumnWriter* key_writer = _writers[1].get();
+    scoped_refptr<HashIndex> rebuilt(new HashIndex(new_capacity));
+    for (size_t rid = 0; rid < _row_size; ++rid) {
+        const void* cell = key_writer->get(rid);
+        DCHECK(cell);
+        const uint64_t hashcode = key_writer->hashcode(cell, 0);
+        if (rebuilt->add(hashcode, rid)) {
+            continue;
+        }
+        const double failed_at = GetMonoTimeSecondsAsDouble();
+        LOG(INFO) << StringPrintf("Rebuild hash index %zu failed time: %.3lfs, expand",
+                                  new_capacity, failed_at - started);
+        return scoped_refptr<HashIndex>();
+    }
+    const double finished = GetMonoTimeSecondsAsDouble();
+    LOG(INFO) << StringPrintf("Rebuild hash index %zu time: %.3lfs", new_capacity,
+                              finished - started);
+    return rebuilt;
+}
+
+} // namespace memory
+} // namespace doris
diff --git a/be/src/olap/memory/mem_sub_tablet.h
b/be/src/olap/memory/mem_sub_tablet.h
new file mode 100644
index 0000000..68ddc3d
--- /dev/null
+++ b/be/src/olap/memory/mem_sub_tablet.h
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/memory/common.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+class HashIndex;
+class ColumnReader;
+class PartialRowBatch;
+class Column;
+class ColumnWriter;
+
+// A MemTablet can contain multiple MemSubTablets (currently only one).
+// MemSubTablet hold a HashIndex and a collection of columns.
+// It supports single-writer multi-reader concurrently.
+//
+// Example read usage:
+// std::unique_ptr<ColumnReader> reader;
+// sub_tablet->read_column(version, cid, &reader);
+// // read(scan/get) cells using reader
+//
+// Example write usage:
+//   WriteTxn* wtxn;
+//   MemSubTablet* sub_tablet;
+//   RETURN_IF_ERROR(sub_tablet->begin_write(&current_schema));
+//   for (size_t i = 0; i < wtxn->batch_size(); i++) {
+//       auto batch = wtxn->get_batch(i);
+//       RETURN_IF_ERROR(sub_tablet->apply_partial_row_batch(batch));
+//   }
+//   RETURN_IF_ERROR(sub_tablet->commit_write(version));
+class MemSubTablet {
+public:
+    // Create a new empty MemSubTablet, with specified schema and initial version.
+    // On success the new object is stored in *ret.
+    static Status create(uint64_t version, const Schema& schema,
+                         std::unique_ptr<MemSubTablet>* ret);
+
+    // Destructor
+    ~MemSubTablet();
+
+    // Return number of rows of the latest version, including rows marked as delete.
+    // Note: reads _versions without taking _lock.
+    size_t latest_size() const { return _versions.back().size; }
+
+    // Return number of rows of the specified version, including rows marked as delete.
+    // Passing uint64_t(-1) as version selects the latest version.
+    Status get_size(uint64_t version, size_t* size) const;
+
+    // Read the column specified by column id (cid) as of the given version;
+    // returns a column reader through `reader`.
+    Status read_column(uint64_t version, uint32_t cid, std::unique_ptr<ColumnReader>* reader);
+
+    // Get a reference to the currently published hash index, for reading
+    Status get_index_to_read(scoped_refptr<HashIndex>* index);
+
+    // Start an exclusive write batch
+    // Note: caller should make sure schema is valid during write
+    Status begin_write(scoped_refptr<Schema>* schema);
+
+    // Apply all rows of a partial row batch to this MemSubTablet
+    Status apply_partial_row_batch(PartialRowBatch* batch);
+
+    // Finalize the whole write batch, with specified version
+    Status commit_write(uint64_t version);
+
+private:
+    // Construction goes through create()
+    MemSubTablet();
+    // Build a new hash index with the given capacity from the current key
+    // column; returns a null reference if the capacity turns out too small.
+    scoped_refptr<HashIndex> rebuild_hash_index(size_t new_capacity);
+
+    // Taken by get_size(), read_column() and by commit_write() while it
+    // publishes _index, _columns and _versions.
+    mutable std::mutex _lock;
+    // Currently published hash index
+    scoped_refptr<HashIndex> _index;
+    // Row count recorded for one committed version
+    struct VersionInfo {
+        VersionInfo(uint64_t version, uint64_t size) : version(version), size(size) {}
+        uint64_t version = 0;
+        uint64_t size = 0;
+    };
+    // One entry per committed version, appended by commit_write()
+    std::vector<VersionInfo> _versions;
+    // Map from cid to column
+    std::vector<scoped_refptr<Column>> _columns;
+
+    // Temporary write state variables
+    // Schema of the in-progress write; null when no write is in progress
+    scoped_refptr<Schema> _schema;
+    // Row count including rows appended by the in-progress write
+    size_t _row_size = 0;
+    // If a copy-on-write is performed on HashIndex, this variable holds
+    // the new reference, otherwise it holds the same reference as _index
+    scoped_refptr<HashIndex> _write_index;
+    // Map from cid to current writers
+    std::vector<std::unique_ptr<ColumnWriter>> _writers;
+    // Temporary variable to reuse hash entry vector
+    std::vector<uint32_t> _temp_hash_entries;
+    // Write stats
+    double _write_start = 0;
+    size_t _num_insert = 0;
+    size_t _num_update = 0;
+    size_t _num_update_cell = 0;
+
+    DISALLOW_COPY_AND_ASSIGN(MemSubTablet);
+};
+
+} // namespace memory
+} // namespace doris
diff --git a/be/src/olap/memory/mem_tablet.cpp
b/be/src/olap/memory/mem_tablet.cpp
index 5c2edd8..a75e30b 100644
--- a/be/src/olap/memory/mem_tablet.cpp
+++ b/be/src/olap/memory/mem_tablet.cpp
@@ -17,13 +17,46 @@
#include "olap/memory/mem_tablet.h"
+#include "olap/memory/mem_sub_tablet.h"
+#include "olap/memory/write_txn.h"
+
namespace doris {
namespace memory {
MemTablet::MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
- : BaseTablet(tablet_meta, data_dir) {}
+ : BaseTablet(tablet_meta, data_dir) {
+ _mem_schema.reset(new Schema(_schema));
+}
MemTablet::~MemTablet() {}
+// Factory: wraps the MemTablet constructor in a std::shared_ptr.
+// init() is not called here.
+std::shared_ptr<MemTablet> MemTablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
+                                                              DataDir* data_dir) {
+    return std::make_shared<MemTablet>(tablet_meta, data_dir);
+}
+
+// Creates the single sub-tablet from the in-memory schema, with initial
+// version 0. Returns the status of MemSubTablet::create().
+Status MemTablet::init() {
+    return MemSubTablet::create(0, *_mem_schema.get(), &_sub_tablet);
+}
+
+// Placeholder: scanning is not implemented yet. Both arguments are ignored
+// and NotSupported is always returned.
+Status MemTablet::scan(std::unique_ptr<ScanSpec>&& spec, std::unique_ptr<MemTabletScan>* scan) {
+    return Status::NotSupported("scan not supported");
+}
+
+// Allocates a new WriteTxn bound to the tablet's in-memory schema and stores
+// it in *wtxn. Always returns OK.
+Status MemTablet::create_write_txn(std::unique_ptr<WriteTxn>* wtxn) {
+    wtxn->reset(new WriteTxn(&_mem_schema));
+    return Status::OK();
+}
+
+// Applies every batch of `wtxn` to the sub-tablet and commits the result as
+// `version`. The whole sequence runs under _write_lock, so commits are
+// serialized. The first failing step's status is returned as-is.
+Status MemTablet::commit_write_txn(WriteTxn* wtxn, uint64_t version) {
+    std::lock_guard<std::mutex> write_guard(_write_lock);
+    RETURN_IF_ERROR(_sub_tablet->begin_write(&_mem_schema));
+    size_t batch_idx = 0;
+    while (batch_idx < wtxn->batch_size()) {
+        auto current = wtxn->get_batch(batch_idx);
+        RETURN_IF_ERROR(_sub_tablet->apply_partial_row_batch(current));
+        ++batch_idx;
+    }
+    return _sub_tablet->commit_write(version);
+}
+
} // namespace memory
} // namespace doris
diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/mem_tablet.h
index 7efc945..cfc8c3c 100644
--- a/be/src/olap/memory/mem_tablet.h
+++ b/be/src/olap/memory/mem_tablet.h
@@ -18,22 +18,67 @@
#pragma once
#include "olap/base_tablet.h"
+#include "olap/memory/schema.h"
namespace doris {
namespace memory {
+class MemSubTablet;
+class ScanSpec;
+class MemTabletScan;
+class WriteTxn;
+
// Tablet class for memory-optimized storage engine.
//
// It stores all its data in-memory, and is designed for tables with
// frequent updates.
//
-// TODO: This is just a skeleton, will add implementation in the future.
+// By design, MemTablet stores all the schema versions together inside a single
+// MemTablet, while olap/Tablet generates a new Tablet after a schema change, so
+// their behaviors are not compatible. We will address this issue later, after
+// adding schema change support; currently MemTablet does not support schema
+// change (it only has a single version of the schema).
+//
+// TODO: will add more functionality as project evolves.
class MemTablet : public BaseTablet {
public:
+ static std::shared_ptr<MemTablet>
create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
+ DataDir*
data_dir = nullptr);
+
MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
+
virtual ~MemTablet();
+ // Initialize
+ Status init();
+
+ // Scan the tablet, return a MemTabletScan object scan, user can specify
projections,
+ // predicates and aggregations using ScanSpec, currently only support full
scan with
+ // projection.
+ //
+ // Note: thread-safe, supports multi-reader concurrency.
+ Status scan(std::unique_ptr<ScanSpec>&& spec,
std::unique_ptr<MemTabletScan>* scan);
+
+ // Create a write transaction
+ //
+ // Note: Thread-safe, can have multiple writetxn at the same time.
+ Status create_write_txn(std::unique_ptr<WriteTxn>* wtxn);
+
+ // Apply a write transaction and commit as the specified version
+ //
+ // Note: commit is done sequentially, protected by internal write lock
+ Status commit_write_txn(WriteTxn* wtxn, uint64_t version);
+
private:
+ // memory::Schema is used internally rather than TabletSchema, so we need
an extra
+ // copy of _schema with type memory::Schema.
+ scoped_refptr<Schema> _mem_schema;
+
+ // TODO: support multiple sub-tablets in the future
+ std::unique_ptr<MemSubTablet> _sub_tablet;
+
+ std::mutex _write_lock;
+
DISALLOW_COPY_AND_ASSIGN(MemTablet);
};
diff --git a/be/src/olap/memory/partial_row_batch.cpp
b/be/src/olap/memory/partial_row_batch.cpp
new file mode 100644
index 0000000..b7d2b5e
--- /dev/null
+++ b/be/src/olap/memory/partial_row_batch.cpp
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/partial_row_batch.h"
+
+#include "util/bitmap.h"
+
+namespace doris {
+namespace memory {
+
+// Methods for PartialRowBatch
+
+// Binds the batch to *schema (the reference is copied into _schema), sizes
+// the "cell is set" bit range to the schema's cid_size() and pre-reserves
+// the per-row cell list for num_columns() entries.
+// NOTE(review): _bit_set_size is initialized through _schema, which relies on
+// _schema being declared before _bit_set_size in the class - confirm in
+// partial_row_batch.h.
+PartialRowBatch::PartialRowBatch(scoped_refptr<Schema>* schema)
+        : _schema(*schema), _bit_set_size(_schema->cid_size()) {
+    _cells.reserve(_schema->num_columns());
+}
+
+// Nothing is released explicitly here.
+PartialRowBatch::~PartialRowBatch() {}
+
+// Takes ownership of a serialized batch and positions the cursor on the
+// first row. The buffer starts with an 8-byte row count in native byte
+// order, followed by the rows.
+// Returns InvalidArgument, leaving the batch unchanged, if the buffer is too
+// small to contain the row count.
+Status PartialRowBatch::load(std::vector<uint8_t>&& buffer) {
+    if (buffer.size() < sizeof(uint64_t)) {
+        return Status::InvalidArgument("PartialRowBatch buffer too small");
+    }
+    _buffer = std::move(buffer);
+    _pos = _buffer.data();
+    // memcpy instead of a pointer cast: no alignment requirement on the buffer.
+    uint64_t row_count = 0;
+    memcpy(&row_count, _pos, sizeof(row_count));
+    _row_size = row_count;
+    _pos += sizeof(uint64_t);
+    _next_row = 0;
+    return Status::OK();
+}
+
+// Advances to the next serialized row and decodes its cells into _cells.
+// *has_row is set to false when all rows have been consumed, true otherwise.
+// Row layout: 4-byte row length | 2-byte bit vector length | bit vector
+// ("is set" bits indexed by cid, bit 0 = delete marker, followed by one
+// "is null" bit per set nullable column) | data of the non-null cells.
+// Returns InternalError if the row does not fit its declared length or the
+// buffer, and NotSupported for variable-length columns.
+Status PartialRowBatch::next_row(bool* has_row) {
+    DCHECK_LE(_next_row, _row_size);
+    *has_row = false;
+    if (_next_row == _row_size) {
+        return Status::OK();
+    }
+    const uint8_t* const buffer_end = _buffer.data() + _buffer.size();
+    DCHECK_LE(_pos, buffer_end);
+    _cells.clear();
+    if (static_cast<size_t>(buffer_end - _pos) < sizeof(uint32_t)) {
+        return Status::InternalError("PartialRowBatch data corruption");
+    }
+    uint32_t row_bsize = 0;
+    memcpy(&row_bsize, _pos, sizeof(row_bsize));
+    _pos += sizeof(uint32_t);
+    if (row_bsize < sizeof(uint16_t) || static_cast<size_t>(buffer_end - _pos) < row_bsize) {
+        return Status::InternalError("PartialRowBatch data corruption");
+    }
+    const uint8_t* const row_end = _pos + row_bsize;
+    const uint8_t* cur = _pos;
+    uint16_t bit_vec_bytes = 0;
+    memcpy(&bit_vec_bytes, cur, sizeof(bit_vec_bytes));
+    cur += sizeof(uint16_t);
+    const size_t bit_all_size = bit_vec_bytes;
+    const size_t bit_count = bit_all_size * 8;
+    // The bit vector must fit in the row and cover at least the "is set" bits.
+    if (bit_all_size == 0 || static_cast<size_t>(row_end - cur) < bit_all_size ||
+        bit_count < _bit_set_size) {
+        return Status::InternalError("PartialRowBatch data corruption");
+    }
+    const uint8_t* bitvec = cur;
+    cur += bit_all_size;
+    size_t cur_nullable_idx = _bit_set_size;
+    // Assign unconditionally so that a delete marker from an earlier row does
+    // not stick to the rows that follow it.
+    _delete = BitmapTest(bitvec, 0);
+    size_t cid = 1;
+    while (BitmapFindFirstSet(bitvec, cid, _bit_set_size, &cid)) {
+        const ColumnSchema* cs = _schema->get_by_cid(cid);
+        DCHECK(cs);
+        if (cs->is_nullable()) {
+            if (cur_nullable_idx >= bit_count) {
+                return Status::InternalError("PartialRowBatch data corruption");
+            }
+            if (BitmapTest(bitvec, cur_nullable_idx)) {
+                // is null
+                _cells.emplace_back(cid, nullptr);
+            } else {
+                // not null
+                _cells.emplace_back(cid, cur);
+            }
+            cur_nullable_idx++;
+        } else {
+            _cells.emplace_back(cid, cur);
+        }
+        const uint8_t* pdata = _cells.back().data;
+        if (pdata != nullptr) {
+            size_t bsize = _schema->get_column_byte_size(cid);
+            if (bsize == 0) {
+                return Status::NotSupported("varlen column type not supported");
+            }
+            if (static_cast<size_t>(row_end - cur) < bsize) {
+                return Status::InternalError("PartialRowBatch data corruption");
+            }
+            cur += bsize;
+        }
+        cid++;
+    }
+    // The decoded cells must consume exactly the declared row length.
+    if (cur != row_end) {
+        return Status::InternalError("PartialRowBatch data corruption");
+    }
+    _pos = cur;
+    *has_row = true;
+    _next_row++;
+    return Status::OK();
+}
+
+// Returns the idx-th decoded cell of the current row: its column schema
+// (looked up by cid) through *cs and its data pointer through *data. The
+// data pointer is null when next_row() decoded the cell as null.
+// Returns InvalidArgument if idx is not smaller than the number of cells.
+Status PartialRowBatch::cur_row_get_cell(size_t idx, const ColumnSchema** cs,
+                                         const void** data) const {
+    if (idx < _cells.size()) {
+        const auto& picked = _cells[idx];
+        *cs = _schema->get_by_cid(picked.cid);
+        *data = picked.data;
+        return Status::OK();
+    }
+    return Status::InvalidArgument("get_cell: idx exceed cells size");
+}
+
+// Methods for PartialRowWriter
+
+// Binds the writer to *schema (the reference is copied into _schema) and
+// allocates one temporary cell slot per cid, so slots are indexed by cid.
+// NOTE(review): _bit_set_size is initialized through _schema, which relies on
+// _schema being declared before _bit_set_size in the class - confirm in
+// partial_row_batch.h.
+PartialRowWriter::PartialRowWriter(scoped_refptr<Schema>* schema)
+        : _schema(*schema), _bit_set_size(_schema->cid_size()), _bit_nullable_size(0) {
+    _temp_cells.resize(_schema->cid_size());
+}
+
+// Starts a new batch that may hold up to `row_capacity` rows.
+// The buffer is sized to the 8-byte row-count header and its capacity is
+// reserved to `byte_capacity`; end_row() refuses rows that would grow the
+// buffer beyond its capacity. Always returns OK.
+Status PartialRowWriter::start_batch(size_t row_capacity, size_t byte_capacity) {
+    _row_size = 0;
+    _row_capacity = row_capacity;
+    // reserve space for _row_size
+    _buffer.resize(sizeof(uint64_t));
+    _buffer.reserve(byte_capacity);
+    return Status::OK();
+}
+
+// Nothing is released explicitly here.
+PartialRowWriter::~PartialRowWriter() {}
+
+// Begins a new row: clears every temporary cell slot and the count of
+// nullable cells set so far.
+// Returns InvalidArgument if the batch already holds _row_capacity rows.
+Status PartialRowWriter::start_row() {
+    if (_row_size < _row_capacity) {
+        _bit_nullable_size = 0;
+        memset(_temp_cells.data(), 0, sizeof(CellInfo) * _temp_cells.size());
+        return Status::OK();
+    }
+    return Status::InvalidArgument("over capacity");
+}
+
+// Serializes the current row (4-byte length prefix followed by the row
+// bytes produced by write()) at the end of the batch buffer.
+// Returns InvalidArgument if the row count or the reserved byte capacity
+// would be exceeded. If write() fails, the buffer is rolled back to its
+// previous size and the error is returned.
+Status PartialRowWriter::end_row() {
+    if (_row_size >= _row_capacity) {
+        return Status::InvalidArgument("over capacity");
+    }
+    const size_t row_byte_size = byte_size();
+    const size_t old_size = _buffer.size();
+    const size_t new_size = old_size + sizeof(uint32_t) + row_byte_size;
+    if (new_size > _buffer.capacity()) {
+        return Status::InvalidArgument("over capacity");
+    }
+    _buffer.resize(new_size);
+    uint8_t* pos = _buffer.data() + old_size;
+    // memcpy instead of a pointer cast: rows are not aligned in the buffer.
+    const uint32_t len_prefix = static_cast<uint32_t>(row_byte_size);
+    memcpy(pos, &len_prefix, sizeof(len_prefix));
+    pos += sizeof(uint32_t);
+    Status st = write(&pos);
+    // Check the status first: on failure write() stops early, so the
+    // position check below would fire in debug builds and hide the error.
+    if (!st.ok()) {
+        _buffer.resize(old_size);
+        return st;
+    }
+    DCHECK_EQ(pos, _buffer.data() + new_size);
+    _row_size++;
+    return Status::OK();
+}
+
+// Records `data` as the value of column `cid` for the current row.
+// A null `data` is accepted only for nullable columns; otherwise
+// InvalidArgument is returned. The count of nullable cells is increased the
+// first time a nullable column is set within the row.
+Status PartialRowWriter::set(const ColumnSchema* cs, uint32_t cid, const void* data) {
+    const bool nullable = cs->is_nullable();
+    if (!nullable && data == nullptr) {
+        return Status::InvalidArgument("not nullable column set to null");
+    }
+    auto& slot = _temp_cells[cid];
+    if (nullable && !slot.isnullable) {
+        _bit_nullable_size++;
+    }
+    slot.isnullable = nullable;
+    slot.isset = 1;
+    slot.data = reinterpret_cast<const uint8_t*>(data);
+    return Status::OK();
+}
+
+// Sets a cell by column name. Returns NotFound if the schema has no column
+// with that name, otherwise forwards to set(cs, cid, data).
+Status PartialRowWriter::set(const string& col, const void* data) {
+    auto col_schema = _schema->get_by_name(col);
+    if (col_schema != nullptr) {
+        return set(col_schema, col_schema->cid(), data);
+    }
+    return Status::NotFound("col name not found");
+}
+
+// Sets a cell by column id. Returns NotFound if the schema has no column
+// with that cid, otherwise forwards to set(cs, cid, data).
+Status PartialRowWriter::set(uint32_t cid, const void* data) {
+    auto col_schema = _schema->get_by_cid(cid);
+    if (col_schema != nullptr) {
+        return set(col_schema, col_schema->cid(), data);
+    }
+    return Status::NotFound("cid not found");
+}
+
+// Placeholder for marking the current row as a delete. Not implemented:
+// always returns NotSupported and leaves the row unchanged.
+Status PartialRowWriter::set_delete() {
+    // TODO: support delete
+    // _temp_cells[0].isset = 1;
+    return Status::NotSupported("delete not supported");
+}
+
+// Computes the serialized size of the current row, excluding the 4-byte
+// length prefix added by end_row(): 2 bytes for the bit vector length, the
+// bit vector itself, and the byte size of every cell that has data.
+// Variable-length columns (byte size 0) are a fatal error.
+size_t PartialRowWriter::byte_size() const {
+    // TODO: support delete
+    const size_t bit_all_size = num_block(_bit_set_size + _bit_nullable_size, 8);
+    size_t total = 2 + bit_all_size;
+    for (size_t cid = 1; cid < _temp_cells.size(); ++cid) {
+        if (_temp_cells[cid].data == nullptr) {
+            continue;
+        }
+        const size_t bsize = _schema->get_column_byte_size(cid);
+        if (bsize != 0) {
+            total += bsize;
+        } else {
+            // A varlen cell would need 2 bytes of length plus the slice size.
+            LOG(FATAL) << "varlen column type not supported";
+        }
+    }
+    return total;
+}
+
+// Serializes the current row at *ppos and advances *ppos past the bytes
+// written. Layout: 2-byte bit vector length | bit vector | cell data.
+// In the bit vector, bit 0 is the delete marker, bit cid marks column cid as
+// set, and the bits from _bit_set_size onwards are the "is null" flags of
+// the set nullable columns, in cid order.
+// Returns NotSupported for too many columns or a variable-length column, and
+// InvalidArgument if a column with cid <= num_key_columns() is not set.
+// On error *ppos may already have been advanced.
+Status PartialRowWriter::write(uint8_t** ppos) {
+    size_t bit_all_size = num_block(_bit_set_size + _bit_nullable_size, 8);
+    // The length is stored in 2 bytes, so it must fit in a uint16_t.
+    if (bit_all_size >= 65536) {
+        return Status::NotSupported("too many columns");
+    }
+    // using reference is more convenient
+    uint8_t*& pos = *ppos;
+    *reinterpret_cast<uint16_t*>(pos) = (uint16_t)bit_all_size;
+    pos += 2;
+    uint8_t* bitvec = pos;
+    pos += bit_all_size;
+    memset(bitvec, 0, bit_all_size);
+    if (_temp_cells[0].isset) {
+        // deleted
+        BitmapSet(bitvec, 0);
+    }
+    // Null flags are appended after the "is set" bits.
+    size_t cur_nullable_idx = _bit_set_size;
+    for (size_t i = 1; i < _temp_cells.size(); i++) {
+        if (_temp_cells[i].isset) {
+            BitmapSet(bitvec, i);
+            if (_temp_cells[i].isnullable) {
+                if (_temp_cells[i].data == nullptr) {
+                    BitmapSet(bitvec, cur_nullable_idx);
+                }
+                cur_nullable_idx++;
+            }
+            const uint8_t* pdata = _temp_cells[i].data;
+            if (pdata != nullptr) {
+                size_t bsize = _schema->get_column_byte_size(i);
+                if (bsize == 0) {
+                    return Status::NotSupported("varlen column type not supported");
+                    // Some incomplete code to write string(Slice), may be useful in future
+                    // size_t sz = ((Slice*)pdata)->size();
+                    // *(uint16_t*)pos = (uint16_t)sz;
+                    // pos += 2;
+                    // memcpy(pos, ((Slice*)pdata)->data(), sz);
+                    // pos += sz;
+                } else {
+                    memcpy(pos, _temp_cells[i].data, bsize);
+                    pos += bsize;
+                }
+            }
+        } else if (i <= _schema->num_key_columns()) {
+            // Every row must carry all of its key columns.
+            return Status::InvalidArgument("build without key columns");
+        }
+    }
+    return Status::OK();
+}
+
+// Completes the batch: stores the row count in the 8-byte header at the
+// start of the buffer, then swaps the buffer with *buffer and resets the
+// row count.
+// Returns InvalidArgument if the internal buffer has no header to write
+// into, e.g. when called without a preceding start_batch(); previously this
+// wrote 8 bytes past the end of an empty buffer.
+Status PartialRowWriter::finish_batch(vector<uint8_t>* buffer) {
+    if (_buffer.size() < sizeof(uint64_t)) {
+        return Status::InvalidArgument("finish_batch called without start_batch");
+    }
+    const uint64_t row_count = _row_size;
+    memcpy(_buffer.data(), &row_count, sizeof(row_count));
+    _buffer.swap(*buffer);
+    _row_size = 0;
+    return Status::OK();
+}
+
+} // namespace memory
+} // namespace doris
diff --git a/be/src/olap/memory/partial_row_batch.h
b/be/src/olap/memory/partial_row_batch.h
new file mode 100644
index 0000000..893f865
--- /dev/null
+++ b/be/src/olap/memory/partial_row_batch.h
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/memory/common.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+// A chunk of memory that stores a batch of serialized partial rows
+// User can iterate through all the partial rows, get each partial row's cells.
+//
+// Serialization format for a batch:
+// 4 byte len | serialized partial row
+// 4 byte len | serialized partial row
+// ...
+// 4 byte len | serialized partial row
+//
+// Serialization format for a partial row
+// bit vector(set + null) byte size (2 bytes) |
+// bit vector mark set cells |
+// bit vector mark nullable cells' null value |
+// 8bit padding
+// serialized not null cells
+//
+// Example usage:
+// PartialRowBatch rb(&schema);
+// rb.load(std::move(buffer));
+// while (true) {
+//     bool has_row = false;
+//     rb.next_row(&has_row);
+//     if (!has_row) break;
+//     for (size_t j = 0; j < rb.cur_row_cell_size(); j++) {
+//         const ColumnSchema* cs = nullptr;
+//         const void* data = nullptr;
+//         // get column cell type and data
+//         rb.cur_row_get_cell(j, &cs, &data);
+//     }
+// }
+//
+// Note: currently only fixed length column types are supported. All length fields
+// and scalar values are stored in native byte order (little endian on x86-64).
+//
+// Note: The serialization format is simple; it only provides basic functionality
+// so we can quickly complete the whole create/read/write pipeline. The format may
+// change as the project evolves.
+// Reader over one serialized batch of partial rows.
+//
+// Typical flow: load() a buffer, then call next_row() repeatedly; after each
+// successful step the cells of the current row are available through
+// cur_row_cell_size() / cur_row_get_cell().
+class PartialRowBatch {
+public:
+    // schema points at the caller's schema reference.
+    explicit PartialRowBatch(scoped_refptr<Schema>* schema);
+    ~PartialRowBatch();
+
+    // Schema this batch was created with
+    const Schema& schema() const { return *_schema.get(); }
+
+    // Load from a serialized buffer; the buffer is taken by rvalue reference
+    Status load(std::vector<uint8_t>&& buffer);
+
+    // Return row count in this batch
+    size_t row_size() const { return _row_size; }
+
+    // Iterate to next row, set *has_row to false if there are no more rows
+    Status next_row(bool* has_row);
+
+    // Get row operation cell count
+    size_t cur_row_cell_size() const { return _cells.size(); }
+    // Get row operation cell by index idx, return ColumnSchema and data pointer
+    Status cur_row_get_cell(size_t idx, const ColumnSchema** cs, const void** data) const;
+
+private:
+    scoped_refptr<Schema> _schema;
+
+    bool _delete = false;
+    size_t _bit_set_size = 0;
+    // One cell of the current row: its column id and a pointer to its value bytes.
+    struct CellInfo {
+        CellInfo(uint32_t cid, const void* data)
+                : cid(cid), data(reinterpret_cast<const uint8_t*>(data)) {}
+        uint32_t cid = 0;
+        const uint8_t* data = nullptr;
+    };
+    // Cells of the current row; its size is what cur_row_cell_size() reports.
+    vector<CellInfo> _cells;
+
+    size_t _next_row = 0;
+    // Row count reported by row_size()
+    size_t _row_size = 0;
+    const uint8_t* _pos = nullptr;
+    std::vector<uint8_t> _buffer;
+};
+
+// Writer for PartialRowBatch
+//
+// Example usage:
+// scoped_refptr<Schema> sc;
+// Schema::create("id int,uv int,pv int,city tinyint null", &sc);
+// PartialRowWriter writer(&sc);
+// writer.start_batch();
+// for (auto& row : rows) {
+//     writer.start_row();
+//     writer.set("column_name", &value);
+//     ...
+//     writer.set(column_id, &value);
+// writer.end_row();
+// }
+// vector<uint8_t> buffer;
+// writer.finish_batch(&buffer);
+// Builds a serialized batch of partial rows one row at a time:
+// start_batch(), then start_row() / set()... / end_row() per row, then
+// finish_batch() to obtain the buffer.
+class PartialRowWriter {
+public:
+ // Default value of start_batch()'s byte_capacity argument (1 << 20).
+ static const size_t DEFAULT_BYTE_CAPACITY = 1 << 20;
+ // Default value of start_batch()'s row_capacity argument (1 << 16).
+ // The identifier is spelled without the trailing 'Y'.
+ static const size_t DEFAULT_ROW_CAPACIT = 1 << 16;
+
+ // schema points at the caller's schema reference.
+ explicit PartialRowWriter(scoped_refptr<Schema>* schema);
+ ~PartialRowWriter();
+
+ // Start a new batch with the given row and byte capacities
+ Status start_batch(size_t row_capacity = DEFAULT_ROW_CAPACIT,
+ size_t byte_capacity = DEFAULT_BYTE_CAPACITY);
+
+ // Start writing a new row
+ Status start_row();
+
+ // Set cell value by column name
+ // Only the data pointer is stored; the pointed-to memory must remain valid
+ // until the row has been serialized.
+ // NOTE(review): presumably that happens in end_row() - confirm.
+ Status set(const string& col, const void* data);
+
+ // Set cell value by column id
+ // Only the data pointer is stored; the pointed-to memory must remain valid
+ // until the row has been serialized.
+ // NOTE(review): presumably that happens in end_row() - confirm.
+ Status set(uint32_t cid, const void* data);
+
+ // Mark this row as a delete operation
+ Status set_delete();
+
+ // Finish writing a row
+ Status end_row();
+
+ // Finish writing the whole PartialRowBatch; the serialized bytes are handed
+ // over to *buffer
+ Status finish_batch(vector<uint8_t>* buffer);
+
+private:
+ Status set(const ColumnSchema* cs, uint32_t cid, const void* data);
+ // Byte size of the row currently staged in _temp_cells
+ size_t byte_size() const;
+ // Serialize the staged row at *ppos and advance *ppos past it
+ Status write(uint8_t** ppos);
+
+ scoped_refptr<Schema> _schema;
+ // Staging slot for one cell of the row being built.
+ struct CellInfo {
+ CellInfo() = default;
+ // Non-zero once the cell has been set for the current row
+ uint32_t isset = 0;
+ // Non-zero if the cell is nullable
+ uint32_t isnullable = 0;
+ // Caller-owned value bytes; nullptr encodes a null value
+ const uint8_t* data = nullptr;
+ };
+ // Indexed by column id; slot 0 carries the delete flag of the row.
+ vector<CellInfo> _temp_cells;
+ size_t _bit_set_size = 0;
+ size_t _bit_nullable_size = 0;
+ // Row count stamped into the buffer header by finish_batch()
+ size_t _row_size = 0;
+ size_t _row_capacity = 0;
+
+ // Serialized batch under construction; swapped out by finish_batch()
+ std::vector<uint8_t> _buffer;
+};
+
+} // namespace memory
+} // namespace doris
diff --git a/be/src/olap/memory/schema.cpp b/be/src/olap/memory/schema.cpp
index cf9b3d3..d0b89c4 100644
--- a/be/src/olap/memory/schema.cpp
+++ b/be/src/olap/memory/schema.cpp
@@ -17,9 +17,50 @@
#include "olap/memory/schema.h"
+#include "gutil/strings/split.h"
+
namespace doris {
namespace memory {
+// Return true if this ColumnType is supported.
+// Only the integer, floating point and boolean types listed below are
+// accepted; every other type yields false.
+bool supported(ColumnType type) {
+    bool accepted = false;
+    switch (type) {
+    case OLAP_FIELD_TYPE_BOOL:
+    case OLAP_FIELD_TYPE_TINYINT:
+    case OLAP_FIELD_TYPE_SMALLINT:
+    case OLAP_FIELD_TYPE_INT:
+    case OLAP_FIELD_TYPE_BIGINT:
+    case OLAP_FIELD_TYPE_LARGEINT:
+    case OLAP_FIELD_TYPE_FLOAT:
+    case OLAP_FIELD_TYPE_DOUBLE:
+        accepted = true;
+        break;
+    default:
+        break;
+    }
+    return accepted;
+}
+
+// Byte width of one value of the given column type.
+// Types that are not listed yield 0.
+size_t get_type_byte_size(ColumnType type) {
+    size_t width = 0;
+    switch (type) {
+    case OLAP_FIELD_TYPE_TINYINT:
+    case OLAP_FIELD_TYPE_BOOL:
+        width = 1;
+        break;
+    case OLAP_FIELD_TYPE_SMALLINT:
+        width = 2;
+        break;
+    case OLAP_FIELD_TYPE_INT:
+    case OLAP_FIELD_TYPE_FLOAT:
+        width = 4;
+        break;
+    case OLAP_FIELD_TYPE_BIGINT:
+    case OLAP_FIELD_TYPE_DOUBLE:
+        width = 8;
+        break;
+    case OLAP_FIELD_TYPE_LARGEINT:
+        width = 16;
+        break;
+    default:
+        break;
+    }
+    return width;
+}
+
ColumnSchema::ColumnSchema(const TabletColumn& tcolumn) : _tcolumn(tcolumn) {}
ColumnSchema::ColumnSchema(uint32_t cid, const string& name, ColumnType type,
bool nullable,
@@ -44,15 +85,46 @@ std::string ColumnSchema::debug_string() const {
//////////////////////////////////////////////////////////////////////////////
+// Build a Schema from a textual description; utility method for tests.
+//
+// desc is a comma separated list of column specs, each of the form
+// "<name> <type>", optionally followed by "null" as third field to make the
+// column nullable, e.g. "id int,uv int,pv int,city tinyint null".
+// Columns receive ids 1, 2, ... in order and only the first one is marked as key.
+//
+// Returns InvalidArgument if sc is null, desc contains no column spec, or a
+// column spec has fewer than two fields; otherwise stores the new Schema in *sc.
+Status Schema::create(const string& desc, scoped_refptr<Schema>* sc) {
+    if (sc == nullptr) {
+        return Status::InvalidArgument("null schema output");
+    }
+    const std::vector<std::string> cs = strings::Split(desc, ",", strings::SkipWhitespace());
+    if (cs.empty()) {
+        // An empty description would otherwise yield a schema without any key column.
+        return Status::InvalidArgument("bad schema desc");
+    }
+    TabletSchemaPB tspb;
+    uint32_t cid = 1;
+    for (const std::string& c : cs) {
+        const std::vector<std::string> fs = strings::Split(c, " ", strings::SkipWhitespace());
+        if (fs.size() < 2) {
+            return Status::InvalidArgument("bad schema desc");
+        }
+        // Only touch the proto once the column spec is known to be well formed.
+        ColumnPB* cpb = tspb.add_column();
+        cpb->set_is_key(cid == 1);
+        cpb->set_unique_id(cid++);
+        cpb->set_name(fs[0]);
+        cpb->set_type(fs[1]);
+        if (fs.size() == 3 && fs[2] == "null") {
+            cpb->set_is_nullable(true);
+        }
+    }
+    tspb.set_keys_type(KeysType::UNIQUE_KEYS);
+    tspb.set_next_column_unique_id(cid);
+    tspb.set_num_short_key_columns(1);
+    tspb.set_is_in_memory(false);
+    TabletSchema ts;
+    ts.init_from_pb(tspb);
+    sc->reset(new Schema(ts));
+    return Status::OK();
+}
+
+// Build the column-id and column-name lookup tables from the tablet schema.
+// The id-indexed tables always have at least one slot (index 0), which starts
+// out as nullptr / 0.
Schema::Schema(const TabletSchema& tschema) : _tschema(tschema) {
_cid_size = 1;
_cid_to_col.resize(_cid_size, nullptr);
+ _column_byte_sizes.resize(_cid_size, 0);
for (size_t i = 0; i < num_columns(); i++) {
const ColumnSchema* cs = get(i);
+ // Grow the id space so that it always covers max(cid) + 1.
_cid_size = std::max(_cid_size, cs->cid() + 1);
_cid_to_col.resize(_cid_size, nullptr);
_cid_to_col[cs->cid()] = cs;
_name_to_col[cs->name()] = cs;
+ // Byte size per column id; stays 0 for types get_type_byte_size() does not list.
+ _column_byte_sizes.resize(_cid_size, 0);
+ _column_byte_sizes[cs->cid()] = get_type_byte_size(cs->type());
}
}
diff --git a/be/src/olap/memory/schema.h b/be/src/olap/memory/schema.h
index ebe3571..9a41480 100644
--- a/be/src/olap/memory/schema.h
+++ b/be/src/olap/memory/schema.h
@@ -29,22 +29,36 @@ namespace memory {
// Memory engine's column type, just use FieldType for now
typedef FieldType ColumnType;
+// Return true if this ColumnType is supported
+bool supported(ColumnType type);
+
// Memory engine's column schema, simple wrapper of TabletColumn.
// TODO: Add more properties and methods later
class ColumnSchema {
public:
explicit ColumnSchema(const TabletColumn& tcolumn);
ColumnSchema(uint32_t cid, const string& name, ColumnType type, bool
nullable, bool is_key);
+
+ // Get column id
inline uint32_t cid() const { return
static_cast<uint32_t>(_tcolumn.unique_id()); }
+
+ // Get column name
inline std::string name() const { return _tcolumn.name(); }
+
+ // Get column type
inline ColumnType type() const { return _tcolumn.type(); }
+
+ // Get is nullable
inline bool is_nullable() const { return _tcolumn.is_nullable(); }
+
+ // Get is key
inline bool is_key() const { return _tcolumn.is_key(); }
std::string type_name() const;
std::string debug_string() const;
private:
+ // Note: do not add more fields to this class; it needs to stay identical
+ // to TabletColumn
TabletColumn _tcolumn;
};
@@ -55,25 +69,47 @@ private:
// 2. in the future, there may be a special compound primary key column
// if primary-key has multiple columns
// TODO: Add more properties and methods later
-class Schema {
+class Schema : public RefCountedThreadSafe<Schema> {
public:
+ // Create schema by description string, utility method for test
+ static Status create(const string& desc, scoped_refptr<Schema>* sc);
+
explicit Schema(const TabletSchema& tschema);
+
std::string debug_string() const;
+
inline size_t num_columns() const { return _tschema.num_columns(); }
+
inline size_t num_key_columns() const { return _tschema.num_key_columns();
}
+ // Get ColumnSchema by index
const ColumnSchema* get(size_t idx) const;
+ // Get ColumnSchema by name
const ColumnSchema* get_by_name(const string& name) const;
+ // Get column id space size
+ //
+ // For example:
+ // If a schema has 5 columns with ids [1, 2, 3, 5, 6],
+ // its cid_size equals max(cid) + 1 = 7
uint32_t cid_size() const;
+
+ // Get ColumnSchema by column id
const ColumnSchema* get_by_cid(uint32_t cid) const;
+ // Get column type byte size by column id
+ size_t get_column_byte_size(uint32_t cid) const {
+ DCHECK_LT(cid, _column_byte_sizes.size());
+ return _column_byte_sizes[cid];
+ }
+
private:
TabletSchema _tschema;
uint32_t _cid_size;
std::unordered_map<string, const ColumnSchema*> _name_to_col;
vector<const ColumnSchema*> _cid_to_col;
+ vector<size_t> _column_byte_sizes;
};
} // namespace memory
diff --git a/be/src/olap/memory/mem_tablet.cpp
b/be/src/olap/memory/write_txn.cpp
similarity index 70%
copy from be/src/olap/memory/mem_tablet.cpp
copy to be/src/olap/memory/write_txn.cpp
index 5c2edd8..908a0c5 100644
--- a/be/src/olap/memory/mem_tablet.cpp
+++ b/be/src/olap/memory/write_txn.cpp
@@ -15,15 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-#include "olap/memory/mem_tablet.h"
+#include "olap/memory/write_txn.h"
namespace doris {
namespace memory {
-MemTablet::MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
- : BaseTablet(tablet_meta, data_dir) {}
+// Take a reference to the schema that *schema currently refers to.
+WriteTxn::WriteTxn(scoped_refptr<Schema>* schema)
+        : _schema(schema->get()) {
+}
-MemTablet::~MemTablet() {}
+// Nothing to release by hand: the members clean up after themselves.
+WriteTxn::~WriteTxn() = default;
+
+// Append a new PartialRowBatch created with this transaction's schema and
+// return a pointer to it. The batch stays owned by the WriteTxn.
+PartialRowBatch* WriteTxn::new_batch() {
+    // Wrap the allocation in a unique_ptr before handing it to the vector, so
+    // the batch is released if growing the vector throws.
+    _batches.emplace_back(std::unique_ptr<PartialRowBatch>(new PartialRowBatch(&_schema)));
+    return _batches.back().get();
+}
+
+// Return the batch at position idx, which must be less than batch_size().
+// The returned pointer stays owned by the WriteTxn.
+PartialRowBatch* WriteTxn::get_batch(size_t idx) const {
+    // Same debug-build guard as Schema::get_column_byte_size(): an
+    // out-of-range index would otherwise be undefined behavior.
+    DCHECK_LT(idx, _batches.size());
+    return _batches[idx].get();
+}
} // namespace memory
} // namespace doris
diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/write_txn.h
similarity index 50%
copy from be/src/olap/memory/mem_tablet.h
copy to be/src/olap/memory/write_txn.h
index 7efc945..74cee3c 100644
--- a/be/src/olap/memory/mem_tablet.h
+++ b/be/src/olap/memory/write_txn.h
@@ -17,24 +17,41 @@
#pragma once
-#include "olap/base_tablet.h"
+#include "olap/memory/common.h"
+#include "olap/memory/partial_row_batch.h"
+#include "olap/memory/schema.h"
namespace doris {
namespace memory {
-// Tablet class for memory-optimized storage engine.
+class PartialRowBatch;
+
+// Class for write transaction
//
-// It stores all its data in-memory, and is designed for tables with
-// frequent updates.
+// Note: Currently it stores all its operations in memory to keep things simple,
+// so we can quickly complete the whole create/read/write pipeline. The data
+// structure may change as the project evolves.
//
-// TODO: This is just a skeleton, will add implementation in the future.
-class MemTablet : public BaseTablet {
+// TODO: add writing to / loading from WriteTxn files in the future.
+class WriteTxn {
public:
- MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
- virtual ~MemTablet();
+ // schema points at the caller's schema reference.
+ explicit WriteTxn(scoped_refptr<Schema>* schema);
+ ~WriteTxn();
+
+ // Schema this transaction was created with
+ const Schema& schema() const { return *_schema.get(); }
+
+ // Get number of batches
+ size_t batch_size() const { return _batches.size(); }
+
+ // Get batch by index; the batch remains owned by this WriteTxn
+ PartialRowBatch* get_batch(size_t idx) const;
+
+ // Add a new batch to this WriteTxn; the batch remains owned by this WriteTxn
+ PartialRowBatch* new_batch();
private:
- DISALLOW_COPY_AND_ASSIGN(MemTablet);
+ scoped_refptr<Schema> _schema;
+ // Batches in the order they were added
+ std::vector<std::unique_ptr<PartialRowBatch>> _batches;
};
} // namespace memory
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 96f1de4..2b49580 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -53,14 +53,6 @@ TabletSharedPtr
Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
return std::make_shared<Tablet>(tablet_meta, data_dir);
}
-void Tablet::_gen_tablet_path() {
- std::string path = _data_dir->path() + DATA_PREFIX;
- path = path_util::join_path_segments(path,
std::to_string(_tablet_meta->shard_id()));
- path = path_util::join_path_segments(path,
std::to_string(_tablet_meta->tablet_id()));
- path = path_util::join_path_segments(path,
std::to_string(_tablet_meta->schema_hash()));
- _tablet_path = path;
-}
-
Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) :
BaseTablet(tablet_meta, data_dir),
_is_bad(false),
@@ -69,7 +61,6 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir*
data_dir) :
_last_cumu_compaction_success_millis(0),
_last_base_compaction_success_millis(0),
_cumulative_point(kInvalidCumulativePoint) {
- _gen_tablet_path();
_rs_graph.construct_rowset_graph(_tablet_meta->all_rs_metas());
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index f8746f8..40cac13 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -78,7 +78,7 @@ public:
inline KeysType keys_type() const;
inline size_t num_columns() const;
inline size_t num_null_columns() const;
- inline size_t num_key_columns() const ;
+ inline size_t num_key_columns() const;
inline size_t num_short_key_columns() const;
inline size_t num_rows_per_row_block() const;
inline CompressKind compress_kind() const;
@@ -233,7 +233,6 @@ private:
OLAPStatus _contains_version(const Version& version);
void _max_continuous_version_from_begining_unlocked(Version* version,
VersionHash* v_hash)
const ;
- void _gen_tablet_path();
RowsetSharedPtr _rowset_with_largest_size();
void _delete_inc_rowset_by_version(const Version& version, const
VersionHash& version_hash);
OLAPStatus _capture_consistent_rowsets_unlocked(const vector<Version>&
version_path,
diff --git a/be/src/util/time.h b/be/src/util/time.h
index 01d56c6..5a4bfbb 100755
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -59,6 +59,10 @@ inline int64_t MonotonicSeconds() {
return GetMonoTimeMicros() / MICROS_PER_SEC;
}
+// Value of GetMonoTimeMicros() converted to seconds, keeping the fractional part.
+inline double GetMonoTimeSecondsAsDouble() {
+    const double micros_per_sec = static_cast<double>(MICROS_PER_SEC);
+    return GetMonoTimeMicros() / micros_per_sec;
+}
+
// Returns the time since the Epoch measured in microseconds.
inline int64_t GetCurrentTimeMicros() {
timespec ts;
diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt
index 48c34ce..f00d7b7 100644
--- a/be/test/olap/CMakeLists.txt
+++ b/be/test/olap/CMakeLists.txt
@@ -86,3 +86,4 @@ ADD_BE_TEST(memory/hash_index_test)
ADD_BE_TEST(memory/column_delta_test)
ADD_BE_TEST(memory/schema_test)
ADD_BE_TEST(memory/column_test)
+ADD_BE_TEST(memory/partial_row_batch_test)
diff --git a/be/test/olap/memory/partial_row_batch_test.cpp
b/be/test/olap/memory/partial_row_batch_test.cpp
new file mode 100644
index 0000000..354795c
--- /dev/null
+++ b/be/test/olap/memory/partial_row_batch_test.cpp
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/partial_row_batch.h"
+
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include "util/hash_util.hpp"
+
+namespace doris {
+namespace memory {
+
+// Round trip: write N rows with PartialRowWriter, load the buffer into a
+// PartialRowBatch and check that every row reads back with the same cells.
+// Both phases reseed rand() with the same seed and draw three numbers per row,
+// so the reader regenerates exactly the values the writer used.
+TEST(PartialRowbatch, write) {
+    scoped_refptr<Schema> sc;
+    ASSERT_TRUE(Schema::create("id int,uv int,pv int,city tinyint null", &sc).ok());
+    PartialRowWriter writer(&sc);
+    srand(1);
+    const int N = 1000;
+    size_t nrow = 0;
+    // add insert/update operation
+    EXPECT_TRUE(writer.start_batch().ok());
+    for (int i = 0; i < N; i++) {
+        nrow++;
+        EXPECT_TRUE(writer.start_row().ok());
+        int id = i;
+        int uv = rand() % 10000;
+        int pv = rand() % 10000;
+        int8_t city = rand() % 100;
+        EXPECT_TRUE(writer.set("id", &id).ok());
+        // Every third row carries all four columns, the others only the key.
+        if (i % 3 == 0) {
+            EXPECT_TRUE(writer.set("uv", &uv).ok());
+            EXPECT_TRUE(writer.set("pv", &pv).ok());
+            EXPECT_TRUE(writer.set("city", city % 2 == 0 ? nullptr : &city).ok());
+        }
+        EXPECT_TRUE(writer.end_row().ok());
+    }
+    vector<uint8_t> buffer;
+    EXPECT_TRUE(writer.finish_batch(&buffer).ok());
+
+    PartialRowBatch rb(&sc);
+    // Fatal on failure: nothing below is meaningful without a loaded batch.
+    ASSERT_TRUE(rb.load(std::move(buffer)).ok());
+    EXPECT_EQ(rb.row_size(), nrow);
+    // read from rowbatch and check equality
+    srand(1);
+    for (size_t i = 0; i < nrow; i++) {
+        bool has_row = false;
+        ASSERT_TRUE(rb.next_row(&has_row).ok());
+        ASSERT_TRUE(has_row);
+        if (i % 3 == 0) {
+            EXPECT_EQ(rb.cur_row_cell_size(), 4);
+        } else {
+            EXPECT_EQ(rb.cur_row_cell_size(), 1);
+        }
+        int id = i;
+        int uv = rand() % 10000;
+        int pv = rand() % 10000;
+        int8_t city = rand() % 100;
+
+        const ColumnSchema* cs = nullptr;
+        const void* data = nullptr;
+
+        // cs and data are dereferenced right after each lookup, so a failed
+        // lookup has to abort the test instead of continuing.
+        ASSERT_TRUE(rb.cur_row_get_cell(0, &cs, &data).ok());
+        EXPECT_EQ(cs->cid(), 1);
+        EXPECT_EQ(*reinterpret_cast<const int32_t*>(data), id);
+
+        if (i % 3 == 0) {
+            ASSERT_TRUE(rb.cur_row_get_cell(1, &cs, &data).ok());
+            EXPECT_EQ(cs->cid(), 2);
+            EXPECT_EQ(*reinterpret_cast<const int32_t*>(data), uv);
+
+            ASSERT_TRUE(rb.cur_row_get_cell(2, &cs, &data).ok());
+            EXPECT_EQ(cs->cid(), 3);
+            EXPECT_EQ(*reinterpret_cast<const int32_t*>(data), pv);
+
+            ASSERT_TRUE(rb.cur_row_get_cell(3, &cs, &data).ok());
+            EXPECT_EQ(cs->cid(), 4);
+            if (city % 2 == 0) {
+                // A null cell is reported as a null data pointer.
+                EXPECT_EQ(data, nullptr);
+            } else {
+                EXPECT_EQ(*reinterpret_cast<const int8_t*>(data), city);
+            }
+        }
+    }
+    bool has_row = false;
+    EXPECT_TRUE(rb.next_row(&has_row).ok());
+    EXPECT_FALSE(has_row);
+}
+
+} // namespace memory
+} // namespace doris
+
+// Test binary entry point: hand the command line to gtest, then run every
+// registered test and use the result as the exit code.
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    const int exit_code = RUN_ALL_TESTS();
+    return exit_code;
+}
diff --git a/be/test/olap/memory/schema_test.cpp
b/be/test/olap/memory/schema_test.cpp
index 8772288..71d7d52 100644
--- a/be/test/olap/memory/schema_test.cpp
+++ b/be/test/olap/memory/schema_test.cpp
@@ -32,6 +32,20 @@ TEST(ColumnSchema, create) {
EXPECT_TRUE(cs.is_key());
}
+// Schema::create() from a description string: column count, key count, column
+// ids, nullability and type must match the description.
+TEST(Schema, desc_create) {
+    scoped_refptr<Schema> sc;
+    ASSERT_TRUE(Schema::create("id int,uv int,pv int,city tinyint null", &sc).ok());
+    ASSERT_EQ(sc->num_columns(), 4);
+    ASSERT_EQ(sc->num_key_columns(), 1);
+    ASSERT_EQ(sc->get(0)->cid(), 1);
+    ASSERT_EQ(sc->get(1)->cid(), 2);
+    ASSERT_EQ(sc->get(2)->cid(), 3);
+    ASSERT_EQ(sc->get(3)->cid(), 4);
+    // Check each lookup result before dereferencing it, so a missing column
+    // fails the test instead of crashing the binary.
+    const ColumnSchema* city = sc->get_by_name("city");
+    ASSERT_TRUE(city != nullptr);
+    ASSERT_TRUE(city->is_nullable());
+    const ColumnSchema* pv = sc->get_by_name("pv");
+    ASSERT_TRUE(pv != nullptr);
+    ASSERT_FALSE(pv->is_nullable());
+    const ColumnSchema* uv = sc->get_by_name("uv");
+    ASSERT_TRUE(uv != nullptr);
+    ASSERT_EQ(uv->type(), ColumnType::OLAP_FIELD_TYPE_INT);
+}
+
TEST(Schema, create) {
TabletSchemaPB tspb;
auto cpb = tspb.add_column();
@@ -52,10 +66,10 @@ TEST(Schema, create) {
tspb.set_is_in_memory(false);
TabletSchema ts;
ts.init_from_pb(tspb);
- Schema schema(ts);
- EXPECT_EQ(schema.cid_size(), 3);
- EXPECT_EQ(schema.get_by_name("uid")->name(), std::string("uid"));
- EXPECT_EQ(schema.get_by_cid(1)->name(), std::string("uid"));
+ scoped_refptr<Schema> schema(new Schema(ts));
+ EXPECT_EQ(schema->cid_size(), 3);
+ EXPECT_EQ(schema->get_by_name("uid")->name(), std::string("uid"));
+ EXPECT_EQ(schema->get_by_cid(1)->name(), std::string("uid"));
}
} // namespace memory
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]