This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5012ddd87a2a671bc2aed465732adc2def929993
Author: 924060929 <924060...@qq.com>
AuthorDate: Mon May 13 12:05:22 2024 +0800

    [fix](Nereids) fix sql cache return old value when truncate partition (#34698)

    1. fix sql cache return old value when truncate partition
    2. use expire_sql_cache_in_fe_second to control the expire time of the sql cache which in the NereidsSqlCacheManager
---
 .../main/java/org/apache/doris/common/Config.java | 8 ++++-
 .../main/java/org/apache/doris/catalog/Env.java | 4 +--
 .../doris/common/NereidsSqlCacheManager.java | 38 +++++++++++-----------
 .../org/apache/doris/qe/cache/CacheAnalyzer.java | 2 +-
 .../cache/parse_sql_from_sql_cache.groovy | 38 +++++++++++++++++++++-
 5 files changed, 65 insertions(+), 25 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6fc20578ec0..d44ff4a0dd4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1306,12 +1306,18 @@ public class Config extends ConfigBase {
 * Minimum interval between last version when caching results,
 * This parameter distinguishes between offline and real-time updates
 */
+ @ConfField(mutable = true, masterOnly = false)
+ public static int cache_last_version_interval_second = 30;
+
+ /**
+ * Expire sql sql in frontend time
+ */
 @ConfField(
 mutable = true,
 masterOnly = false,
 callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig"
 )
- public static int cache_last_version_interval_second = 30;
+ public static int expire_sql_cache_in_fe_second = 300;

 /**
 * Set the maximum number of rows that can be cached
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 4839769e0f8..d27ae147323 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -771,9 +771,7 @@ public class Env {
 this.mtmvService = new MTMVService();
 this.insertOverwriteManager = new InsertOverwriteManager();
 this.dnsCache = new DNSCache();
- this.sqlCacheManager = new NereidsSqlCacheManager(
- Config.sql_cache_manage_num, Config.cache_last_version_interval_second
- );
+ this.sqlCacheManager = new NereidsSqlCacheManager();
 }

 public static void destroyCheckpoint() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
index 8989375c07f..cf6280650f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
@@ -74,8 +74,11 @@ public class NereidsSqlCacheManager {
 // value: SqlCacheContext
 private volatile Cache<String, SqlCacheContext> sqlCaches;

- public NereidsSqlCacheManager(int sqlCacheNum, long cacheIntervalSeconds) {
- sqlCaches = buildSqlCaches(sqlCacheNum, cacheIntervalSeconds);
+ public NereidsSqlCacheManager() {
+ sqlCaches = buildSqlCaches(
+ Config.sql_cache_manage_num,
+ Config.expire_sql_cache_in_fe_second
+ );
 }

 public static synchronized void updateConfig() {
@@ -90,22 +93,24 @@ public class NereidsSqlCacheManager {

 Cache<String, SqlCacheContext> sqlCaches = buildSqlCaches(
 Config.sql_cache_manage_num,
- Config.cache_last_version_interval_second
+ Config.expire_sql_cache_in_fe_second
 );
 sqlCaches.putAll(sqlCacheManager.sqlCaches.asMap());
 sqlCacheManager.sqlCaches = sqlCaches;
 }

- private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long cacheIntervalSeconds) {
- sqlCacheNum = sqlCacheNum < 0 ? 100 : sqlCacheNum;
- cacheIntervalSeconds = cacheIntervalSeconds < 0 ? 30 : cacheIntervalSeconds;
-
- return Caffeine.newBuilder()
- .maximumSize(sqlCacheNum)
- .expireAfterAccess(Duration.ofSeconds(cacheIntervalSeconds))
+ private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long expireAfterAccessSeconds) {
+ Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
 // auto evict cache when jvm memory too low
- .softValues()
- .build();
+ .softValues();
+ if (sqlCacheNum > 0) {
+ cacheBuilder = cacheBuilder.maximumSize(sqlCacheNum);
+ }
+ if (expireAfterAccessSeconds > 0) {
+ cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSeconds));
+ }
+
+ return cacheBuilder.build();
 }

 /** tryAddFeCache */
@@ -237,9 +242,6 @@ public class NereidsSqlCacheManager {
 }

 private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
- long latestPartitionTime = sqlCacheContext.getLatestPartitionTime();
- long latestPartitionVersion = sqlCacheContext.getLatestPartitionVersion();
-
 if (sqlCacheContext.hasUnsupportedTables()) {
 return true;
 }
@@ -255,7 +257,7 @@ public class NereidsSqlCacheManager {
 long cacheTableTime = scanTable.latestTimestamp;
 long currentTableVersion = olapTable.getVisibleVersion();
 long cacheTableVersion = scanTable.latestVersion;
- // some partitions have been dropped, or delete or update or insert rows into new partition?
+ // some partitions have been dropped, or delete or updated or replaced, or insert rows into new partition?
 if (currentTableTime > cacheTableTime
 || (currentTableTime == cacheTableTime && currentTableVersion > cacheTableVersion)) {
 return true;
@@ -264,9 +266,7 @@ public class NereidsSqlCacheManager {
 for (Long scanPartitionId : scanTable.getScanPartitions()) {
 Partition partition = olapTable.getPartition(scanPartitionId);
 // partition == null: is this partition truncated?
- if (partition == null || partition.getVisibleVersionTime() > latestPartitionTime
- || (partition.getVisibleVersionTime() == latestPartitionTime
- && partition.getVisibleVersion() > latestPartitionVersion)) {
+ if (partition == null) {
 return true;
 }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index 85f37094b02..47fccfcd37a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -701,11 +701,11 @@ public class CacheAnalyzer {
 scanTables.add(scanTable);
 for (Long partitionId : node.getSelectedPartitionIds()) {
 Partition partition = olapTable.getPartition(partitionId);
+ scanTable.addScanPartition(partitionId);
 if (partition.getVisibleVersionTime() >= cacheTable.latestPartitionTime) {
 cacheTable.latestPartitionId = partition.getId();
 cacheTable.latestPartitionTime = partition.getVisibleVersionTime();
 cacheTable.latestPartitionVersion = partition.getVisibleVersion();
- scanTable.addScanPartition(partitionId);
 }
 }
 return cacheTable;
diff --git a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
index b672e30cb62..b75d35f8c51 100644
--- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
@@ -32,7 +32,6 @@ suite("parse_sql_from_sql_cache") {
 }
 }

-
 sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"

 combineFutures(
@@ -656,6 +655,43 @@ suite("parse_sql_from_sql_cache") {
 assertHasCache "select * from test_use_plan_cache20 where id=999"
 def result6 = sql "select * from test_use_plan_cache20 where id=999"
 assertTrue(result6.isEmpty())
+ }),
+ extraThread("test_truncate_partition", {
+ sql "drop table if exists test_use_plan_cache21"
+ sql """create table test_use_plan_cache21 (
+ id int,
+ dt int
+ )
+ partition by range(dt)
+ (
+ partition dt1 values [('1'), ('2')),
+ partition dt2 values [('2'), ('3'))
+ )
+ distributed by hash(id)
+ properties('replication_num'='1')"""
+
+
+
+ sql "insert into test_use_plan_cache21 values('2', '2')"
+ sleep(100)
+ sql "insert into test_use_plan_cache21 values('1', '1')"
+
+ // after partition changed 10s, the sql cache can be used
+ sleep(10000)
+
+ sql "set enable_nereids_planner=true"
+ sql "set enable_fallback_to_original_planner=false"
+ sql "set enable_sql_cache=true"
+
+ assertNoCache "select * from test_use_plan_cache21"
+ def result1 = sql "select * from test_use_plan_cache21"
+ assertTrue(result1.size() == 2)
+ assertHasCache "select * from test_use_plan_cache21"
+
+ sql "truncate table test_use_plan_cache21 partition dt2"
+ assertNoCache "select * from test_use_plan_cache21"
+ def result2 = sql "select * from test_use_plan_cache21"
+ assertTrue(result2.size() == 1)
 })
 ).get()
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org