This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f43fb3  [Cache][BE] LRU cache for sql/partition cache #2581 (#4005)
5f43fb3 is described below

commit 5f43fb3bde67629aa0ba7446cb1facf25aee438f
Author: HaiBo Li <liha...@vip.126.com>
AuthorDate: Sun Sep 20 20:50:51 2020 +0800

    [Cache][BE] LRU cache for sql/partition cache #2581 (#4005)
    
    1. Find the cache node by SQL Key, then find the corresponding partition 
data by Partition Key, and then decide whether to hit Cache by LastVersion and 
LastVersionTime
    2. Refers to the classic cache algorithm LRU, which is the least recently 
used algorithm, using a three-layer data structure to achieve
    3. The Cache elimination algorithm is implemented by ensuring the range of 
the partition as much as possible, to avoid the situation of partition 
discontinuity, which will reduce the hit rate of the Cache partition,
    4. Use the two thresholds of maximum memory and elastic memory to control 
to avoid frequent elimination of data
---
 be/src/common/config.h                            |  10 +
 be/src/runtime/CMakeLists.txt                     |   2 +
 be/src/runtime/cache/cache_utils.h                |  87 +++++++
 be/src/runtime/cache/result_cache.cpp             | 270 +++++++++++++++++++++
 be/src/runtime/cache/result_cache.h               | 122 ++++++++++
 be/src/runtime/cache/result_node.cpp              | 272 ++++++++++++++++++++++
 be/src/runtime/cache/result_node.h                | 190 +++++++++++++++
 be/src/runtime/exec_env.cpp                       |   3 +-
 be/src/runtime/exec_env.h                         |   5 +
 be/src/runtime/exec_env_init.cpp                  |  14 +-
 be/src/service/internal_service.cpp               |  27 +++
 be/src/service/internal_service.h                 |  15 ++
 be/src/util/doris_metrics.cpp                     |   9 +-
 be/src/util/doris_metrics.h                       |   5 +
 be/src/util/uid_util.h                            |  27 +++
 be/test/runtime/CMakeLists.txt                    |   1 +
 be/test/runtime/cache/partition_cache_test.cpp    | 260 +++++++++++++++++++++
 docs/zh-CN/administrator-guide/partition_cache.md | 197 ++++++++++++++++
 18 files changed, 1513 insertions(+), 3 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index f868d78..6d245d9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -576,6 +576,16 @@ namespace config {
 
     // Soft memory limit as a fraction of hard memory limit.
     CONF_Double(soft_mem_limit_frac, "0.9");
+    
+    // Set max cache's size of query results, the unit is M byte
+    CONF_Int32(query_cache_max_size_mb, "256"); 
+
+    // Cache memory is pruned when reach query_cache_max_size_mb + 
query_cache_elasticity_size_mb
+    CONF_Int32(query_cache_elasticity_size_mb, "128");
+
+    // Maximum number of cache partitions corresponding to a SQL
+    CONF_Int32(query_cache_max_partition_count, "1024");
+    
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index e2ea260..0176d74 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -106,6 +106,8 @@ set(RUNTIME_FILES
     mysql_result_writer.cpp
     memory/system_allocator.cpp
     memory/chunk_allocator.cpp
+    cache/result_node.cpp
+    cache/result_cache.cpp
 )
 
 if (WITH_MYSQL)
diff --git a/be/src/runtime/cache/cache_utils.h 
b/be/src/runtime/cache/cache_utils.h
new file mode 100644
index 0000000..192289f
--- /dev/null
+++ b/be/src/runtime/cache/cache_utils.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_RUNTIME_CACHE_UTILS_H
+#define DORIS_BE_SRC_RUNTIME_CACHE_UTILS_H
+
+#include <gutil/integral_types.h>
+#include <sys/time.h>
+
+#include <algorithm>
+#include <boost/thread.hpp>
+#include <cassert>
+#include <cstdio>
+#include <cstdlib>
+#include <exception>
+#include <iostream>
+#include <list>
+#include <map>
+#include <shared_mutex>
+
+namespace doris {
+
+typedef boost::shared_lock<boost::shared_mutex> CacheReadLock;
+typedef boost::unique_lock<boost::shared_mutex> CacheWriteLock;
+
+//#ifndef PARTITION_CACHE_DEV
+//#define PARTITION_CACHE_DEV
+//#endif
+
+struct CacheStat {
+    static const uint32 DAY_SECONDS = 86400;
+    long cache_time;
+    long last_update_time;
+    long last_read_time;
+    uint32 read_count;
+    CacheStat() { init(); }
+
+    inline long cache_time_second() {
+        struct timeval tv;
+        gettimeofday(&tv, NULL);
+        return tv.tv_sec;
+    }
+
+    void init() {
+        cache_time = 0;
+        last_update_time = 0;
+        last_read_time = 0;
+        read_count = 0;
+    }
+
+    void update() {
+        last_update_time = cache_time_second();
+        if (cache_time == 0) {
+            cache_time = last_update_time;
+        }
+        last_read_time = last_update_time;
+        read_count++;
+    }
+
+    void query() {
+        last_read_time = cache_time_second();
+        read_count++;
+    }
+
+    double last_query_day() { return (cache_time_second() - last_read_time) * 
1.0 / DAY_SECONDS; }
+
+    double avg_query_count() {
+        return read_count * DAY_SECONDS * 1.0 / (cache_time_second() - 
last_read_time + 1);
+    }
+};
+
+} // namespace doris
+#endif 
diff --git a/be/src/runtime/cache/result_cache.cpp 
b/be/src/runtime/cache/result_cache.cpp
new file mode 100644
index 0000000..9632664
--- /dev/null
+++ b/be/src/runtime/cache/result_cache.cpp
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "gen_cpp/internal_service.pb.h"
+#include "runtime/cache/result_cache.h"
+#include "util/doris_metrics.h"
+
+namespace doris {
+
+/**
+* Remove the tail node of link
+*/
+ResultNode* ResultNodeList::pop() {
+    remove(_head);
+    return _head;
+}
+
+void ResultNodeList::remove(ResultNode* node) {
+    if (!node) return;
+    if (node == _head) _head = node->get_next();
+    if (node == _tail) _tail = node->get_prev();
+    node->unlink();
+    _node_count--;
+}
+
+void ResultNodeList::push_back(ResultNode* node) {
+    if (!node) return;
+    if (!_head) _head = node;
+    node->append(_tail);
+    _tail = node;
+    _node_count++;
+}
+
+void ResultNodeList::move_tail(ResultNode* node) {
+    if (!node || node == _tail) return;
+    if (!_head)
+        _head = node;
+    else if (node == _head)
+        _head = node->get_next();
+    node->unlink();
+    node->append(_tail);
+    _tail = node;
+}
+
+void ResultNodeList::delete_node(ResultNode** node) { 
+    (*node)->clear();
+    SAFE_DELETE(*node); 
+}
+
+void ResultNodeList::clear() {
+    LOG(INFO) << "clear result node list.";
+    while (_head) {
+        ResultNode* tmp_node = _head->get_next();
+        _head->clear();
+        SAFE_DELETE(_head);
+        _head = tmp_node;
+    }
+    _node_count = 0;
+}
+/**
+ * Find the node and update partition data
+ * New node, the node updated in the first partition will move to the tail of 
the list
+ */
+void ResultCache::update(const PUpdateCacheRequest* request, PCacheResponse* 
response) {
+    ResultNode* node;
+    PCacheStatus status;
+    bool update_first = false;
+    UniqueId sql_key = request->sql_key();
+    LOG(INFO) << "update cache, sql key:" << sql_key;
+    
+    CacheWriteLock write_lock(_cache_mtx);
+    auto it = _node_map.find(sql_key);
+    if (it != _node_map.end()) {
+        node = it->second;
+        _cache_size -= node->get_data_size();
+        _partition_count -= node->get_partition_count();
+        status = node->update_partition(request, update_first);
+    } else {
+        node = _node_list.new_node(sql_key);
+        status = node->update_partition(request, update_first);
+        _node_list.push_back(node);
+        _node_map[sql_key] = node;
+        _node_count += 1;
+    }
+    if (update_first) {
+        _node_list.move_tail(node);
+    }
+    _cache_size += node->get_data_size();
+    _partition_count += node->get_partition_count();
+    response->set_status(status);
+
+    prune();
+    update_monitor();
+}
+
+/**
+ * Fetch cache through sql key, partition key, version and time
+ */
+void ResultCache::fetch(const PFetchCacheRequest* request, PFetchCacheResult* 
result) {
+    bool hit_first = false;
+    ResultNodeMap::iterator node_it;
+    const UniqueId sql_key = request->sql_key();
+    LOG(INFO) << "fetch cache, sql key:" << sql_key;
+    {
+        CacheReadLock read_lock(_cache_mtx);    
+        node_it = _node_map.find(sql_key);
+        if (node_it == _node_map.end()) {
+            result->set_status(PCacheStatus::NO_SQL_KEY);
+            LOG(INFO) << "no such sql key:" << sql_key;
+            return;
+        }
+        ResultNode* node = node_it->second;
+        PartitionRowBatchList part_rowbatch_list;
+        PCacheStatus status = node->fetch_partition(request, 
part_rowbatch_list, hit_first);
+        
+        for (auto part_it = part_rowbatch_list.begin(); part_it != 
part_rowbatch_list.end(); part_it++) {
+            PCacheValue* srcValue = (*part_it)->get_value();
+            if (srcValue != NULL) {
+                PCacheValue* value = result->add_values();
+                value->CopyFrom(*srcValue);
+                LOG(INFO) << "fetch cache partition key:" << 
srcValue->param().partition_key();
+            } else {
+                LOG(WARNING) << "prowbatch of cache is null";
+                status = PCacheStatus::EMPTY_DATA;
+                break;
+            }
+        }
+        result->set_status(status);
+    }
+
+    if (hit_first) {
+        CacheWriteLock write_lock(_cache_mtx);
+        _node_list.move_tail(node_it->second);
+    }
+}
+
+bool ResultCache::contains(const UniqueId& sql_key) {
+    CacheReadLock read_lock(_cache_mtx);
+    return _node_map.find(sql_key) != _node_map.end();
+}
+
+/**
+ * enum PClearType {
+ *   CLEAR_ALL = 0,
+ *   PRUNE_CACHE = 1,
+ *   CLEAR_BEFORE_TIME = 2,
+ *   CLEAR_SQL_KEY = 3
+ * };
+ */
+void ResultCache::clear(const PClearCacheRequest* request, PCacheResponse* 
response) {
+    LOG(INFO) << "clear cache type" << request->clear_type()
+              << ", node size:" << _node_list.get_node_count() << ", map 
size:" << _node_map.size();
+    CacheWriteLock write_lock(_cache_mtx);
+    //0 clear, 1 prune, 2 before_time,3 sql_key
+    switch (request->clear_type()) {
+    case PClearType::CLEAR_ALL:
+        _node_list.clear();
+        _node_map.clear();
+        _cache_size = 0;
+        _node_count = 0;
+        _partition_count = 0;
+        break;
+    case PClearType::PRUNE_CACHE:
+        prune();
+        break;
+    default:
+        break;
+    }
+    update_monitor();
+    response->set_status(PCacheStatus::CACHE_OK);
+}
+
+//private method
+ResultNode* find_min_time_node(ResultNode* result_node) {
+    if (result_node->get_prev()) {
+        if (result_node->get_prev()->first_partition_last_time() <=
+            result_node->first_partition_last_time()) {
+            return result_node->get_prev();
+        }
+    }
+
+    if (result_node->get_next()) {
+        if (result_node->get_next()->first_partition_last_time() <
+            result_node->first_partition_last_time()) {
+            return result_node->get_next();
+        }
+    }
+    return result_node;
+}
+
+/*
+* Two-dimensional array, prune the min last_read_time PartitionRowBatch.
+* The following example is the last read time array.
+* 1 and 2 is the read time, nodes with pruning read time < 3
+* Before:
+*   1,2         //_head ResultNode*
+*   1,2,3,4,5   
+*   2,4,3,6,8   
+*   5,7,9,11,13 //_tail ResultNode*
+* After:
+*   4,5         //_head
+*   4,3,6,8
+*   5,7,9,11,13 //_tail
+*/
+void ResultCache::prune() {
+    if (_cache_size <= (_max_size + _elasticity_size)) {
+        return;
+    }
+    LOG(INFO) << "begin prune cache, cache_size : " << _cache_size << ", 
max_size : " << _max_size
+              << ", elasticity_size : " << _elasticity_size;
+    ResultNode* result_node = _node_list.get_head();
+    while (_cache_size > _max_size) {
+        if (result_node == NULL) {
+            break;
+        }
+        result_node = find_min_time_node(result_node);
+        _cache_size -= result_node->prune_first();
+        if (result_node->get_data_size() == 0) {
+            ResultNode* next_node;
+            if (result_node->get_next()) {
+                next_node = result_node->get_next();
+            } else if (result_node->get_prev()) {
+                next_node = result_node->get_prev();
+            } else {
+                next_node = _node_list.get_head();
+            }
+            remove(result_node);
+            result_node = next_node;
+        }
+    }
+    LOG(INFO) << "finish prune, cache_size : " << _cache_size;
+    _node_count = _node_map.size();
+    _cache_size = 0;
+    _partition_count = 0;
+    for (auto node_it = _node_map.begin(); node_it != _node_map.end(); 
node_it++) {
+        _partition_count += node_it->second->get_partition_count();
+        _cache_size += node_it->second->get_data_size();
+    }
+}
+
+void ResultCache::remove(ResultNode* result_node) {
+    auto node_it = _node_map.find(result_node->get_sql_key());
+    if (node_it != _node_map.end()) {
+        _node_map.erase(node_it);
+        _node_list.remove(result_node);
+        _node_list.delete_node(&result_node);
+    }
+}
+
+void ResultCache::update_monitor() {
+    
DorisMetrics::instance()->query_cache_memory_total_byte->set_value(_cache_size);
+    
DorisMetrics::instance()->query_cache_sql_total_count->set_value(_node_count);
+    
DorisMetrics::instance()->query_cache_partition_total_count->set_value(_partition_count);
+}
+
+} // namespace doris
+
diff --git a/be/src/runtime/cache/result_cache.h 
b/be/src/runtime/cache/result_cache.h
new file mode 100644
index 0000000..c05e253
--- /dev/null
+++ b/be/src/runtime/cache/result_cache.h
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_RUNTIME_RESULT_CACHE_H
+#define DORIS_BE_SRC_RUNTIME_RESULT_CACHE_H
+
+#include <boost/thread.hpp>
+#include <cassert>
+#include <cstdio>
+#include <cstdlib>
+#include <exception>
+#include <iostream>
+#include <list>
+#include <map>
+#include <mutex>
+#include <shared_mutex>
+#include <thread>
+
+#include "common/config.h"
+#include "runtime/cache/cache_utils.h"
+#include "runtime/cache/result_node.h"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/row_batch.h"
+#include "runtime/tuple_row.h"
+
+namespace doris {
+
+typedef std::unordered_map<UniqueId, ResultNode*> ResultNodeMap;
+
+// a doubly linked list class, point to result node
+class ResultNodeList {
+public:
+    ResultNodeList() : _head(NULL), _tail(NULL), _node_count(0) {}
+    virtual ~ResultNodeList() {}
+
+    ResultNode* new_node(const UniqueId& sql_key) { return new 
ResultNode(sql_key); }
+
+    void delete_node(ResultNode** node);
+
+    ResultNode* pop();
+    void move_tail(ResultNode* node);
+    //Just remove node from link, do not delete node
+    void remove(ResultNode* node);
+    void push_back(ResultNode* node);
+    void clear();
+
+    ResultNode* get_head() const { return _head; }
+
+    ResultNode* get_tail() const { return _tail; }
+
+    size_t get_node_count() const { return _node_count; }
+
+private:
+    ResultNode* _head;
+    ResultNode* _tail;
+    size_t _node_count;
+};
+
+/**
+ * Cache results of query, including the entire result set or the result set 
of divided partitions.
+ * Two data structures, one is unordered_map and the other is a doubly linked 
list, corresponding to a result node.
+ * If the cache is hit, the node will be moved to the end of the linked list.
+ * If the cache is cleared, nodes that are expired or have not been accessed 
for a long time will be cleared.
+ */
+class ResultCache {
+public:
+    ResultCache(int32 max_size, int32 elasticity_size) {
+        _max_size = max_size * 1024 * 1024;
+        _elasticity_size = elasticity_size * 1024 * 1024;
+        _cache_size = 0;
+        _node_count = 0;
+        _partition_count = 0;
+    }
+
+    virtual ~ResultCache() {}
+    void update(const PUpdateCacheRequest* request, PCacheResponse* response);
+    void fetch(const PFetchCacheRequest* request, PFetchCacheResult* result);
+    bool contains(const UniqueId& sql_key);
+    void clear(const PClearCacheRequest* request, PCacheResponse* response);
+
+    size_t get_cache_size() { return _cache_size; }
+
+private:
+    void prune();
+    void remove(ResultNode* result_node);
+    void update_monitor();
+
+    //At the same time, multithreaded reading
+    //Single thread updating and cleaning(only single be, Fe is not affected)
+    mutable boost::shared_mutex _cache_mtx;
+    ResultNodeMap _node_map;
+    //List of result nodes corresponding to SqlKey,last recently useed at the 
tail
+    ResultNodeList _node_list;
+    size_t _cache_size;
+    size_t _max_size;
+    double _elasticity_size;
+    size_t _node_count;
+    size_t _partition_count;
+
+private:
+    ResultCache();
+    ResultCache(const ResultCache&);
+    const ResultCache& operator=(const ResultCache&);
+};
+
+} // namespace doris
+#endif 
diff --git a/be/src/runtime/cache/result_node.cpp 
b/be/src/runtime/cache/result_node.cpp
new file mode 100644
index 0000000..7813458
--- /dev/null
+++ b/be/src/runtime/cache/result_node.cpp
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "gen_cpp/internal_service.pb.h"
+#include "runtime/cache/result_node.h"
+#include "runtime/cache/cache_utils.h"
+
+namespace doris {
+
+bool compare_partition(const PartitionRowBatch* left_node, const 
PartitionRowBatch* right_node) {
+    return left_node->get_partition_key() < right_node->get_partition_key();
+}
+
+//return new batch size,only include the size of PRowBatch
+void PartitionRowBatch::set_row_batch(const PCacheValue& value) {
+    if (_cache_value != NULL && !check_newer(value.param())) {
+        LOG(WARNING) << "set old version data, cache ver:" << 
_cache_value->param().last_version()
+                     << ",cache time:" << 
_cache_value->param().last_version_time()
+                     << ", setdata ver:" << value.param().last_version()
+                     << ",setdata time:" << value.param().last_version_time();
+        return;
+    }
+    SAFE_DELETE(_cache_value);
+    _cache_value = new PCacheValue(value);
+    _data_size += _cache_value->data_size();
+    _cache_stat.update();
+    LOG(INFO) << "finish set row batch, row num:" << _cache_value->rows_size()
+              << ", data size:" << _data_size;
+}
+
+bool PartitionRowBatch::is_hit_cache(const PCacheParam& param) {
+    if (!check_match(param)) {
+        return false;
+    }
+    _cache_stat.query();
+    return true;
+}
+
+void PartitionRowBatch::clear() {
+    LOG(INFO) << "clear partition rowbatch.";
+    SAFE_DELETE(_cache_value);
+    _partition_key = 0;
+    _data_size = 0;
+    _cache_stat.init();
+}
+
+/**
+ * Update partition cache data, find RowBatch from partition map by partition 
key,
+ * the partition rowbatch are stored in the order of partition keys
+ */
+PCacheStatus ResultNode::update_partition(const PUpdateCacheRequest* request, 
bool& is_update_firstkey) {
+    is_update_firstkey = false;
+    if (_sql_key != request->sql_key()) {
+        LOG(INFO) << "no match sql_key " << request->sql_key().hi() << 
request->sql_key().lo();
+        return PCacheStatus::PARAM_ERROR;
+    }
+
+    if (request->values_size() > config::query_cache_max_partition_count) {
+        LOG(WARNING) << "too many partitions size:" << request->values_size();
+        return PCacheStatus::PARAM_ERROR;
+    }
+
+    //Only one thread per SQL key can update the cache
+    CacheWriteLock write_lock(_node_mtx);
+
+    PartitionKey first_key = kint64max;
+    if (_partition_list.size() == 0) {
+        is_update_firstkey = true;
+    } else {
+        first_key = (*(_partition_list.begin()))->get_partition_key();
+    }
+    PartitionRowBatch* partition = NULL;
+    for (int i = 0; i < request->values_size(); i++) {
+        const PCacheValue& value = request->values(i);
+        PartitionKey partition_key = value.param().partition_key();
+        if (!is_update_firstkey && partition_key <= first_key) {
+            is_update_firstkey = true;
+        }
+        auto it = _partition_map.find(partition_key);
+        if (it == _partition_map.end()) {
+            partition = new PartitionRowBatch(partition_key);
+            partition->set_row_batch(value);
+            _partition_map[partition_key] = partition;
+            _partition_list.push_back(partition);
+#ifdef PARTITION_CACHE_DEV
+            LOG(INFO) << "add index:" << i << ", pkey:" << 
partition->get_partition_key()
+                      << ", list size:" << _partition_list.size()
+                      << ", map size:" << _partition_map.size();
+#endif
+        } else {
+            partition = it->second;
+            _data_size -= partition->get_data_size();
+            partition->set_row_batch(value);
+#ifdef PARTITION_CACHE_DEV
+            LOG(INFO) << "update index:" << i << ", pkey:" << 
partition->get_partition_key()
+                      << ", list size:" << _partition_list.size()
+                      << ", map size:" << _partition_map.size();
+#endif
+        }
+        _data_size += partition->get_data_size();
+    }
+    _partition_list.sort(compare_partition);
+    LOG(INFO) << "finish update batches:" << _partition_list.size();
+    while (config::query_cache_max_partition_count > 0 &&
+           _partition_list.size() > config::query_cache_max_partition_count) {
+        if (prune_first() == 0) {
+            break;
+        }
+    }
+    return PCacheStatus::CACHE_OK;
+}
+
+/**
+* Only the range query of the key of the partition is supported, and the 
separated partition key query is not supported.
+* Because a query can only be divided into two parts, part1 get data from 
cache, part2 fetch_data by scan node from BE.
+* Partion cache : 20191211-20191215
+* Hit cache parameter : [20191211 - 20191215], [20191212 - 20191214], 
[20191212 - 20191216],[20191210 - 20191215]
+* Miss cache parameter: [20191210 - 20191216]
+*/
+PCacheStatus ResultNode::fetch_partition(const PFetchCacheRequest* request,
+                                         PartitionRowBatchList& 
row_batch_list, bool& is_hit_firstkey) {
+    is_hit_firstkey = false;
+    if (request->params_size() == 0) {
+        return PCacheStatus::PARAM_ERROR;
+    }
+
+    CacheReadLock read_lock(_node_mtx);
+
+    if (_partition_list.size() == 0) {
+        return PCacheStatus::NO_PARTITION_KEY;
+    }
+    
+    if (request->params(0).partition_key() > 
(*_partition_list.rbegin())->get_partition_key() ||
+        request->params(request->params_size() - 1).partition_key() <
+                (*_partition_list.begin())->get_partition_key()) {
+        return PCacheStatus::NO_PARTITION_KEY;
+    }
+
+    bool find = false;
+    int begin_idx = -1, end_idx = -1, param_idx = 0;
+    auto begin_it = _partition_list.end();
+    auto end_it = _partition_list.end();
+    auto part_it = _partition_list.begin();
+
+    PCacheStatus status = PCacheStatus::CACHE_OK;
+    while (param_idx < request->params_size() && part_it != 
_partition_list.end()) {
+#ifdef PARTITION_CACHE_DEV
+        LOG(INFO) << "Param index : " << param_idx
+                  << ", param part Key : " << 
request->params(param_idx).partition_key()
+                  << ", batch part key : " << (*part_it)->get_partition_key();
+#endif
+        if (!find) {
+            while (part_it != _partition_list.end() &&
+                   request->params(param_idx).partition_key() > 
(*part_it)->get_partition_key()) {
+                part_it++;
+            }
+            while (param_idx < request->params_size() &&
+                   request->params(param_idx).partition_key() < 
(*part_it)->get_partition_key()) {
+                param_idx++;
+            }
+            if (request->params(param_idx).partition_key() == 
(*part_it)->get_partition_key()) {
+                find = true;
+            }
+        }
+        if (find) {
+#ifdef PARTITION_CACHE_DEV
+            LOG(INFO) << "Find! Param index : " << param_idx
+                      << ", param part Key : " << 
request->params(param_idx).partition_key()
+                      << ", batch part key : " << 
(*part_it)->get_partition_key()
+                      << ", param part version : " << 
request->params(param_idx).last_version()
+                      << ", batch part version : " << 
(*part_it)->get_value()->param().last_version()
+                      << ", param part version time : " << 
request->params(param_idx).last_version_time()
+                      << ", batch part version time : " << 
(*part_it)->get_value()->param().last_version_time();
+#endif
+            if ((*part_it)->is_hit_cache(request->params(param_idx))) {
+                if (begin_idx < 0) {
+                    begin_idx = param_idx;
+                    begin_it = part_it;
+                }
+                end_idx = param_idx;
+                end_it = part_it;
+                param_idx++;
+                part_it++;
+            } else {
+                status = PCacheStatus::DATA_OVERDUE;
+                break;
+            }
+        }
+    }
+
+    if (begin_it == _partition_list.end() && end_it == _partition_list.end()) {
+        return status;
+    }
+
+    //[20191210 - 20191216] hit partition range [20191212-20191214],the sql 
will be splited to 3 part!
+    if (begin_idx != 0 && end_idx != request->params_size() - 1) {
+        return PCacheStatus::INVALID_KEY_RANGE;
+    }
+    if (begin_it == _partition_list.begin()) {
+        is_hit_firstkey = true;
+    }
+    
+    while (true) {
+        row_batch_list.push_back(*begin_it);
+        if (begin_it == end_it) {
+            break;
+        }
+        begin_it++;
+    }
+    return status;
+}
+
+/*
+* prune first partition result
+*/
+size_t ResultNode::prune_first() {
+    if (_partition_list.size() == 0) {
+        return 0;
+    }
+    PartitionRowBatch* part_node = *_partition_list.begin();
+    size_t prune_size = part_node->get_data_size();
+    _partition_list.erase(_partition_list.begin());
+    part_node->clear();
+    SAFE_DELETE(part_node);
+    _data_size -= prune_size;
+    return prune_size;
+}
+
+void ResultNode::clear() {
+    CacheWriteLock write_lock(_node_mtx);
+    LOG(INFO) << "clear result node:" << _sql_key;
+    _sql_key.hi = 0;
+    _sql_key.lo = 0;
+    for (auto it = _partition_list.begin(); it != _partition_list.end();) {
+        (*it)->clear();
+        SAFE_DELETE(*it);
+        it = _partition_list.erase(it);
+    }
+    _data_size = 0;
+}
+
+void ResultNode::append(ResultNode* tail) {
+    _prev = tail;
+    if (tail) tail->set_next(this);
+}
+
+void ResultNode::unlink() {
+    if (_next) {
+        _next->set_prev(_prev);
+    }
+    if (_prev) {
+        _prev->set_next(_next);
+    }
+    _next = NULL;
+    _prev = NULL;
+}
+
+} // namespace doris
+
diff --git a/be/src/runtime/cache/result_node.h 
b/be/src/runtime/cache/result_node.h
new file mode 100644
index 0000000..722509d
--- /dev/null
+++ b/be/src/runtime/cache/result_node.h
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_RUNTIME_RESULT_NODE_H
+#define DORIS_BE_SRC_RUNTIME_RESULT_NODE_H
+
+#include <sys/time.h>
+
+#include <algorithm>
+#include <cassert>
+#include <cstdio>
+#include <cstdlib>
+#include <exception>
+#include <iostream>
+#include <list>
+#include <map>
+#include <string>
+
+#include "common/config.h"
+#include "olap/olap_define.h"
+#include "runtime/cache/cache_utils.h"
+#include "runtime/mem_pool.h"
+#include "runtime/row_batch.h"
+#include "runtime/tuple_row.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+enum PCacheStatus;
+class PCacheParam;
+class PCacheValue;
+class PCacheResponse;
+class PFetchCacheRequest;
+class PFetchCacheResult;
+class PUpdateCacheRequest;
+class PClearCacheRequest;
+
+/**
+* Cache one partition data, request param must match version and time of cache
+*/
+class PartitionRowBatch {
+public:
+    PartitionRowBatch(int64 partition_key)
+            : _partition_key(partition_key), _cache_value(NULL), _data_size(0) 
{}
+
+    ~PartitionRowBatch() {}
+
+    void set_row_batch(const PCacheValue& value);
+    bool is_hit_cache(const PCacheParam& param);
+    void clear();
+
+    int64 get_partition_key() const { return _partition_key; }
+
+    PCacheValue* get_value() { return _cache_value; }
+
+    size_t get_data_size() { return _data_size; }
+
+    const CacheStat* get_stat() const { return &_cache_stat; }
+
+private:
+    bool check_match(const PCacheParam& req_param) {
+        if (req_param.partition_key() != _partition_key) {
+            return false;
+        }
+        if (req_param.last_version() > _cache_value->param().last_version()) {
+            return false;
+        }
+        if (req_param.last_version_time() > 
_cache_value->param().last_version_time()) {
+            return false;
+        }
+        return true;
+    }
+
+    bool check_newer(const PCacheParam& up_param) {
+        //for init data of sql cache
+        if (up_param.last_version() == 0 || up_param.last_version_time() == 0) 
{
+            return true;
+        }
+        if (up_param.last_version_time() > 
_cache_value->param().last_version_time()) {
+            return true;
+        }
+        if (up_param.last_version() > _cache_value->param().last_version()) {
+            return true;
+        }
+        return false;
+    }
+
+private:
+    int64 _partition_key;
+    PCacheValue* _cache_value;
+    size_t _data_size;
+    CacheStat _cache_stat;
+};
+
+typedef int64 PartitionKey;
+typedef std::list<PartitionRowBatch*> PartitionRowBatchList;
+typedef boost::unordered_map<PartitionKey, PartitionRowBatch*> 
PartitionRowBatchMap;
+
+/**
+* Cache the result of one SQL, include many partition rowsets.
+* Sql Cache: The partiton ID comes from the partition lastest updated.
+* Partition Cache: The partition ID comes from the partition scanned by query.
+* The above two modes use the same cache structure.
+*/
+class ResultNode {
+public:
+    ResultNode() : _sql_key(0, 0), _prev(NULL), _next(NULL), _data_size(0) {}
+
+    ResultNode(const UniqueId& sql_key)
+            : _sql_key(sql_key), _prev(NULL), _next(NULL), _data_size(0) {}
+
+    virtual ~ResultNode() {}
+
+    PCacheStatus update_partition(const PUpdateCacheRequest* request, bool& 
is_update_firstkey);
+    PCacheStatus fetch_partition(const PFetchCacheRequest* request,
+                                 PartitionRowBatchList& rowBatchList, bool& 
is_hit_firstkey);
+
+    size_t prune_first();
+    void clear();
+
+    ResultNode* get_prev() { return _prev; }
+
+    void set_prev(ResultNode* prev) { _prev = prev; }
+
+    ResultNode* get_next() { return _next; }
+
+    void set_next(ResultNode* next) { _next = next; }
+
+    void append(ResultNode* tail);
+
+    void unlink();
+
+    size_t get_partition_count() const { return _partition_list.size(); }
+
+    size_t get_data_size() const { return _data_size; }
+
+    UniqueId get_sql_key() { return _sql_key; }
+
+    bool sql_key_null() { return _sql_key.hi == 0 && _sql_key.lo == 0; }
+
+    void set_sql_key(const UniqueId& sql_key) { _sql_key = sql_key; }
+
+    long first_partition_last_time() const {
+        if (_partition_list.size() == 0) {
+            return 0;
+        }
+        const PartitionRowBatch* first = *(_partition_list.begin());
+        return first->get_stat()->last_read_time;
+    }
+
+    const CacheStat* get_first_stat() const {
+        if (_partition_list.size() == 0) {
+            return NULL;
+        }
+        return (*(_partition_list.begin()))->get_stat();
+    }
+
+    const CacheStat* get_last_stat() const {
+        if (_partition_list.size() == 0) {
+            return NULL;
+        }
+        return (*(_partition_list.end()--))->get_stat();
+    }
+
+private:
+    mutable boost::shared_mutex _node_mtx;
+    UniqueId _sql_key;
+    ResultNode* _prev;
+    ResultNode* _next;
+    size_t _data_size;
+    PartitionRowBatchList _partition_list;
+    PartitionRowBatchMap _partition_map;
+};
+
+} // namespace doris
+#endif
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index d22150d..6bd8a80 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -21,7 +21,8 @@
 
 namespace doris {
 
-ExecEnv::ExecEnv() {}
+ExecEnv::ExecEnv() : _is_init(false) {
+}
 
 ExecEnv::~ExecEnv() {}
 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index a880419..1f97348 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -34,6 +34,7 @@ class EtlJobMgr;
 class EvHttpServer;
 class ExternalScanContextMgr;
 class FragmentMgr;
+class ResultCache;
 class LoadPathMgr;
 class LoadStreamMgr;
 class MemTracker;
@@ -55,6 +56,7 @@ class SmallFileMgr;
 class FileBlockManager;
 class PluginMgr;
 
+
 class BackendServiceClient;
 class FrontendServiceClient;
 class TPaloBrokerServiceClient;
@@ -113,6 +115,7 @@ public:
     PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
     CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
     FragmentMgr* fragment_mgr() { return _fragment_mgr; }
+    ResultCache* result_cache() { return _result_cache; }
     TMasterInfo* master_info() { return _master_info; }
     EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; }
     LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
@@ -147,6 +150,7 @@ private:
     void _init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t 
clean_pages_limit);
 
 private:
+    bool _is_init;
     std::vector<StorePath> _store_paths;
     // Leave protected so that subclasses can override
     ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
@@ -164,6 +168,7 @@ private:
     PriorityThreadPool* _etl_thread_pool = nullptr;
     CgroupsMgr* _cgroups_mgr = nullptr;
     FragmentMgr* _fragment_mgr = nullptr;
+    ResultCache* _result_cache = nullptr;
     TMasterInfo* _master_info = nullptr;
     EtlJobMgr* _etl_job_mgr = nullptr;
     LoadPathMgr* _load_path_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index fb84bb1..36bac14 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -35,6 +35,7 @@
 #include "runtime/load_channel_mgr.h"
 #include "runtime/tmp_file_mgr.h"
 #include "runtime/bufferpool/reservation_tracker.h"
+#include "runtime/cache/result_cache.h"
 #include "util/metrics.h"
 #include "util/network_util.h"
 #include "util/parse_util.h"
@@ -65,11 +66,15 @@
 
 namespace doris {
 
-Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) {
+Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) 
{    
     return env->_init(store_paths);
 }
 
 Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
+    //Only init once before be destroyed
+    if (_is_init) {
+        return Status::OK();
+    }
     _store_paths = store_paths;
     _external_scan_context_mgr = new ExternalScanContextMgr(this);
     _stream_mgr = new DataStreamMgr();
@@ -89,6 +94,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
         config::etl_thread_pool_queue_size);
     _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
     _fragment_mgr = new FragmentMgr(this);
+    _result_cache = new ResultCache(config::query_cache_max_size_mb, 
config::query_cache_elasticity_size_mb);
     _master_info = new TMasterInfo();
     _etl_job_mgr = new EtlJobMgr(this);
     _load_path_mgr = new LoadPathMgr(this);
@@ -122,6 +128,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
 
     RETURN_IF_ERROR(_load_channel_mgr->init(_mem_tracker->limit()));
     _heartbeat_flags = new HeartbeatFlags();
+    _is_init = true;
     return Status::OK();
 }
 
@@ -207,6 +214,10 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size,
 }
 
 void ExecEnv::_destory() {
+    //Only destroy once after init
+    if (!_is_init) {
+        return;
+    }
     SAFE_DELETE(_brpc_stub_cache);
     SAFE_DELETE(_load_stream_mgr);
     SAFE_DELETE(_load_channel_mgr);
@@ -234,6 +245,7 @@ void ExecEnv::_destory() {
     SAFE_DELETE(_routine_load_task_executor);
     SAFE_DELETE(_external_scan_context_mgr);
     SAFE_DELETE(_heartbeat_flags);
+    _is_init = false;
 }
 
 void ExecEnv::destroy(ExecEnv* env) {
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 68dfdfd..99395a3 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -19,6 +19,7 @@
 
 #include "common/config.h"
 #include "gen_cpp/BackendService.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "runtime/exec_env.h"
 #include "runtime/data_stream_mgr.h"
 #include "runtime/fragment_mgr.h"
@@ -222,6 +223,32 @@ void PInternalServiceImpl<T>::get_info(
     Status::OK().to_protobuf(response->mutable_status());
 }
 
+template<typename T>
+void PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController* 
controller,
+        const PUpdateCacheRequest* request,
+        PCacheResponse* response,
+        google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    _exec_env->result_cache()->update(request, response);
+}
+
+template<typename T>
+void PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController* 
controller,
+        const PFetchCacheRequest* request,
+        PFetchCacheResult* result,
+        google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+   _exec_env->result_cache()->fetch(request, result);
+}
+
+template<typename T>
+void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* 
controller,
+        const PClearCacheRequest* request,
+        PCacheResponse* response,
+        google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    _exec_env->result_cache()->clear(request, response);
+}
 
 template class PInternalServiceImpl<PBackendService>;
 template class PInternalServiceImpl<palo::PInternalService>;
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index e27c2c3..03fbcfe 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -21,6 +21,7 @@
 #include "gen_cpp/internal_service.pb.h"
 #include "gen_cpp/palo_internal_service.pb.h"
 #include "util/priority_thread_pool.hpp"
+#include "runtime/cache/result_cache.h"
 
 namespace brpc {
 class Controller;
@@ -86,6 +87,20 @@ public:
         PProxyResult* response,
         google::protobuf::Closure* done) override;
 
+    void update_cache(google::protobuf::RpcController* controller,
+        const PUpdateCacheRequest* request,
+        PCacheResponse* response,
+        google::protobuf::Closure* done) override;    
+
+    void fetch_cache(google::protobuf::RpcController* controller,
+        const PFetchCacheRequest* request,
+        PFetchCacheResult* result,
+        google::protobuf::Closure* done) override;
+
+    void clear_cache(google::protobuf::RpcController* controller,
+        const PClearCacheRequest* request,
+        PCacheResponse* response,
+        google::protobuf::Closure* done) override;
 private:
     Status _exec_plan_fragment(brpc::Controller* cntl);
 private:
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 023074a..39ec078 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -19,7 +19,6 @@
 
 #include <sys/types.h>
 #include <unistd.h>
-
 #include "env/env.h"
 #include "util/debug_util.h"
 #include "util/file_utils.h"
@@ -126,6 +125,10 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(disk_sync_total, 
MetricUnit::OPERATIONS);
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(blocks_open_reading, MetricUnit::BLOCKS);
 DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(blocks_open_writing, MetricUnit::BLOCKS);
+    
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(query_cache_memory_total_byte, 
MetricUnit::BYTES);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(query_cache_sql_total_count, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(query_cache_partition_total_count, 
MetricUnit::NOUNIT);
 
 const std::string DorisMetrics::_s_registry_name = "doris_be";
 const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -230,6 +233,10 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes);
 
     _server_metric_entity->register_hook(_s_hook_name, 
std::bind(&DorisMetrics::_update, this));
+
+    INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, 
query_cache_memory_total_byte);
+    INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, 
query_cache_sql_total_count);
+    INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, 
query_cache_partition_total_count);
 }
 
 void DorisMetrics::initialize(
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 3ffbe74..924f365 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -170,6 +170,11 @@ public:
     UIntGauge* tablet_writer_count;
 
     UIntGauge* compaction_mem_current_consumption;
+    
+    // Cache metrics
+    UIntGauge* query_cache_memory_total_byte;
+    UIntGauge* query_cache_sql_total_count;
+    UIntGauge* query_cache_partition_total_count;
 
     static DorisMetrics* instance() {
         static DorisMetrics instance;
diff --git a/be/src/util/uid_util.h b/be/src/util/uid_util.h
index dc2d938..c5c0859 100644
--- a/be/src/util/uid_util.h
+++ b/be/src/util/uid_util.h
@@ -62,6 +62,7 @@ struct UniqueId {
     int64_t lo = 0;
 
     UniqueId(int64_t hi_, int64_t lo_) : hi(hi_), lo(lo_) { }
+    UniqueId(const UniqueId& uid) : hi(uid.hi), lo(uid.lo) { }
     UniqueId(const TUniqueId& tuid) : hi(tuid.hi), lo(tuid.lo) { }
     UniqueId(const PUniqueId& puid) : hi(puid.hi()), lo(puid.lo()) { }
     UniqueId(const std::string& hi_str, const std::string& lo_str) {
@@ -87,6 +88,32 @@ struct UniqueId {
         to_hex(lo, buf + 17);
         return {buf, 33};
     }
+    
+    UniqueId& operator=(const UniqueId uid) {
+        hi = uid.hi;
+        lo = uid.lo;
+        return *this; 
+    }
+
+    UniqueId& operator=(const PUniqueId puid) {
+        hi = puid.hi();
+        lo = puid.lo();    
+        return *this;
+    }
+    
+    UniqueId& operator=(const TUniqueId tuid) {
+        hi = tuid.hi;
+        lo = tuid.lo;
+        return *this;
+    }
+    //compare PUniqueId and UniqueId  
+    bool operator==(const PUniqueId& rhs) const {
+        return hi == rhs.hi() && lo == rhs.lo();
+    }
+    
+    bool operator!=(const PUniqueId& rhs) const {
+        return hi != rhs.hi() || lo != rhs.lo();
+    }
 
     // std::map std::set needs this operator
     bool operator<(const UniqueId& right) const {
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index b360534..c3bab67 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -62,3 +62,4 @@ ADD_BE_TEST(external_scan_context_mgr_test)
 
 ADD_BE_TEST(memory/chunk_allocator_test)
 ADD_BE_TEST(memory/system_allocator_test)
+ADD_BE_TEST(cache/partition_cache_test)
diff --git a/be/test/runtime/cache/partition_cache_test.cpp 
b/be/test/runtime/cache/partition_cache_test.cpp
new file mode 100644
index 0000000..e370a78
--- /dev/null
+++ b/be/test/runtime/cache/partition_cache_test.cpp
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <boost/shared_ptr.hpp>
+#include "util/logging.h"
+#include "util/cpu_info.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "runtime/cache/result_cache.h"
+#include "runtime/buffer_control_block.h"
+
+namespace doris {
+
+class PartitionCacheTest : public testing::Test {
+public:
+    PartitionCacheTest() {
+
+    }
+    virtual ~PartitionCacheTest() {
+//        clear();
+    }
+protected:
+    virtual void SetUp() {
+    }
+
+private:
+    void init_default(){
+        LOG(WARNING) << "init test default\n";
+        init(16,4);
+    }
+    void init(int max_size, int ela_size);
+    void clear();
+    PCacheStatus init_batch_data(int sql_num, int part_begin, int part_num);
+    ResultCache* _cache;
+    PUpdateCacheRequest* _update_request;
+    PCacheResponse* _update_response;
+    PFetchCacheRequest* _fetch_request;
+    PFetchCacheResult* _fetch_result;
+    PClearCacheRequest* _clear_request;
+    PCacheResponse* _clear_response;
+};
+
+void PartitionCacheTest::init(int max_size, int ela_size){
+    LOG(WARNING) << "init test\n";
+    _cache = new ResultCache(max_size, ela_size);
+    _update_request = new PUpdateCacheRequest();
+    _update_response = new PCacheResponse();
+    _fetch_request = new PFetchCacheRequest();
+    _fetch_result = new PFetchCacheResult();
+    _clear_request = new PClearCacheRequest();
+    _clear_response = new PCacheResponse();
+}
+
+void PartitionCacheTest::clear(){
+    _clear_request->set_clear_type(PClearType::CLEAR_ALL);
+    _cache->clear(_clear_request, _clear_response);    
+    SAFE_DELETE(_cache);
+    SAFE_DELETE(_update_request);
+    SAFE_DELETE(_update_response);
+    SAFE_DELETE(_fetch_request);
+    SAFE_DELETE(_fetch_result);
+    SAFE_DELETE(_clear_request);
+    SAFE_DELETE(_clear_response);
+}
+
+void set_sql_key(PUniqueId* sql_key, int64 hi, int64 lo){
+    sql_key->set_hi(hi);
+    sql_key->set_lo(lo);
+}
+
+PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin, 
int part_num) {
+    LOG(WARNING) << "init data, sql_num:" << sql_num << ",part_num:" << 
part_num;
+    PUpdateCacheRequest* up_req = NULL;
+    PCacheResponse* up_res = NULL;
+    PCacheStatus st = PCacheStatus::DEFAULT;
+    for (int i = 1; i < sql_num + 1; i++) {
+        LOG(WARNING) << "Sql:" << i;
+        up_req = new PUpdateCacheRequest();
+        up_res = new PCacheResponse();
+        set_sql_key(up_req->mutable_sql_key(), i, i);
+        //partition
+        for (int j = part_begin; j < part_begin + part_num; j++) {
+            PCacheValue* value = up_req->add_values();
+            value->mutable_param()->set_partition_key(j);
+            value->mutable_param()->set_last_version(j);
+            value->mutable_param()->set_last_version_time(j);
+            value->set_data_size(16);
+            value->add_rows("0123456789abcdef"); //16 byte
+        }
+        _cache->update(up_req, up_res);
+        LOG(WARNING) << "finish update data";
+        st = up_res->status();
+        SAFE_DELETE(up_req);
+        SAFE_DELETE(up_res);
+    }
+    return st;
+}
+
+TEST_F(PartitionCacheTest, update_data) {
+    init_default();
+    PCacheStatus st = init_batch_data(1, 1, 1);
+    ASSERT_TRUE(st == PCacheStatus::CACHE_OK);
+    LOG(WARNING) << "clear cache";
+    clear();
+}
+
+TEST_F(PartitionCacheTest, update_over_partition) {
+    init_default();
+    PCacheStatus st = init_batch_data(1, 1, 
config::query_cache_max_partition_count+1);
+    ASSERT_TRUE(st == PCacheStatus::PARAM_ERROR);
+    clear();
+}
+
+TEST_F(PartitionCacheTest, cache_clear) {
+    init_default();
+    init_batch_data(1, 1, 1);
+    _cache->clear(_clear_request, _clear_response);
+    ASSERT_EQ(_cache->get_cache_size(),0); 
+    clear();
+}
+
+TEST_F(PartitionCacheTest, fetch_simple_data) {
+    init_default();
+    init_batch_data(1, 1, 1);
+
+    LOG(WARNING) << "finish init\n";
+    set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+    PCacheParam* p1 = _fetch_request->add_params();
+    p1->set_partition_key(1);
+    p1->set_last_version(1);
+    p1->set_last_version_time(1);
+    LOG(WARNING) << "begin fetch\n";
+    _cache->fetch(_fetch_request, _fetch_result);
+    LOG(WARNING) << "finish fetch1\n";
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::CACHE_OK);
+    ASSERT_EQ(_fetch_result->values_size(), 1);
+    ASSERT_EQ(_fetch_result->values(0).rows(0), "0123456789abcdef");
+
+    LOG(WARNING) << "finish fetch2\n";
+    clear();
+    LOG(WARNING) << "finish fetch3\n";
+}
+
+TEST_F(PartitionCacheTest, fetch_not_sqlid) {
+    init_default();
+    init_batch_data(1, 1, 1);
+
+    set_sql_key(_fetch_request->mutable_sql_key(), 2, 2);
+    PCacheParam* p1 = _fetch_request->add_params();
+    p1->set_partition_key(1);
+    p1->set_last_version(1);
+    p1->set_last_version_time(1);
+    _cache->fetch(_fetch_request, _fetch_result);
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::NO_SQL_KEY);
+
+    clear();
+}
+
+TEST_F(PartitionCacheTest, fetch_range_data) {
+    init_default();
+    init_batch_data(1, 1, 3);
+
+    set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+    PCacheParam* p1 = _fetch_request->add_params();
+    p1->set_partition_key(2);
+    p1->set_last_version(2);
+    p1->set_last_version_time(2);
+    PCacheParam* p2 = _fetch_request->add_params();
+    p2->set_partition_key(3);
+    p2->set_last_version(3);
+    p2->set_last_version_time(3);
+    _cache->fetch(_fetch_request, _fetch_result);
+
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::CACHE_OK);
+    ASSERT_EQ(_fetch_result->values_size(), 2);
+
+    clear();
+}
+
+TEST_F(PartitionCacheTest, fetch_invalid_key_range) {
+    init_default();
+    init_batch_data(1, 2, 1);
+
+    set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+    PCacheParam* p1 = _fetch_request->add_params();
+    p1->set_partition_key(1);
+    p1->set_last_version(1);
+    p1->set_last_version_time(1);
+    
+    PCacheParam* p2 = _fetch_request->add_params();
+    p2->set_partition_key(2);
+    p2->set_last_version(2);
+    p2->set_last_version_time(2);
+    
+    PCacheParam* p3 = _fetch_request->add_params();
+    p3->set_partition_key(3);
+    p3->set_last_version(3);
+    p3->set_last_version_time(3);
+    _cache->fetch(_fetch_request, _fetch_result);
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::INVALID_KEY_RANGE);
+    ASSERT_EQ(_fetch_result->values_size(), 0);
+    clear();
+}
+
+TEST_F(PartitionCacheTest, fetch_data_overdue) {
+    init_default();
+    init_batch_data(1, 1, 1);
+
+    set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+    PCacheParam* p1 = _fetch_request->add_params();
+    p1->set_partition_key(1);
+    //cache version is 1, request version is 2
+    p1->set_last_version(2);
+    p1->set_last_version_time(2);
+    _cache->fetch(_fetch_request, _fetch_result);
+
+    LOG(WARNING) << "fetch_data_overdue:" << _fetch_result->status();
+
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::DATA_OVERDUE);    
+    ASSERT_EQ(_fetch_result->values_size(), 0);
+    
+    clear();
+}
+
+TEST_F(PartitionCacheTest, prune_data) {
+    init(1,1);
+    init_batch_data(129, 1, 1024);                    // 16*1024*128=2M
+    ASSERT_LE(_cache->get_cache_size(), 1*1024*1024);   //cache_size <= 1M
+    clear();
+}
+
+}
+
+int main(int argc, char** argv) {
+    std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
+    if (!doris::config::init(conffile.c_str(), false)) {
+        fprintf(stderr, "error read config file. \n");
+        return -1;
+    }
+     doris::init_glog("be-test");
+    ::testing::InitGoogleTest(&argc, argv);
+    doris::CpuInfo::init();
+    return RUN_ALL_TESTS();
+}
+/* vim: set ts=4 sw=4 sts=4 tw=100 */
diff --git a/docs/zh-CN/administrator-guide/partition_cache.md 
b/docs/zh-CN/administrator-guide/partition_cache.md
new file mode 100644
index 0000000..a14ac5a
--- /dev/null
+++ b/docs/zh-CN/administrator-guide/partition_cache.md
@@ -0,0 +1,197 @@
+# 分区缓存
+
+## 需求场景
+大部分数据分析场景是写少读多,数据写入一次,多次频繁读取,比如一张报表涉及的维度和指标,数据在凌晨一次性计算好,但每天有数百甚至数千次的页面访问,因此非常适合把结果集缓存起来。在数据分析或BI应用中,存在下面的业务场景:
+* **高并发场景**,Doris可以较好的支持高并发,但单台服务器无法承载太高的QPS
+* **复杂图表的看板**,复杂的Dashboard或者大屏类应用,数据来自多张表,每个页面有数十个查询,虽然每个查询只有数十毫秒,但是总体查询时间会在数秒
+* **趋势分析**,给定日期范围的查询,指标按日显示,比如查询最近7天内的用户数的趋势,这类查询数据量大,查询范围广,查询时间往往需要数十秒
+* **用户重复查询**,如果产品没有防重刷机制,用户因手误或其他原因重复刷新页面,导致提交大量的重复的SQL
+
+以上四种场景,在应用层的解决方案,把查询结果放到Redis中,周期性的更新缓存或者用户手工刷新缓存,但是这个方案有如下问题:
+* **数据不一致**,无法感知数据的更新,导致用户经常看到旧的数据
+* **命中率低**,缓存整个查询结果,如果数据实时写入,缓存频繁失效,命中率低且系统负载较重
+* **额外成本**,引入外部缓存组件,会带来系统复杂度,增加额外成本
+
+## 解决方案
+本分区缓存策略可以解决上面的问题,优先保证数据一致性,在此基础上细化缓存粒度,提升命中率,因此有如下特点:
+* 用户无需担心数据一致性,通过版本来控制缓存失效,缓存的数据和从BE中查询的数据是一致的
+* 没有额外的组件和成本,缓存结果存储在BE的内存中,用户可以根据需要调整缓存内存大小
+* 实现了两种缓存策略,SQLCache和PartitionCache,后者缓存粒度更细
+* 用一致性哈希解决BE节点上下线的问题,BE中的缓存算法是改进的LRU
+
+## SQLCache
+SQLCache按SQL的签名、查询的表的分区ID、分区最新版本来存储和获取缓存。三者组合确定一个缓存数据集,任何一个变化了,如SQL有变化,如查询字段或条件不一样,或数据更新后版本变化了,会导致命中不了缓存。
+
+如果多张表Join,使用最近更新的分区ID和最新的版本号,如果其中一张表更新了,会导致分区ID或版本号不一样,也一样命中不了缓存。
+
+SQLCache,更适合T+1更新的场景,凌晨数据更新,首次查询从BE中获取结果放入到缓存中,后续相同查询从缓存中获取。实时更新数据也可以使用,但是可能存在命中率低的问题,可以参考如下PartitionCache。
+
+## PartitionCache
+
+### 设计原理
+1. SQL可以并行拆分,Q = Q1 ∪ Q2 ... ∪ Qn,R= R1 ∪ R2 ... ∪ Rn,Q为查询语句,R为结果集
+2. 拆分为只读分区和可更新分区,只读分区缓存,更新分区不缓存
+
+如上,查询最近7天的每天用户数,如按日期分区,数据只写当天分区,当天之外的其他分区的数据,都是固定不变的,在相同的查询SQL下,查询某个不更新分区的指标都是固定的。如下,在2020-03-09当天查询前7天的用户数,2020-03-03至2020-03-07的数据来自缓存,2020-03-08第一次查询来自分区,后续的查询来自缓存,2020-03-09因为当天在不停写入,所以来自分区。
+
+因此,查询N天的数据,数据更新最近的D天,每天只是日期范围不一样相似的查询,只需要查询D个分区即可,其他部分都来自缓存,可以有效降低集群负载,减少查询时间。
+
+```
+MySQL [(none)]> SELECT eventdate,count(userid) FROM testdb.appevent WHERE 
eventdate>="2020-03-03" AND eventdate<="2020-03-09" GROUP BY eventdate ORDER BY 
eventdate;
++------------+-----------------+
+| eventdate  | count(`userid`) |
++------------+-----------------+
+| 2020-03-03 |              15 |
+| 2020-03-04 |              20 |
+| 2020-03-05 |              25 |
+| 2020-03-06 |              30 |
+| 2020-03-07 |              35 |
+| 2020-03-08 |              40 | //第一次来自分区,后续来自缓存
+| 2020-03-09 |              25 | //来自分区
++------------+-----------------+
+7 rows in set (0.02 sec)
+```
+
+在PartitionCache中,缓存第一级Key是去掉了分区条件后的SQL的128位MD5签名,下面是改写后的待签名的SQL:
+```
+SELECT eventdate,count(userid) FROM testdb.appevent GROUP BY eventdate ORDER 
BY eventdate;
+```
+缓存的第二级Key是查询结果集的分区字段的内容,比如上面查询结果的eventdate列的内容,二级Key的附属信息是分区的版本号和版本更新时间。
+
+下面演示上面SQL在2020-03-09当天第一次执行的流程:
+1. 从缓存中获取数据
+```
++------------+-----------------+
+| 2020-03-03 |              15 |
+| 2020-03-04 |              20 |
+| 2020-03-05 |              25 |
+| 2020-03-06 |              30 |
+| 2020-03-07 |              35 |
++------------+-----------------+
+```
+2. 从BE中获取数据的SQL和数据
+```
+SELECT eventdate,count(userid) FROM testdb.appevent WHERE 
eventdate>="2020-03-08" AND eventdate<="2020-03-09" GROUP BY eventdate ORDER BY 
eventdate;
+
++------------+-----------------+
+| 2020-03-08 |              40 |
++------------+-----------------+
+| 2020-03-09 |              25 | 
++------------+-----------------+
+```
+3. 最后发送给终端的数据
+```
++------------+-----------------+
+| eventdate  | count(`userid`) |
++------------+-----------------+
+| 2020-03-03 |              15 |
+| 2020-03-04 |              20 |
+| 2020-03-05 |              25 |
+| 2020-03-06 |              30 |
+| 2020-03-07 |              35 |
+| 2020-03-08 |              40 |
+| 2020-03-09 |              25 |
++------------+-----------------+
+```
+4. 发送给缓存的数据
+```
++------------+-----------------+
+| 2020-03-08 |              40 |
++------------+-----------------+
+```
+
+Partition缓存,适合按日期分区,部分分区实时更新,查询SQL较为固定。
+
+分区字段也可以是其他字段,但是需要保证只有少量分区更新。
+
+### 一些限制
+* 只支持OlapTable,其他存储如MySQL的表没有版本信息,无法感知数据是否更新
+* 只支持按分区字段分组,不支持按其他字段分组,按其他字段分组,该分组数据都有可能被更新,会导致缓存都失效
+* 只支持结果集的前半部分、后半部分以及全部命中缓存,不支持结果集被缓存数据分割成几个部分
+
+## 使用方式
+### 开启SQLCache
+确保fe.conf的cache_enable_sql_mode=true(默认是true)
+```
+vim fe/conf/fe.conf
+cache_enable_sql_mode=true
+```
+在MySQL命令行中设置变量
+```
+MySQL [(none)]> set [global] enable_sql_cache=true;
+```
+注:global是全局变量,不加指当前会话变量
+
+### 开启PartitionCache
+确保fe.conf的cache_enable_partition_mode=true(默认是true)
+```
+vim fe/conf/fe.conf
+cache_enable_partition_mode=true
+```
+在MySQL命令行中设置变量
+```
+MySQL [(none)]> set [global] enable_partition_cache=true;
+```
+
+如果同时开启了两个缓存策略,下面的参数,需要注意一下:
+```
+cache_last_version_interval_second=900
+```
+如果分区的最新版本的时间离现在的间隔,大于cache_last_version_interval_second,则会优先把整个查询结果缓存。如果小于这个间隔,如果符合PartitionCache的条件,则按PartitionCache数据。
+
+### 监控
+FE的监控项:
+```
+query_table            //Query中有表的数量
+query_olap_table       //Query中有Olap表的数量
+cache_mode_sql         //识别缓存模式为sql的Query数量
+cache_hit_sql          //模式为sql的Query命中Cache的数量
+query_mode_partition   //识别缓存模式为Partition的Query数量
+cache_hit_partition    //通过Partition命中的Query数量
+partition_all          //Query中扫描的所有分区
+partition_hit          //通过Cache命中的分区数量
+
+Cache命中率     = (cache_hit_sql + cache_hit_partition) / query_olap_table
+Partition命中率 = partition_hit / partition_all
+```
+
+BE的监控项:
+```
+query_cache_memory_total_byte       //Cache内存大小
+query_query_cache_sql_total_count   //Cache的SQL的数量
+query_cache_partition_total_count   //Cache分区数量
+
+SQL平均数据大小       = cache_memory_total / cache_sql_total
+Partition平均数据大小 = cache_memory_total / cache_partition_total
+```
+
+其他监控:
+可以从Grafana中查看BE节点的CPU和内存指标,Query统计中的Query Percentile等指标,配合Cache参数的调整来达成业务目标。
+
+
+### 优化参数
+FE的配置项cache_result_max_row_count,查询结果集放入缓存的最大行数,可以根据实际情况调整,但建议不要设置过大,避免过多占用内存,超过这个大小的结果集不会被缓存。
+```
+vim fe/conf/fe.conf
+cache_result_max_row_count=3000
+```
+
+BE最大分区数量cache_max_partition_count,指每个SQL对应的最大分区数,如果是按日期分区,能缓存2年多的数据,假如想保留更长时间的缓存,请把这个参数设置得更大,同时修改cache_result_max_row_count的参数。
+```
+vim be/conf/be.conf
+cache_max_partition_count=1024
+```
+
+BE中缓存内存设置,有两个参数query_cache_max_size和query_cache_elasticity_size两部分组成(单位MB),内存超过query_cache_max_size
 + 
cache_elasticity_size会开始清理,并把内存控制到query_cache_max_size以下。可以根据BE节点数量,节点内存大小,和缓存命中率来设置这两个参数。
+```
+query_cache_max_size_mb=256
+query_cache_elasticity_size_mb=128
+```
+计算方法:
+
+假如缓存10K个Query,每个Query缓存1000行,每行是128个字节,分布在10台BE上,则每个BE需要128M内存(10K*1000*128/10)。
+
+## 未尽事项
+* T+1的数据,是否也可以用Partition缓存? 目前不支持
+* 类似的SQL,之前查询了2个指标,现在查询3个指标,是否可以利用2个指标的缓存? 目前不支持
+* 按日期分区,但是需要按周维度汇总数据,是否可用PartitionCache? 目前不支持


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to