This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 26e117d8b41 [enhance](cloud) Implement FullRangeGetIterator to
simplify iterating over a kv range (#33388)
26e117d8b41 is described below
commit 26e117d8b41858401bcd1b5c8f6e971cb6fc3ba0
Author: plat1ko <[email protected]>
AuthorDate: Tue Jun 11 14:48:02 2024 +0800
[enhance](cloud) Implement FullRangeGetIterator to simplify iterating over
a kv range (#33388)
Implement FullRangeGetIterator to simplify iterating over a kv range
## Further comments
If this is a relatively large or complex change, kick off the discussion
at [[email protected]](mailto:[email protected]) by explaining why
you chose the solution you did and what alternatives you considered,
etc...
---
cloud/src/meta-service/mem_txn_kv.cpp | 49 +++++++
cloud/src/meta-service/mem_txn_kv.h | 26 +++-
cloud/src/meta-service/txn_kv.cpp | 131 +++++++++++++++++
cloud/src/meta-service/txn_kv.h | 87 +++++++++++
cloud/test/CMakeLists.txt | 1 +
cloud/test/txn_kv_test.cpp | 263 ++++++++++++++++++++++++++++++++++
6 files changed, 556 insertions(+), 1 deletion(-)
diff --git a/cloud/src/meta-service/mem_txn_kv.cpp
b/cloud/src/meta-service/mem_txn_kv.cpp
index 76066192252..36453251beb 100644
--- a/cloud/src/meta-service/mem_txn_kv.cpp
+++ b/cloud/src/meta-service/mem_txn_kv.cpp
@@ -218,6 +218,12 @@ int64_t MemTxnKv::get_last_read_version() {
return read_version_;
}
+std::unique_ptr<FullRangeGetIterator> MemTxnKv::full_range_get(std::string
begin, std::string end,
+
FullRangeGetIteratorOptions opts) {
+ return std::make_unique<memkv::FullRangeGetIterator>(std::move(begin),
std::move(end),
+ std::move(opts));
+}
+
} // namespace doris::cloud
namespace doris::cloud::memkv {
@@ -477,4 +483,47 @@ TxnErrorCode
Transaction::batch_get(std::vector<std::optional<std::string>>* res
return TxnErrorCode::TXN_OK;
}
+FullRangeGetIterator::FullRangeGetIterator(std::string begin, std::string end,
+ FullRangeGetIteratorOptions opts)
+ : opts_(std::move(opts)), begin_(std::move(begin)),
end_(std::move(end)) {}
+
+FullRangeGetIterator::~FullRangeGetIterator() = default;
+
+bool FullRangeGetIterator::has_next() {
+ if (!is_valid_) {
+ return false;
+ }
+
+ if (!inner_iter_) {
+ auto* txn = opts_.txn;
+ if (!txn) {
+ // Create a new txn for each inner range get
+ std::unique_ptr<cloud::Transaction> txn1;
+ TxnErrorCode err = opts_.txn_kv->create_txn(&txn_);
+ if (err != TxnErrorCode::TXN_OK) {
+ is_valid_ = false;
+ return false;
+ }
+
+ txn = txn_.get();
+ }
+
+ TxnErrorCode err = txn->get(begin_, end_, &inner_iter_,
opts_.snapshot, 0);
+ if (err != TxnErrorCode::TXN_OK) {
+ is_valid_ = false;
+ return false;
+ }
+ }
+
+ return inner_iter_->has_next();
+}
+
+std::optional<std::pair<std::string_view, std::string_view>>
FullRangeGetIterator::next() {
+ if (!has_next()) {
+ return std::nullopt;
+ }
+
+ return inner_iter_->next();
+}
+
} // namespace doris::cloud::memkv
diff --git a/cloud/src/meta-service/mem_txn_kv.h
b/cloud/src/meta-service/mem_txn_kv.h
index 7da70aad8f3..63fb008f586 100644
--- a/cloud/src/meta-service/mem_txn_kv.h
+++ b/cloud/src/meta-service/mem_txn_kv.h
@@ -50,6 +50,9 @@ public:
int init() override;
+ std::unique_ptr<FullRangeGetIterator> full_range_get(std::string begin,
std::string end,
+
FullRangeGetIteratorOptions opts) override;
+
TxnErrorCode get_kv(const std::string& key, std::string* val, int64_t
version);
TxnErrorCode get_kv(const std::string& begin, const std::string& end,
int64_t version,
int limit, bool* more, std::map<std::string,
std::string>* kv_list);
@@ -208,7 +211,7 @@ private:
std::set<std::string> unreadable_keys_;
std::set<std::string> read_set_;
std::map<std::string, std::string> writes_;
- std::list<std::pair<std::string, std::string>> remove_ranges_;
+ std::vector<std::pair<std::string, std::string>> remove_ranges_;
std::vector<std::tuple<ModifyOpType, std::string, std::string>> op_list_;
int64_t committed_version_ = -1;
@@ -260,5 +263,26 @@ private:
bool more_;
};
+class FullRangeGetIterator final : public cloud::FullRangeGetIterator {
+public:
+ FullRangeGetIterator(std::string begin, std::string end,
FullRangeGetIteratorOptions opts);
+
+ ~FullRangeGetIterator() override;
+
+ bool is_valid() override { return is_valid_; }
+
+ bool has_next() override;
+
+ std::optional<std::pair<std::string_view, std::string_view>> next()
override;
+
+private:
+ FullRangeGetIteratorOptions opts_;
+ bool is_valid_ {true};
+ std::unique_ptr<cloud::RangeGetIterator> inner_iter_;
+ std::string begin_;
+ std::string end_;
+ std::unique_ptr<cloud::Transaction> txn_;
+};
+
} // namespace memkv
} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/meta-service/txn_kv.cpp
b/cloud/src/meta-service/txn_kv.cpp
index ebb63d095f8..f48ac8f9912 100644
--- a/cloud/src/meta-service/txn_kv.cpp
+++ b/cloud/src/meta-service/txn_kv.cpp
@@ -78,6 +78,12 @@ TxnErrorCode
FdbTxnKv::create_txn(std::unique_ptr<Transaction>* txn) {
return ret;
}
+std::unique_ptr<FullRangeGetIterator> FdbTxnKv::full_range_get(std::string
begin, std::string end,
+
FullRangeGetIteratorOptions opts) {
+ return std::make_unique<fdb::FullRangeGetIterator>(std::move(begin),
std::move(end),
+ std::move(opts));
+}
+
} // namespace doris::cloud
namespace doris::cloud::fdb {
@@ -586,4 +592,129 @@ TxnErrorCode
Transaction::batch_get(std::vector<std::optional<std::string>>* res
return TxnErrorCode::TXN_OK;
}
+FullRangeGetIterator::FullRangeGetIterator(std::string begin, std::string end,
+ FullRangeGetIteratorOptions opts)
+ : opts_(std::move(opts)), begin_(std::move(begin)),
end_(std::move(end)) {
+ DCHECK(dynamic_cast<FdbTxnKv*>(opts_.txn_kv.get()));
+ DCHECK(!opts_.txn || dynamic_cast<fdb::Transaction*>(opts_.txn)) <<
opts_.txn;
+}
+
+FullRangeGetIterator::~FullRangeGetIterator() {
+ if (fut_) {
+ static_cast<void>(fdb::await_future(fut_));
+ fdb_future_destroy(fut_);
+ }
+}
+
+bool FullRangeGetIterator::has_next() {
+ if (!is_valid_) {
+ return false;
+ }
+
+ if (!inner_iter_) {
+ // The first call
+ init();
+ if (!is_valid_) {
+ return false;
+ }
+
+ return inner_iter_->has_next();
+ }
+
+ if (inner_iter_->has_next()) {
+ if (prefetch()) {
+ TEST_SYNC_POINT("fdb.FullRangeGetIterator.has_next_prefetch");
+ async_inner_get(inner_iter_->next_begin_key());
+ }
+ return true;
+ }
+
+ if (!inner_iter_->more()) {
+ return false;
+ }
+
+ if (!fut_) {
+ async_inner_get(inner_iter_->next_begin_key());
+ if (!is_valid_) {
+ return false;
+ }
+ }
+
+ await_future();
+ return is_valid_ ? inner_iter_->has_next() : false;
+}
+
+std::optional<std::pair<std::string_view, std::string_view>>
FullRangeGetIterator::next() {
+ if (!has_next()) {
+ return std::nullopt;
+ }
+
+ return inner_iter_->next();
+}
+
+void FullRangeGetIterator::await_future() {
+ auto ret = fdb::await_future(fut_);
+ if (ret != TxnErrorCode::TXN_OK) {
+ is_valid_ = false;
+ return;
+ }
+
+ auto err = fdb_future_get_error(fut_);
+ if (err) {
+ is_valid_ = false;
+ LOG(WARNING) << fdb_get_error(err);
+ return;
+ }
+
+ if (opts_.obj_pool && inner_iter_) {
+ opts_.obj_pool->push_back(std::move(inner_iter_));
+ }
+ inner_iter_ = std::make_unique<RangeGetIterator>(fut_);
+ fut_ = nullptr;
+
+ if (inner_iter_->init() != TxnErrorCode::TXN_OK) {
+ is_valid_ = false;
+ }
+}
+
+void FullRangeGetIterator::init() {
+ async_inner_get(begin_);
+ if (!is_valid_) {
+ return;
+ }
+
+ await_future();
+}
+
+bool FullRangeGetIterator::prefetch() {
+ return opts_.prefetch && is_valid_ && !fut_ && inner_iter_->more();
+}
+
+void FullRangeGetIterator::async_inner_get(std::string_view begin) {
+ DCHECK(!fut_);
+
+ auto* txn = static_cast<Transaction*>(opts_.txn);
+ if (!txn) {
+ // Create a new txn for each inner range get
+ std::unique_ptr<cloud::Transaction> txn1;
+ // TODO(plat1ko): Async create txn
+ TxnErrorCode err = opts_.txn_kv->create_txn(&txn1);
+ if (err != TxnErrorCode::TXN_OK) {
+ is_valid_ = false;
+ return;
+ }
+
+ txn_.reset(static_cast<Transaction*>(txn1.release()));
+ txn = txn_.get();
+ }
+
+ // TODO(plat1ko): Support `Transaction::async_get` api
+ fut_ = fdb_transaction_get_range(
+ txn->txn_,
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)begin.data(), begin.size()),
+ FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end_.data(),
end_.size()), opts_.limit,
+ 0 /*target_bytes, unlimited*/,
FDBStreamingMode::FDB_STREAMING_MODE_WANT_ALL,
+ // FDBStreamingMode::FDB_STREAMING_MODE_ITERATOR,
+ 0 /*iteration*/, opts_.snapshot, false /*reverse*/);
+}
+
} // namespace doris::cloud::fdb
diff --git a/cloud/src/meta-service/txn_kv.h b/cloud/src/meta-service/txn_kv.h
index bb12e78404e..99b187e0f63 100644
--- a/cloud/src/meta-service/txn_kv.h
+++ b/cloud/src/meta-service/txn_kv.h
@@ -38,6 +38,49 @@ namespace doris::cloud {
class Transaction;
class RangeGetIterator;
+class TxnKv;
+
+/**
+ * Unlike `RangeGetIterator`, which can only iterate within a page of range,
this iterator is
+ * capable of iterating over the entire specified range.
+ *
+ * Usage:
+ * for (auto kvp = it.next(); kvp.has_value(); kvp = it.next()) {
+ * auto [k, v] = *kvp;
+ * }
+ * if (!it.is_valid()) {
+ * return err;
+ * }
+ */
+struct FullRangeGetIteratorOptions {
+ std::shared_ptr<TxnKv> txn_kv;
+ // Trigger prefetch getting next batch kvs before access them
+ bool prefetch = false;
+ bool snapshot = false;
+ // If non-zero, indicates the maximum number of key-value pairs to return
(not effective in memkv)
+ int limit = 0;
+ // Reference. If not null, each inner range get is performed through this
transaction; otherwise
+ // perform each inner range get through a new transaction.
+ Transaction* txn = nullptr;
+ // If users want to extend the lifespan of the kv pair returned by
`next()`, they can pass an
+ // object pool to collect the inner iterators that have completed iterated.
+ std::vector<std::unique_ptr<RangeGetIterator>>* obj_pool = nullptr;
+
+ FullRangeGetIteratorOptions(std::shared_ptr<TxnKv> _txn_kv) :
txn_kv(std::move(_txn_kv)) {}
+};
+
+class FullRangeGetIterator {
+public:
+ FullRangeGetIterator() = default;
+
+ virtual ~FullRangeGetIterator() = default;
+
+ virtual bool is_valid() = 0;
+
+ virtual bool has_next() = 0;
+
+ virtual std::optional<std::pair<std::string_view, std::string_view>>
next() = 0;
+};
class TxnKv {
public:
@@ -54,6 +97,9 @@ public:
virtual TxnErrorCode create_txn(std::unique_ptr<Transaction>* txn) = 0;
virtual int init() = 0;
+
+ virtual std::unique_ptr<FullRangeGetIterator> full_range_get(
+ std::string begin, std::string end, FullRangeGetIteratorOptions
opts) = 0;
};
class Transaction {
@@ -280,6 +326,9 @@ public:
int init() override;
+ std::unique_ptr<FullRangeGetIterator> full_range_get(std::string begin,
std::string end,
+
FullRangeGetIteratorOptions opts) override;
+
private:
std::shared_ptr<fdb::Network> network_;
std::shared_ptr<fdb::Database> database_;
@@ -416,6 +465,8 @@ private:
class Transaction : public cloud::Transaction {
public:
friend class Database;
+ friend class FullRangeGetIterator;
+
Transaction(std::shared_ptr<Database> db) : db_(std::move(db)) {}
~Transaction() override {
@@ -520,5 +571,41 @@ private:
size_t approximate_bytes_ {0};
};
+class FullRangeGetIterator final : public cloud::FullRangeGetIterator {
+public:
+ FullRangeGetIterator(std::string begin, std::string end,
FullRangeGetIteratorOptions opts);
+
+ ~FullRangeGetIterator() override;
+
+ bool is_valid() override { return is_valid_; }
+
+ bool has_next() override;
+
+ std::optional<std::pair<std::string_view, std::string_view>> next()
override;
+
+private:
+ // Set `is_valid_` to false if meet any error
+ void init();
+
+ // Await `fut_` and create new inner iter.
+ // Set `is_valid_` to false if meet any error
+ void await_future();
+
+ // Perform a paginate range get asynchronously and set `fut_`.
+ // Set `is_valid_` to false if meet any error
+ void async_inner_get(std::string_view begin);
+
+ bool prefetch();
+
+ FullRangeGetIteratorOptions opts_;
+
+ bool is_valid_ = true;
+ std::string begin_;
+ std::string end_;
+ std::unique_ptr<Transaction> txn_;
+ std::unique_ptr<RangeGetIterator> inner_iter_;
+ FDBFuture* fut_ = nullptr;
+};
+
} // namespace fdb
} // namespace doris::cloud
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 94b84aa4ba0..ab8101ee2c9 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -15,6 +15,7 @@ add_executable(keys_test keys_test.cpp)
add_executable(doris_txn_test doris_txn_test.cpp)
add_executable(txn_kv_test txn_kv_test.cpp)
+set_target_properties(txn_kv_test PROPERTIES COMPILE_FLAGS
"-fno-access-control")
add_executable(recycler_test recycler_test.cpp)
diff --git a/cloud/test/txn_kv_test.cpp b/cloud/test/txn_kv_test.cpp
index 63a89a07434..4f8eed1090d 100644
--- a/cloud/test/txn_kv_test.cpp
+++ b/cloud/test/txn_kv_test.cpp
@@ -24,12 +24,16 @@
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest.h>
+#include <chrono>
#include <cstddef>
+#include <string>
+#include <thread>
#include "common/config.h"
#include "common/stopwatch.h"
#include "common/sync_point.h"
#include "common/util.h"
+#include "meta-service/codec.h"
#include "meta-service/doris_txn.h"
#include "meta-service/keys.h"
#include "meta-service/mem_txn_kv.h"
@@ -545,3 +549,262 @@ TEST(TxnKvTest, BatchGet) {
}
}
}
+
+TEST(TxnKvTest, FullRangeGetIterator) {
+ using namespace std::chrono_literals;
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv->create_txn(&txn);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ constexpr std::string_view prefix = "FullRangeGetIterator";
+ for (int i = 0; i < 100; ++i) {
+ std::string key {prefix};
+ encode_int64(i, &key);
+ txn->put(key, std::to_string(i));
+ }
+ err = txn->commit();
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+ std::string begin {prefix};
+ std::string end {prefix};
+ encode_int64(INT64_MAX, &end);
+
+ auto* sp = SyncPoint::get_instance();
+ std::unique_ptr<int, std::function<void(int*)>> defer(
+ (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ sp->enable_processing();
+
+ {
+ // Without txn
+ FullRangeGetIteratorOptions opts(txn_kv);
+ opts.limit = 11;
+
+ auto it = txn_kv->full_range_get(begin, end, opts);
+ ASSERT_TRUE(it->is_valid());
+
+ int cnt = 0;
+ for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+ auto [k, v] = *kvp;
+ EXPECT_EQ(v, std::to_string(cnt));
+ ++cnt;
+ // Total cost: 60ms * 100 = 6s > fdb txn timeout 5s, however we
create a new transaction
+ // in each inner range get
+ std::this_thread::sleep_for(60ms);
+ }
+ ASSERT_TRUE(it->is_valid());
+ EXPECT_EQ(cnt, 100);
+ }
+
+ {
+ // With txn
+ err = txn_kv->create_txn(&txn);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ FullRangeGetIteratorOptions opts(txn_kv);
+ opts.limit = 11;
+ opts.txn = txn.get();
+
+ auto it = txn_kv->full_range_get(begin, end, opts);
+ ASSERT_TRUE(it->is_valid());
+
+ int cnt = 0;
+ for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+ auto [k, v] = *kvp;
+ EXPECT_EQ(v, std::to_string(cnt));
+ ++cnt;
+ }
+ ASSERT_TRUE(it->is_valid());
+ EXPECT_EQ(cnt, 100);
+ }
+
+ {
+ // With prefetch
+ err = txn_kv->create_txn(&txn);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ FullRangeGetIteratorOptions opts(txn_kv);
+ opts.limit = 11;
+ opts.txn = txn.get();
+ opts.prefetch = true;
+
+ int prefetch_cnt = 0;
+ sp->set_call_back("fdb.FullRangeGetIterator.has_next_prefetch",
[&](void* p) {
+ ++prefetch_cnt;
+ std::cout << "With prefetch prefetch_cnt=" << prefetch_cnt <<
std::endl;
+ });
+
+ auto it = txn_kv->full_range_get(begin, end, opts);
+ ASSERT_TRUE(it->is_valid());
+
+ int cnt = 0;
+ for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+ auto [k, v] = *kvp;
+ EXPECT_EQ(v, std::to_string(cnt));
+ ++cnt;
+ // Sleep to wait for prefetch to be ready
+ std::this_thread::sleep_for(1ms);
+ }
+ ASSERT_TRUE(it->is_valid());
+ EXPECT_EQ(cnt, 100);
+
+ sp->clear_call_back("fdb.FullRangeGetIterator.has_next_prefetch");
+ }
+
+ {
+ // With object pool
+ std::vector<std::unique_ptr<RangeGetIterator>> obj_pool;
+ FullRangeGetIteratorOptions opts(txn_kv);
+ opts.limit = 11;
+ opts.obj_pool = &obj_pool;
+
+ auto it = txn_kv->full_range_get(begin, end, opts);
+ ASSERT_TRUE(it->is_valid());
+
+ int cnt = 0;
+ std::vector<std::string_view> values;
+ for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+ auto [k, v] = *kvp;
+ EXPECT_EQ(v, std::to_string(cnt));
+ values.push_back(v);
+ if (cnt % 25 == 24) {
+ // values should be alive
+ int base = cnt / 25 * 25;
+ for (int i = 0; i < values.size(); ++i) {
+ EXPECT_EQ(values[i], std::to_string(base + i));
+ }
+ values.clear();
+ obj_pool.clear();
+ }
+ ++cnt;
+ }
+ ASSERT_TRUE(it->is_valid());
+ EXPECT_EQ(cnt, 100);
+ }
+
+ {
+ // Abnormal
+ FullRangeGetIteratorOptions opts(txn_kv);
+ opts.limit = 11;
+ err = txn_kv->create_txn(&txn);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ opts.txn = txn.get();
+ auto it = txn_kv->full_range_get(begin, end, opts);
+ auto* fdb_it = static_cast<fdb::FullRangeGetIterator*>(it.get());
+ fdb_it->is_valid_ = false;
+ ASSERT_FALSE(it->is_valid());
+ ASSERT_FALSE(it->has_next());
+ ASSERT_FALSE(it->next().has_value());
+
+ fdb_it->is_valid_ = true;
+ ASSERT_TRUE(it->is_valid());
+ int cnt = 0;
+ for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+ auto [k, v] = *kvp;
+ EXPECT_EQ(v, std::to_string(cnt));
+ ++cnt;
+ // Total cost: 60ms * 100 = 6s > fdb txn timeout 5s
+ std::this_thread::sleep_for(60ms);
+ }
+ // Txn timeout
+ ASSERT_FALSE(it->is_valid());
+ ASSERT_FALSE(it->has_next());
+ ASSERT_FALSE(it->next().has_value());
+ }
+
+ {
+ // Abnormal dtor
+ int prefetch_cnt = 0;
+ sp->set_call_back("fdb.FullRangeGetIterator.has_next_prefetch",
[&](void* p) {
+ ++prefetch_cnt;
+ std::cout << "Abnormal dtor prefetch_cnt=" << prefetch_cnt <<
std::endl;
+ });
+
+ FullRangeGetIteratorOptions opts(txn_kv);
+ opts.limit = 11;
+ opts.prefetch = true;
+ auto it = txn_kv->full_range_get(begin, end, opts);
+ auto kvp = it->next();
+ ASSERT_TRUE(kvp.has_value());
+ kvp = it->next(); // Trigger prefetch
+ ASSERT_TRUE(kvp.has_value());
+ auto* fdb_it = static_cast<fdb::FullRangeGetIterator*>(it.get());
+ ASSERT_TRUE(fdb_it->fut_ != nullptr); // There is an inflight range get
+ // Since there is an inflight range get, should not trigger another
prefetch
+ ASSERT_FALSE(fdb_it->prefetch());
+
+ sp->clear_call_back("fdb.FullRangeGetIterator.has_next_prefetch");
+
+ // `~FullRangeGetIterator` without consuming inflight range get result
+ }
+
+ {
+ // Benchmark prefetch
+ // No prefetch
+ FullRangeGetIteratorOptions opts(txn_kv);
+ opts.limit = 11;
+ err = txn_kv->create_txn(&txn);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ opts.txn = txn.get();
+
+ auto it = txn_kv->full_range_get(begin, end, opts);
+ int cnt = 0;
+ auto start = std::chrono::steady_clock::now();
+ for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+ ++cnt;
+ std::this_thread::sleep_for(1ms);
+ }
+ auto finish = std::chrono::steady_clock::now();
+ ASSERT_TRUE(it->is_valid());
+ EXPECT_EQ(cnt, 100);
+ std::cout << "no prefetch cost="
+ <<
std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
+ << "ms" << std::endl;
+
+ // Prefetch
+ err = txn_kv->create_txn(&txn);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ opts.txn = txn.get();
+ opts.prefetch = true;
+ it = txn_kv->full_range_get(begin, end, opts);
+ cnt = 0;
+ start = std::chrono::steady_clock::now();
+ for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+ ++cnt;
+ std::this_thread::sleep_for(1ms);
+ }
+ finish = std::chrono::steady_clock::now();
+ ASSERT_TRUE(it->is_valid());
+ EXPECT_EQ(cnt, 100);
+ std::cout << "prefetch cost="
+ <<
std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
+ << "ms" << std::endl;
+
+ // Use RangeGetIterator
+ err = txn_kv->create_txn(&txn);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ std::unique_ptr<RangeGetIterator> inner_it;
+ auto inner_begin = begin;
+ cnt = 0;
+ start = std::chrono::steady_clock::now();
+ do {
+ err = txn->get(inner_begin, end, &inner_it, false, 11);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ if (!inner_it->has_next()) {
+ break;
+ }
+ while (inner_it->has_next()) {
+ // recycle corresponding resources
+ auto [k, v] = inner_it->next();
+ std::this_thread::sleep_for(1ms);
+ ++cnt;
+ if (!inner_it->has_next()) {
+ inner_begin = k;
+ }
+ }
+ inner_begin.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (inner_it->more());
+ finish = std::chrono::steady_clock::now();
+ EXPECT_EQ(cnt, 100);
+ std::cout << "RangeGetIterator cost="
+ <<
std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
+ << "ms" << std::endl;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]