This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b194a7cf83 [improvement](memory) Support GC of the segment cache when
memory is insufficient (#16987)
b194a7cf83 is described below
commit b194a7cf8324fdb551d51fc0519c56df5888c17e
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Feb 22 18:31:20 2023 +0800
[improvement](memory) Support GC of the segment cache when memory is
insufficient (#16987)
Fix segment cache memory tracker statistics.
Support GC of the segment cache.
---
be/src/olap/lru_cache.cpp | 11 +++++++----
be/src/olap/lru_cache.h | 11 +++++++----
be/src/olap/rowset/segment_v2/segment.cpp | 2 ++
be/src/olap/rowset/segment_v2/segment.h | 2 ++
be/src/olap/segment_loader.cpp | 10 ++++++++--
be/src/olap/segment_loader.h | 2 ++
be/src/util/mem_info.cpp | 8 ++++++++
be/test/olap/lru_cache_test.cpp | 18 +++++++++---------
8 files changed, 45 insertions(+), 19 deletions(-)
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 11f504f241..00dff82d1f 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -351,7 +351,7 @@ bool LRUCache::_check_element_count_limit() {
Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void*
value, size_t charge,
void (*deleter)(const CacheKey& key, void*
value),
- MemTrackerLimiter* tracker, CachePriority
priority) {
+ MemTrackerLimiter* tracker, CachePriority
priority, size_t bytes) {
size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
e->value = value;
@@ -359,16 +359,19 @@ Cache::Handle* LRUCache::insert(const CacheKey& key,
uint32_t hash, void* value,
e->charge = charge;
e->key_length = key.size();
e->total_size = (_type == LRUCacheType::SIZE ? handle_size + charge : 1);
+ DCHECK(_type == LRUCacheType::SIZE || bytes != -1) << " _type " << _type;
+ e->bytes = (_type == LRUCacheType::SIZE ? handle_size + charge :
handle_size + bytes);
e->hash = hash;
e->refs = 2; // one for the returned handle, one for LRUCache.
e->next = e->prev = nullptr;
e->in_cache = true;
e->priority = priority;
e->mem_tracker = tracker;
+ e->type = _type;
memcpy(e->key_data, key.data(), key.size());
// The memory of the parameter value should be recorded in the tls mem
tracker,
// transfer the memory ownership of the value to
ShardedLRUCache::_mem_tracker.
- THREAD_MEM_TRACKER_TRANSFER_TO(e->total_size, tracker);
+ THREAD_MEM_TRACKER_TRANSFER_TO(e->bytes, tracker);
LRUHandle* to_remove_head = nullptr;
{
std::lock_guard l(_mutex);
@@ -568,10 +571,10 @@ ShardedLRUCache::~ShardedLRUCache() {
Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value,
size_t charge,
void (*deleter)(const CacheKey& key,
void* value),
- CachePriority priority) {
+ CachePriority priority, size_t bytes) {
const uint32_t hash = _hash_slice(key);
return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter,
_mem_tracker.get(),
- priority);
+ priority, bytes);
}
Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index e988342cab..50749db662 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -175,7 +175,7 @@ public:
// value will be passed to "deleter".
virtual Handle* insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
- CachePriority priority = CachePriority::NORMAL) = 0;
+ CachePriority priority = CachePriority::NORMAL,
size_t bytes = -1) = 0;
// If the cache has no mapping for "key", returns nullptr.
//
@@ -240,12 +240,14 @@ typedef struct LRUHandle {
size_t charge;
size_t key_length;
size_t total_size; // including key length
+ size_t bytes; // Used by LRUCacheType::NUMBER, LRUCacheType::SIZE
equal to total_size.
bool in_cache; // Whether entry is in the cache.
uint32_t refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
CachePriority priority = CachePriority::NORMAL;
MemTrackerLimiter* mem_tracker;
char key_data[1]; // Beginning of key
+ LRUCacheType type;
CacheKey key() const {
// For cheaper lookups, we allow a temporary Handle object
@@ -259,7 +261,7 @@ typedef struct LRUHandle {
void free() {
(*deleter)(key(), value);
- THREAD_MEM_TRACKER_TRANSFER_FROM(total_size, mem_tracker);
+ THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker);
::free(this);
}
@@ -332,7 +334,7 @@ public:
Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value,
size_t charge,
void (*deleter)(const CacheKey& key, void* value),
MemTrackerLimiter* tracker,
- CachePriority priority = CachePriority::NORMAL);
+ CachePriority priority = CachePriority::NORMAL,
size_t bytes = -1);
Cache::Handle* lookup(const CacheKey& key, uint32_t hash);
void release(Cache::Handle* handle);
void erase(const CacheKey& key, uint32_t hash);
@@ -398,7 +400,8 @@ public:
virtual ~ShardedLRUCache();
virtual Handle* insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
- CachePriority priority = CachePriority::NORMAL)
override;
+ CachePriority priority = CachePriority::NORMAL,
+ size_t bytes = -1) override;
virtual Handle* lookup(const CacheKey& key) override;
virtual void release(Handle* handle) override;
virtual void erase(const CacheKey& key) override;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index 09e294496c..c0f4d7fa89 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -198,6 +198,7 @@ Status Segment::_load_pk_bloom_filter() {
return _load_pk_bf_once.call([this] {
RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader,
_footer.primary_key_index_meta()));
_meta_mem_usage += _pk_index_reader->get_bf_memory_size();
+
_segment_meta_mem_tracker->consume(_pk_index_reader->get_bf_memory_size());
return Status::OK();
});
}
@@ -214,6 +215,7 @@ Status Segment::load_index() {
RETURN_IF_ERROR(
_pk_index_reader->parse_index(_file_reader,
_footer.primary_key_index_meta()));
_meta_mem_usage += _pk_index_reader->get_memory_size();
+
_segment_meta_mem_tracker->consume(_pk_index_reader->get_memory_size());
return Status::OK();
} else {
// read and parse short key index page
diff --git a/be/src/olap/rowset/segment_v2/segment.h
b/be/src/olap/rowset/segment_v2/segment.h
index 47e5042f7b..c8414566fc 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -116,6 +116,8 @@ public:
io::FileReaderSPtr file_reader() { return _file_reader; }
+ int64_t meta_mem_usage() const { return _meta_mem_usage; }
+
private:
DISALLOW_COPY_AND_ASSIGN(Segment);
Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr
tablet_schema);
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index a83143e6f9..3e604adf17 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -32,7 +32,8 @@ void SegmentLoader::create_global_instance(size_t capacity) {
}
SegmentLoader::SegmentLoader(size_t capacity) {
- _cache = std::unique_ptr<Cache>(new_lru_cache("SegmentCache", capacity,
LRUCacheType::NUMBER));
+ _cache = std::unique_ptr<Cache>(
+ new_lru_cache("SegmentMetaCache", capacity, LRUCacheType::NUMBER));
}
bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& key,
SegmentCacheHandle* handle) {
@@ -52,8 +53,13 @@ void SegmentLoader::_insert(const SegmentLoader::CacheKey&
key, SegmentLoader::C
delete cache_value;
};
+ int64_t meta_mem_usage = 0;
+ for (auto segment : value.segments) {
+ meta_mem_usage += segment->meta_mem_usage();
+ }
+
auto lru_handle = _cache->insert(key.encode(), &value,
sizeof(SegmentLoader::CacheValue),
- deleter, CachePriority::NORMAL);
+ deleter, CachePriority::NORMAL,
meta_mem_usage);
*handle = SegmentCacheHandle(_cache.get(), lru_handle);
}
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index d2da517d4c..8535e69281 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -87,6 +87,8 @@ public:
// Try to prune the segment cache if expired.
Status prune();
+ int64_t prune_all() { return _cache->prune(); };
+ int64_t segment_cache_mem_consumption() { return
_cache->mem_consumption(); }
private:
SegmentLoader();
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 4ba9883eaa..8c995b3b31 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -35,6 +35,7 @@
#include "common/config.h"
#include "gutil/strings/split.h"
#include "olap/page_cache.h"
+#include "olap/segment_loader.h"
#include "util/cgroup_util.h"
#include "util/parse_util.h"
#include "util/pretty_printer.h"
@@ -97,6 +98,8 @@ void MemInfo::process_cache_gc(int64_t& freed_mem) {
freed_mem +=
StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+ // TODO add freed_mem
+ SegmentLoader::instance()->prune();
}
// step1: free all cache
@@ -134,6 +137,11 @@ bool MemInfo::process_full_gc() {
if (freed_mem > _s_process_full_gc_size) {
return true;
}
+ freed_mem += SegmentLoader::instance()->segment_cache_mem_consumption();
+ SegmentLoader::instance()->prune_all();
+ if (freed_mem > _s_process_full_gc_size) {
+ return true;
+ }
freed_mem +=
MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem);
if (freed_mem > _s_process_full_gc_size) {
return true;
diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp
index bafe4e7288..c779996c54 100644
--- a/be/test/olap/lru_cache_test.cpp
+++ b/be/test/olap/lru_cache_test.cpp
@@ -224,7 +224,7 @@ static void insert_LRUCache(LRUCache& cache, const
CacheKey& key, int value,
static std::unique_ptr<MemTrackerLimiter> lru_cache_tracker =
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL,
"TestLruCache");
cache.release(cache.insert(key, hash, EncodeValue(value), value, &deleter,
- lru_cache_tracker.get(), priority));
+ lru_cache_tracker.get(), priority, value));
}
TEST_F(CacheTest, Usage) {
@@ -232,34 +232,34 @@ TEST_F(CacheTest, Usage) {
cache.set_capacity(1040);
// The lru usage is handle_size + charge.
- // handle_size = sizeof(handle) - 1 + key size = 96 - 1 + 3 = 98
+ // handle_size = sizeof(handle) - 1 + key size = 104 - 1 + 3 = 106
CacheKey key1("100");
insert_LRUCache(cache, key1, 100, CachePriority::NORMAL);
- ASSERT_EQ(198, cache.get_usage()); // 100 + 98
+ ASSERT_EQ(206, cache.get_usage()); // 100 + 106
CacheKey key2("200");
insert_LRUCache(cache, key2, 200, CachePriority::DURABLE);
- ASSERT_EQ(496, cache.get_usage()); // 198 + 298(d), d = DURABLE
+ ASSERT_EQ(512, cache.get_usage()); // 206 + 306(d), d = DURABLE
CacheKey key3("300");
insert_LRUCache(cache, key3, 300, CachePriority::NORMAL);
- ASSERT_EQ(894, cache.get_usage()); // 198 + 298(d) + 398
+ ASSERT_EQ(918, cache.get_usage()); // 206 + 306(d) + 406
CacheKey key4("400");
insert_LRUCache(cache, key4, 400, CachePriority::NORMAL);
- ASSERT_EQ(796, cache.get_usage()); // 298(d) + 498, evict 198 398
+ ASSERT_EQ(812, cache.get_usage()); // 306(d) + 506, evict 206 406
CacheKey key5("500");
insert_LRUCache(cache, key5, 500, CachePriority::NORMAL);
- ASSERT_EQ(896, cache.get_usage()); // 298(d) + 598, evict 498
+ ASSERT_EQ(912, cache.get_usage()); // 306(d) + 606, evict 506
CacheKey key6("600");
insert_LRUCache(cache, key6, 600, CachePriority::NORMAL);
- ASSERT_EQ(996, cache.get_usage()); // 298(d) + 698, evict 498
+ ASSERT_EQ(1012, cache.get_usage()); // 306(d) + 706, evict 506
CacheKey key7("950");
insert_LRUCache(cache, key7, 950, CachePriority::DURABLE);
- ASSERT_EQ(0, cache.get_usage()); // evict 298 698, because 950 + 98 >
1040, so insert failed
+ ASSERT_EQ(0, cache.get_usage()); // evict 306 706, because 950 + 106 >
1040, so insert failed
}
TEST_F(CacheTest, Prune) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]