This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 4b3061c2d9f branch-4.0: (query cache) query cache shouldn't be hit 
when a session variable changes or a UDF is used (#60315) (#60661)
4b3061c2d9f is described below

commit 4b3061c2d9f0145b58de85e51f4533b62f91c465
Author: 924060929 <[email protected]>
AuthorDate: Fri Feb 13 09:36:06 2026 +0800

    branch-4.0: (query cache) query cache shouldn't be hit when a session 
variable changes or a UDF is used (#60315) (#60661)
    
    cherry pick from #60315
---
 .../glue/translator/PhysicalPlanTranslator.java    | 93 +++++++++++++++++++++-
 .../org/apache/doris/planner/AggregationNode.java  | 10 +++
 .../planner/normalize/QueryCacheNormalizer.java    | 35 +++++---
 .../doris/planner/QueryCacheNormalizerTest.java    |  1 +
 .../suites/query_p0/cache/query_cache.groovy       |  2 +
 .../query_p0/cache/query_cache_with_context.groovy | 65 +++++++++++++++
 6 files changed, 194 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index f150c7f3bf4..0a98be0281b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -39,6 +39,7 @@ import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TableSample;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.AliasFunction;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Function.NullableMode;
@@ -105,9 +106,11 @@ import 
org.apache.doris.nereids.trees.expressions.SessionVarGuardExpr;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.WindowFrame;
+import org.apache.doris.nereids.trees.expressions.functions.Udf;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
 import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
 import 
org.apache.doris.nereids.trees.expressions.functions.scalar.GroupingScalarFunction;
+import 
org.apache.doris.nereids.trees.expressions.functions.scalar.UniqueFunction;
 import org.apache.doris.nereids.trees.plans.AbstractPlan;
 import org.apache.doris.nereids.trees.plans.AggMode;
 import org.apache.doris.nereids.trees.plans.AggPhase;
@@ -249,7 +252,9 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -506,7 +511,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         List<Expr> partitionExprs = 
olapTableSink.getPartitionExprList().stream()
                 .map(e -> ExpressionTranslator.translate(e, 
context)).collect(Collectors.toList());
         Map<Long, Expr> syncMvWhereClauses = new HashMap<>();
-        for (Map.Entry<Long, Expression> entry : 
olapTableSink.getSyncMvWhereClauses().entrySet()) {
+        for (Entry<Long, Expression> entry : 
olapTableSink.getSyncMvWhereClauses().entrySet()) {
             syncMvWhereClauses.put(entry.getKey(), 
ExpressionTranslator.translate(entry.getValue(), context));
         }
         OlapTableSink sink;
@@ -1270,6 +1275,11 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             aggregationNode.setCardinality((long) 
aggregate.getStats().getRowCount());
         }
         updateLegacyPlanIdToPhysicalPlan(inputPlanFragment.getPlanRoot(), 
aggregate);
+
+        if (ConnectContext.get().getSessionVariable().getEnableQueryCache()) {
+            setQueryCacheCandidate(aggregate, aggregationNode);
+        }
+
         return inputPlanFragment;
     }
 
@@ -2420,7 +2430,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 context.getTopnFilterContext().translateSource(topN, sortNode);
                 TopnFilter filter = 
context.getTopnFilterContext().getTopnFilter(topN);
                 List<Pair<Integer, Integer>> targets = new ArrayList<>();
-                for (Map.Entry<ScanNode, Expr> entry : 
filter.legacyTargets.entrySet()) {
+                for (Entry<ScanNode, Expr> entry : 
filter.legacyTargets.entrySet()) {
                     Set<SlotRef> inputSlots = 
entry.getValue().getInputSlotRef();
                     if (inputSlots.size() != 1) {
                         LOG.warn("topn filter targets error: " + inputSlots);
@@ -3298,4 +3308,83 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
         return child instanceof PhysicalRelation;
     }
+
+    private boolean setQueryCacheCandidate(
+            PhysicalHashAggregate<? extends Plan> aggregate, AggregationNode 
aggregationNode) {
+        if (hasUndeterministicExpression(aggregate)) {
+            return false;
+        }
+
+        PlanNode child = aggregationNode.getChild(0);
+        if (child instanceof AggregationNode) {
+            if (((AggregationNode) child).isQueryCacheCandidate()) {
+                aggregationNode.setQueryCacheCandidate(true);
+                return true;
+            }
+        } else if (child instanceof OlapScanNode) {
+            aggregationNode.setQueryCacheCandidate(true);
+            return true;
+        }
+        return false;
+    }
+
+    private boolean hasUndeterministicExpression(Plan plan) {
+        String cacheKey = "hasUndeterministicExpression";
+        Optional<Boolean> hasUndeterministicExpressionCache = 
plan.getMutableState(cacheKey);
+        if (hasUndeterministicExpressionCache.isPresent()) {
+            return hasUndeterministicExpressionCache.get();
+        }
+        boolean result;
+        if (plan instanceof PhysicalHashAggregate) {
+            PhysicalHashAggregate<? extends Plan> aggregate = 
(PhysicalHashAggregate<? extends Plan>) plan;
+            if (hasUndeterministicExpression(aggregate.getGroupByExpressions())
+                    || 
hasUndeterministicExpression(aggregate.getOutputExpressions())) {
+                result = true;
+            } else {
+                result = hasUndeterministicExpression(aggregate.child());
+            }
+        } else if (plan instanceof PhysicalFilter) {
+            PhysicalFilter<? extends Plan> filter = (PhysicalFilter<? extends 
Plan>) plan;
+            if (hasUndeterministicExpression(filter.getExpressions())) {
+                result = true;
+            } else {
+                result = hasUndeterministicExpression(filter.child());
+            }
+        } else if (plan instanceof PhysicalProject) {
+            PhysicalProject<? extends Plan> project = (PhysicalProject<? 
extends Plan>) plan;
+            if (hasUndeterministicExpression(project.getProjects())) {
+                result = true;
+            } else {
+                result = hasUndeterministicExpression(project.child());
+            }
+        } else if (plan instanceof PhysicalOlapScan) {
+            result = false;
+        } else {
+            // unsupported for query cache
+            result = true;
+        }
+        plan.setMutableState(cacheKey, result);
+        return result;
+    }
+
+    private boolean hasUndeterministicExpression(Collection<? extends 
Expression> expressions) {
+        for (Expression groupByExpression : expressions) {
+            if (groupByExpression.containsType(AliasFunction.class, Udf.class, 
UniqueFunction.class)) {
+                return true;
+            }
+
+            boolean nonDeterministic = groupByExpression.anyMatch(e -> {
+                if (e instanceof Expression) {
+                    if (!((Expression) e).isDeterministic()) {
+                        return true;
+                    }
+                }
+                return false;
+            });
+            if (nonDeterministic) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index de190387d59..1f475cc85fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -62,6 +62,8 @@ public class AggregationNode extends PlanNode {
 
     private SortInfo sortByGroupKey;
 
+    private boolean queryCacheCandidate;
+
     /**
      * Create an agg node that is not an intermediate node.
      * isIntermediate is true if it is a slave node in a 2-part agg plan.
@@ -272,4 +274,12 @@ public class AggregationNode extends PlanNode {
     public void setSortByGroupKey(SortInfo sortByGroupKey) {
         this.sortByGroupKey = sortByGroupKey;
     }
+
+    public boolean isQueryCacheCandidate() {
+        return queryCacheCandidate;
+    }
+
+    public void setQueryCacheCandidate(boolean queryCacheCandidate) {
+        this.queryCacheCandidate = queryCacheCandidate;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java
index 861e7c5e4cd..b74ebd0e7c3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java
@@ -30,8 +30,10 @@ import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TNormalizedPlanNode;
 import org.apache.doris.thrift.TQueryCacheParam;
+import org.apache.doris.thrift.TStringLiteral;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -53,9 +55,9 @@ import java.util.stream.Collectors;
 public class QueryCacheNormalizer implements Normalizer {
     private final PlanFragment fragment;
     private final DescriptorTable descriptorTable;
-    private final NormalizedIdGenerator normalizedPlanIds =  new 
NormalizedIdGenerator();
-    private final NormalizedIdGenerator normalizedTupleIds =  new 
NormalizedIdGenerator();
-    private final NormalizedIdGenerator normalizedSlotIds =  new 
NormalizedIdGenerator();
+    private final NormalizedIdGenerator normalizedPlanIds = new 
NormalizedIdGenerator();
+    private final NormalizedIdGenerator normalizedTupleIds = new 
NormalizedIdGenerator();
+    private final NormalizedIdGenerator normalizedSlotIds = new 
NormalizedIdGenerator();
 
     // result
     private final TQueryCacheParam queryCacheParam = new TQueryCacheParam();
@@ -72,7 +74,7 @@ public class QueryCacheNormalizer implements Normalizer {
                 return Optional.empty();
             }
             List<TNormalizedPlanNode> normalizedDigestPlans = 
normalizePlanTree(context, cachePoint.get());
-            byte[] digest = computeDigest(normalizedDigestPlans);
+            byte[] digest = computeDigest(context, normalizedDigestPlans);
             return setQueryCacheParam(cachePoint.get(), digest, context);
         } catch (Throwable t) {
             return Optional.empty();
@@ -90,11 +92,12 @@ public class QueryCacheNormalizer implements Normalizer {
 
     private Optional<TQueryCacheParam> setQueryCacheParam(
             CachePoint cachePoint, byte[] digest, ConnectContext context) {
+        SessionVariable sessionVariable = context.getSessionVariable();
         queryCacheParam.setNodeId(cachePoint.cacheRoot.getId().asInt());
         queryCacheParam.setDigest(digest);
-        
queryCacheParam.setForceRefreshQueryCache(context.getSessionVariable().isQueryCacheForceRefresh());
-        
queryCacheParam.setEntryMaxBytes(context.getSessionVariable().getQueryCacheEntryMaxBytes());
-        
queryCacheParam.setEntryMaxRows(context.getSessionVariable().getQueryCacheEntryMaxRows());
+        
queryCacheParam.setForceRefreshQueryCache(sessionVariable.isQueryCacheForceRefresh());
+        
queryCacheParam.setEntryMaxBytes(sessionVariable.getQueryCacheEntryMaxBytes());
+        
queryCacheParam.setEntryMaxRows(sessionVariable.getQueryCacheEntryMaxRows());
 
         queryCacheParam.setOutputSlotMapping(
                 cachePoint.cacheRoot.getOutputTupleIds()
@@ -122,11 +125,16 @@ public class QueryCacheNormalizer implements Normalizer {
         if (planRoot instanceof AggregationNode) {
             PlanNode child = planRoot.getChild(0);
             if (child instanceof OlapScanNode) {
-                return Optional.of(new CachePoint(planRoot, planRoot));
+                if (((AggregationNode) planRoot).isQueryCacheCandidate()) {
+                    return Optional.of(new CachePoint(planRoot, planRoot));
+                }
             } else if (child instanceof AggregationNode) {
                 Optional<CachePoint> childCachePoint = 
doComputeCachePoint(child);
                 if (childCachePoint.isPresent()) {
-                    return Optional.of(new CachePoint(planRoot, planRoot));
+                    if (((AggregationNode) planRoot).isQueryCacheCandidate()) {
+                        return Optional.of(new CachePoint(planRoot, planRoot));
+                    }
+                    return childCachePoint;
                 }
             }
         } else if (planRoot instanceof ExchangeNode) {
@@ -151,13 +159,20 @@ public class QueryCacheNormalizer implements Normalizer {
         normalizedPlans.add(plan.normalize(this));
     }
 
-    public static byte[] computeDigest(List<TNormalizedPlanNode> 
normalizedDigestPlans) throws Exception {
+    public static byte[] computeDigest(
+            ConnectContext context, List<TNormalizedPlanNode> 
normalizedDigestPlans) throws Exception {
         TSerializer serializer = new TSerializer(new 
TCompactProtocol.Factory());
         MessageDigest digest = MessageDigest.getInstance("SHA-256");
 
         for (TNormalizedPlanNode node : normalizedDigestPlans) {
             digest.update(serializer.serialize(node));
         }
+
+        StringBuffer variables = new StringBuffer();
+        context.getSessionVariable().readAffectQueryResultVariables((k, v) -> {
+            variables.append(k).append("=").append(v).append("|");
+        });
+        digest.update(serializer.serialize(new 
TStringLiteral(variables.toString())));
         return digest.digest();
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
index de0237acd21..5648484f49d 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
@@ -113,6 +113,7 @@ public class QueryCacheNormalizerTest extends 
TestWithFeService {
         createTables(nonPart, part1, part2, multiLeveParts);
 
         
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+        connectContext.getSessionVariable().setEnableQueryCache(true);
     }
 
     @Test
diff --git a/regression-test/suites/query_p0/cache/query_cache.groovy 
b/regression-test/suites/query_p0/cache/query_cache.groovy
index 981b9f7d340..fb15e34a4f5 100644
--- a/regression-test/suites/query_p0/cache/query_cache.groovy
+++ b/regression-test/suites/query_p0/cache/query_cache.groovy
@@ -20,6 +20,8 @@ import java.util.stream.Collectors
 suite("query_cache") {
     def tableName = 
"table_3_undef_partitions2_keys3_properties4_distributed_by53"
 
+    sql "set enable_sql_cache=false"
+
     def test = {
         sql "set enable_query_cache=false"
 
diff --git 
a/regression-test/suites/query_p0/cache/query_cache_with_context.groovy 
b/regression-test/suites/query_p0/cache/query_cache_with_context.groovy
new file mode 100644
index 00000000000..e96cd0ca78a
--- /dev/null
+++ b/regression-test/suites/query_p0/cache/query_cache_with_context.groovy
@@ -0,0 +1,65 @@
+import java.util.concurrent.atomic.AtomicReference
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("query_cache_with_context") {
+    multi_sql """
+        set enable_sql_cache=false;
+        set enable_query_cache=false;
+        drop table if exists query_cache_with_context;
+        create table query_cache_with_context(id int, value decimal(26, 4)) 
properties('replication_num'='1');
+        insert into query_cache_with_context values(1, 1), (2, 2), (3, 3);
+        set enable_query_cache=true;
+        """
+
+    def getDigest = { def sqlStr ->
+        AtomicReference<String> result = new AtomicReference<>()
+        explain {
+            sql sqlStr
+
+            check {exp ->
+                def digests = exp.split("\n").findAll { line -> 
line.contains("DIGEST") }
+                if (!digests.isEmpty()) {
+                    result.set(digests.get(0).split(":")[1].trim())
+                }
+            }
+        }
+        return result.get()
+    }
+
+    def test_session_variable_change = {
+        sql "set enable_decimal256=true"
+        def digest1 = getDigest("select id from query_cache_with_context group 
by id")
+        sql "set enable_decimal256=false"
+        def digest2 = getDigest("select id from query_cache_with_context group 
by id")
+        assertNotEquals(digest1, digest2)
+    }()
+
+    def test_udf_function = {
+        def jarPath = 
"""${context.config.suitePath}/javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar"""
+        scp_udf_file_to_all_be(jarPath)
+        sql("DROP FUNCTION IF EXISTS test_udf_with_query_cache(string, int, 
int);")
+        sql """ CREATE FUNCTION test_udf_with_query_cache(string, int, int) 
RETURNS string PROPERTIES (
+                                "file"="file://${jarPath}",
+                                "symbol"="org.apache.doris.udf.StringTest",
+                                "type"="JAVA_UDF"
+                            ); """
+        String digest = getDigest("select test_udf_with_query_cache(id, 2, 3) 
from query_cache_with_context group by 1")
+        assertEquals(null, digest)
+    }()
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to