This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5012ddd87a2a671bc2aed465732adc2def929993
Author: 924060929 <924060...@qq.com>
AuthorDate: Mon May 13 12:05:22 2024 +0800

    [fix](Nereids) fix sql cache return old value when truncate partition 
(#34698)
    
    1. fix the sql cache returning old values after a partition is truncated
    2. use expire_sql_cache_in_fe_second to control the expiration time of the 
sql cache held in the NereidsSqlCacheManager
---
 .../main/java/org/apache/doris/common/Config.java  |  8 ++++-
 .../main/java/org/apache/doris/catalog/Env.java    |  4 +--
 .../doris/common/NereidsSqlCacheManager.java       | 38 +++++++++++-----------
 .../org/apache/doris/qe/cache/CacheAnalyzer.java   |  2 +-
 .../cache/parse_sql_from_sql_cache.groovy          | 38 +++++++++++++++++++++-
 5 files changed, 65 insertions(+), 25 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6fc20578ec0..d44ff4a0dd4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1306,12 +1306,18 @@ public class Config extends ConfigBase {
      *  Minimum interval between last version when caching results,
      *  This parameter distinguishes between offline and real-time updates
      */
+    @ConfField(mutable = true, masterOnly = false)
+    public static int cache_last_version_interval_second = 30;
+
+    /**
+     *  Expiration time, in seconds, of the sql cache kept in the frontend
+     */
     @ConfField(
             mutable = true,
             masterOnly = false,
             callbackClassString = 
"org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig"
     )
-    public static int cache_last_version_interval_second = 30;
+    public static int expire_sql_cache_in_fe_second = 300;
 
     /**
      * Set the maximum number of rows that can be cached
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 4839769e0f8..d27ae147323 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -771,9 +771,7 @@ public class Env {
         this.mtmvService = new MTMVService();
         this.insertOverwriteManager = new InsertOverwriteManager();
         this.dnsCache = new DNSCache();
-        this.sqlCacheManager = new NereidsSqlCacheManager(
-                Config.sql_cache_manage_num, 
Config.cache_last_version_interval_second
-        );
+        this.sqlCacheManager = new NereidsSqlCacheManager();
     }
 
     public static void destroyCheckpoint() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
index 8989375c07f..cf6280650f0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
@@ -74,8 +74,11 @@ public class NereidsSqlCacheManager {
     // value: SqlCacheContext
     private volatile Cache<String, SqlCacheContext> sqlCaches;
 
-    public NereidsSqlCacheManager(int sqlCacheNum, long cacheIntervalSeconds) {
-        sqlCaches = buildSqlCaches(sqlCacheNum, cacheIntervalSeconds);
+    public NereidsSqlCacheManager() {
+        sqlCaches = buildSqlCaches(
+                Config.sql_cache_manage_num,
+                Config.expire_sql_cache_in_fe_second
+        );
     }
 
     public static synchronized void updateConfig() {
@@ -90,22 +93,24 @@ public class NereidsSqlCacheManager {
 
         Cache<String, SqlCacheContext> sqlCaches = buildSqlCaches(
                 Config.sql_cache_manage_num,
-                Config.cache_last_version_interval_second
+                Config.expire_sql_cache_in_fe_second
         );
         sqlCaches.putAll(sqlCacheManager.sqlCaches.asMap());
         sqlCacheManager.sqlCaches = sqlCaches;
     }
 
-    private static Cache<String, SqlCacheContext> buildSqlCaches(int 
sqlCacheNum, long cacheIntervalSeconds) {
-        sqlCacheNum = sqlCacheNum < 0 ? 100 : sqlCacheNum;
-        cacheIntervalSeconds = cacheIntervalSeconds < 0 ? 30 : 
cacheIntervalSeconds;
-
-        return Caffeine.newBuilder()
-                .maximumSize(sqlCacheNum)
-                .expireAfterAccess(Duration.ofSeconds(cacheIntervalSeconds))
+    private static Cache<String, SqlCacheContext> buildSqlCaches(int 
sqlCacheNum, long expireAfterAccessSeconds) {
+        Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
                 // auto evict cache when jvm memory too low
-                .softValues()
-                .build();
+                .softValues();
+        if (sqlCacheNum > 0) {
+            cacheBuilder = cacheBuilder.maximumSize(sqlCacheNum);
+        }
+        if (expireAfterAccessSeconds > 0) {
+            cacheBuilder = 
cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSeconds));
+        }
+
+        return cacheBuilder.build();
     }
 
     /** tryAddFeCache */
@@ -237,9 +242,6 @@ public class NereidsSqlCacheManager {
     }
 
     private boolean tablesOrDataChanged(Env env, SqlCacheContext 
sqlCacheContext) {
-        long latestPartitionTime = sqlCacheContext.getLatestPartitionTime();
-        long latestPartitionVersion = 
sqlCacheContext.getLatestPartitionVersion();
-
         if (sqlCacheContext.hasUnsupportedTables()) {
             return true;
         }
@@ -255,7 +257,7 @@ public class NereidsSqlCacheManager {
             long cacheTableTime = scanTable.latestTimestamp;
             long currentTableVersion = olapTable.getVisibleVersion();
             long cacheTableVersion = scanTable.latestVersion;
-            // some partitions have been dropped, or delete or update or 
insert rows into new partition?
+            // some partitions have been dropped, or delete or updated or 
replaced, or insert rows into new partition?
             if (currentTableTime > cacheTableTime
                     || (currentTableTime == cacheTableTime && 
currentTableVersion > cacheTableVersion)) {
                 return true;
@@ -264,9 +266,7 @@ public class NereidsSqlCacheManager {
             for (Long scanPartitionId : scanTable.getScanPartitions()) {
                 Partition partition = olapTable.getPartition(scanPartitionId);
                 // partition == null: is this partition truncated?
-                if (partition == null || partition.getVisibleVersionTime() > 
latestPartitionTime
-                        || (partition.getVisibleVersionTime() == 
latestPartitionTime
-                        && partition.getVisibleVersion() > 
latestPartitionVersion)) {
+                if (partition == null) {
                     return true;
                 }
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index 85f37094b02..47fccfcd37a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -701,11 +701,11 @@ public class CacheAnalyzer {
         scanTables.add(scanTable);
         for (Long partitionId : node.getSelectedPartitionIds()) {
             Partition partition = olapTable.getPartition(partitionId);
+            scanTable.addScanPartition(partitionId);
             if (partition.getVisibleVersionTime() >= 
cacheTable.latestPartitionTime) {
                 cacheTable.latestPartitionId = partition.getId();
                 cacheTable.latestPartitionTime = 
partition.getVisibleVersionTime();
                 cacheTable.latestPartitionVersion = 
partition.getVisibleVersion();
-                scanTable.addScanPartition(partitionId);
             }
         }
         return cacheTable;
diff --git 
a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy 
b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
index b672e30cb62..b75d35f8c51 100644
--- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
@@ -32,7 +32,6 @@ suite("parse_sql_from_sql_cache") {
         }
     }
 
-
     sql  "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = 
'10')"
 
     combineFutures(
@@ -656,6 +655,43 @@ suite("parse_sql_from_sql_cache") {
             assertHasCache "select * from test_use_plan_cache20 where id=999"
             def result6 = sql "select * from test_use_plan_cache20 where 
id=999"
             assertTrue(result6.isEmpty())
+        }),
+        extraThread("test_truncate_partition", {
+            sql "drop table if exists test_use_plan_cache21"
+            sql """create table test_use_plan_cache21 (
+                        id int,
+                        dt int
+                       )
+                       partition by range(dt)
+                       (
+                        partition dt1 values [('1'), ('2')),
+                        partition dt2 values [('2'), ('3'))
+                       )
+                       distributed by hash(id)
+                       properties('replication_num'='1')"""
+
+
+
+            sql "insert into test_use_plan_cache21 values('2', '2')"
+            sleep(100)
+            sql "insert into test_use_plan_cache21 values('1', '1')"
+
+            // the sql cache can only be used 10s after the partition last changed
+            sleep(10000)
+
+            sql "set enable_nereids_planner=true"
+            sql "set enable_fallback_to_original_planner=false"
+            sql "set enable_sql_cache=true"
+
+            assertNoCache "select * from test_use_plan_cache21"
+            def result1 = sql "select * from test_use_plan_cache21"
+            assertTrue(result1.size() == 2)
+            assertHasCache "select * from test_use_plan_cache21"
+
+            sql "truncate table test_use_plan_cache21 partition dt2"
+            assertNoCache "select * from test_use_plan_cache21"
+            def result2 = sql "select * from test_use_plan_cache21"
+            assertTrue(result2.size() == 1)
         })
     ).get()
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to