morrySnow commented on code in PR #39471:
URL: https://github.com/apache/doris/pull/39471#discussion_r1743263010


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/FillUpMissingSlots.java:
##########
@@ -274,7 +304,8 @@ public void resolve(Expression expression) {
                 // We couldn't find the equivalent expression in output 
expressions and group-by expressions,
                 // so we should check whether the expression is valid.
                 if (expression instanceof SlotReference) {
-                    if (checkSlot) {
+                    if (checkSlot && (!outerScope.isPresent()
+                            || 
!outerScope.get().getCorrelatedSlots().contains(expression))) {

Review Comment:
   add some comments



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/AlwaysNotNullable.java:
##########
@@ -27,4 +30,10 @@ public interface AlwaysNotNullable extends ComputeNullable {
     default boolean nullable() {
         return false;
     }
+
+    // return value of this function if the input data is empty.
+    // for example, count(*) of empty table is 0;
+    default Expression resultForEmptyInput() {
+        throw new AnalysisException("should implement resultForEmptyInput() 
for " + this.getClass());
+    }

Review Comment:
   remove default impl



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/FillUpMissingSlots.java:
##########
@@ -59,14 +60,18 @@ public List<Rule> buildRules() {
         return ImmutableList.of(
             RuleType.FILL_UP_SORT_PROJECT.build(
                 logicalSort(logicalProject())
-                    .then(sort -> {
+                    .thenApply(ctx -> {
+                        LogicalSort<LogicalProject<Plan>> sort = ctx.root;
+                        Optional<Scope> outerScope = 
ctx.cascadesContext.getOuterScope();
                         LogicalProject<Plan> project = sort.child();
                         Set<Slot> projectOutputSet = project.getOutputSet();
                         Set<Slot> notExistedInProject = 
sort.getOrderKeys().stream()
                                 .map(OrderKey::getExpr)
                                 .map(Expression::getInputSlots)
                                 .flatMap(Set::stream)
-                                .filter(s -> !projectOutputSet.contains(s))
+                                .filter(s -> !projectOutputSet.contains(s)
+                                        && (!outerScope.isPresent() || 
!outerScope.get()
+                                                
.getCorrelatedSlots().contains(s)))

Review Comment:
   add some comments?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MapAgg.java:
##########
@@ -69,4 +71,9 @@ public MapAgg withDistinctAndChildren(boolean distinct, 
List<Expression> childre
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new MapLiteral(new ArrayList<>(), new ArrayList<>(), 
this.getDataType());

Review Comment:
   ditto



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectSet.java:
##########
@@ -101,4 +103,9 @@ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C 
context) {
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());

Review Comment:
   ditto



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupArrayIntersect.java:
##########
@@ -73,4 +75,9 @@ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C 
context) {
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());

Review Comment:
   ditto



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java:
##########
@@ -192,7 +194,15 @@ public List<NamedExpression> getOutputs() {
 
     @Override
     public Plan pruneOutputs(List<NamedExpression> prunedOutputs) {
-        return withProjects(prunedOutputs);
+        List<NamedExpression> allProjects = new ArrayList<>(prunedOutputs);
+        for (NamedExpression expression : projects) {
+            if (expression.containsType(NoneMovableFunction.class)) {
+                if (!prunedOutputs.contains(expression)) {
+                    allProjects.add(expression);
+                }
+            }
+        }
+        return withProjects(allProjects);

Review Comment:
   Could we first build the list of NoneMovableFunction expressions that need 
to be added, and only create a new list if that list is not empty?



##########
regression-test/suites/nereids_p0/subquery/correlated_scalar_subquery.groovy:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("correlated_scalar_subquery") {
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+    sql """
+        drop table if exists correlated_scalar_t1;
+    """
+    sql """
+        drop table if exists correlated_scalar_t2;
+    """
+
+    sql """
+        drop table if exists correlated_scalar_t3;
+    """
+
+    sql """
+        create table correlated_scalar_t1
+                (c1 bigint, c2 bigint)
+                ENGINE=OLAP
+        DUPLICATE KEY(c1, c2)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(c1) BUCKETS 1
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    sql """
+        create table correlated_scalar_t2
+                (c1 bigint, c2 bigint)
+                ENGINE=OLAP
+        DUPLICATE KEY(c1, c2)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(c1) BUCKETS 1
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+
+    sql """
+        create table correlated_scalar_t3
+                (c1 bigint, c2 bigint)
+                ENGINE=OLAP
+        DUPLICATE KEY(c1, c2)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(c1) BUCKETS 1
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+
+    sql """
+        insert into correlated_scalar_t1 values (1,null),(null,1),(1,2), 
(null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4),(null,null);
+    """
+    sql """
+        insert into correlated_scalar_t2 values (1,null),(null,1),(1,4), 
(1,2), (null,3), (2,4), (3,7), (3,9),(null,null),(5,1);
+    """
+    sql """
+        insert into correlated_scalar_t3 values (1,null),(null,1),(1,9), 
(1,8), (null,7), (2,6), (3,7), (3,9),(null,null),(5,1);
+    """
+
+    qt_select_where1 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select c1 from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1 and correlated_scalar_t2.c2 < 
4);"""

Review Comment:
   add test cases where the subquery contains:
   - limit
   - limit offset
   - order by
   - topn
   - topn offset
   - correlated filter + window + aggregate
   - correlated filter + lateral view + aggregate



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java:
##########
@@ -245,15 +286,51 @@ public boolean hasGroupBy() {
 
         public boolean hasCorrelatedSlotsUnderAgg() {
             return correlatedSlots.isEmpty() ? false
-                    : findAggContainsCorrelatedSlots(logicalPlan, 
ImmutableSet.copyOf(correlatedSlots));
+                    : findCorrelatedSlotsUnderNode(logicalPlan,
+                            ImmutableSet.copyOf(correlatedSlots), 
LogicalAggregate.class);
+        }
+
+        public boolean hasCorrelatedSlotsUnderJoin() {
+            return correlatedSlots.isEmpty() ? false
+                    : findCorrelatedSlotsUnderNode(logicalPlan,
+                            ImmutableSet.copyOf(correlatedSlots), 
LogicalJoin.class);
+        }
+
+        public boolean hasCorrelatedSlotsInAgg() {
+            return correlatedSlots.isEmpty() ? false
+                    : findCorrelatedSlotsInNode(logicalPlan, 
ImmutableSet.copyOf(correlatedSlots),
+                            LogicalAggregate.class);
+        }
+
+        private static <T> boolean findCorrelatedSlotsInNode(Plan rootPlan,
+                ImmutableSet<Slot> slots, Class<T> clazz) {
+            ArrayDeque<Plan> planQueue = new ArrayDeque<>();
+            planQueue.add(rootPlan);
+            while (!planQueue.isEmpty()) {
+                Plan plan = planQueue.poll();
+                if (plan.getClass().equals(clazz)) {
+                    if (!Sets
+                            .intersection(slots,
+                                    
ExpressionUtils.getInputSlotSet(plan.getExpressions()))
+                            .isEmpty()) {
+                        return true;

Review Comment:
   reformat it so that it is easier for humans to read



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ScalarSubquery.java:
##########
@@ -81,4 +105,27 @@ public Expression withTypeCoercion(DataType dataType) {
     public ScalarSubquery withSubquery(LogicalPlan subquery) {
         return new ScalarSubquery(subquery, correlateSlots, typeCoercionExpr);
     }
+
+    /**
+    * findTopLevelScalarAgg
+    */
+    public static Plan findTopLevelScalarAgg(Plan plan) {
+        if (plan instanceof LogicalAggregate) {
+            if (((LogicalAggregate<?>) 
plan).getGroupByExpressions().isEmpty()) {
+                return plan;
+            } else {
+                return null;
+            }
+        } else if (plan instanceof LogicalJoin) {
+            return null;
+        } else {
+            for (Plan child : plan.children()) {

Review Comment:
   I still cannot understand why we search in the children. Consider:
   ```sql
   SELECT * FROM t1 WHERE (SELECT * FROM (SELECT sum(sum()) OVER() AS ws FROM 
t2 GROUP BY t2.c1) t3 WHERE t3.ws = t1.c2)
   ```
   We will return the sum() agg under the window node.
   I think this is not correct.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ArrayAgg.java:
##########
@@ -64,4 +66,9 @@ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C 
context) {
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());

Review Comment:
   ```suggestion
           return new ArrayLiteral(ImmutableList.of(), this.getDataType());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiDistinctSum0.java:
##########
@@ -89,4 +96,20 @@ public boolean mustUseMultiDistinctAgg() {
     public Expression withMustUseMultiDistinctAgg(boolean 
mustUseMultiDistinctAgg) {
         return new MultiDistinctSum0(mustUseMultiDistinctAgg, false, 
children.get(0));
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        DataType dataType = getDataType();
+        if (dataType.isBigIntType()) {
+            return new BigIntLiteral(0);
+        } else if (dataType.isLargeIntType()) {
+            return new LargeIntLiteral(new BigInteger("0"));
+        } else if (dataType.isDecimalV3Type()) {
+            return new DecimalV3Literal((DecimalV3Type) dataType, new 
BigDecimal("0"));
+        } else if (dataType.isDoubleType()) {
+            return new DoubleLiteral(0);
+        } else {
+            return new DoubleLiteral(0);
+        }

Review Comment:
   should throw exception?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Sum0.java:
##########
@@ -127,4 +133,20 @@ public Function constructRollUp(Expression param, 
Expression... varParams) {
     public boolean canRollUp() {
         return true;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        DataType dataType = getDataType();
+        if (dataType.isBigIntType()) {
+            return new BigIntLiteral(0);
+        } else if (dataType.isLargeIntType()) {
+            return new LargeIntLiteral(new BigInteger("0"));
+        } else if (dataType.isDecimalV3Type()) {
+            return new DecimalV3Literal((DecimalV3Type) dataType, new 
BigDecimal("0"));
+        } else if (dataType.isDoubleType()) {
+            return new DoubleLiteral(0);
+        } else {
+            return new DoubleLiteral(0);
+        }

Review Comment:
   throw exception



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ScalarSubquery.java:
##########
@@ -33,20 +37,40 @@
  */
 public class ScalarSubquery extends SubqueryExpr {
 
+    private final boolean hasTopLevelScalarAgg;
+
     public ScalarSubquery(LogicalPlan subquery) {
-        super(Objects.requireNonNull(subquery, "subquery can not be null"));
+        this(subquery, ImmutableList.of());
     }
 
     public ScalarSubquery(LogicalPlan subquery, List<Slot> correlateSlots) {
-        this(Objects.requireNonNull(subquery, "subquery can not be null"),
-                Objects.requireNonNull(correlateSlots, "correlateSlots can not 
be null"),
-                Optional.empty());
+        this(subquery, correlateSlots, Optional.empty());
     }
 
     public ScalarSubquery(LogicalPlan subquery, List<Slot> correlateSlots, 
Optional<Expression> typeCoercionExpr) {
         super(Objects.requireNonNull(subquery, "subquery can not be null"),
                 Objects.requireNonNull(correlateSlots, "correlateSlots can not 
be null"),
                 typeCoercionExpr);
+        hasTopLevelScalarAgg = findTopLevelScalarAgg(subquery) != null;
+    }
+
+    public boolean hasTopLevelScalarAgg() {
+        return hasTopLevelScalarAgg;
+    }
+
+    /**
+    * getTopLevelScalarAggFunction
+    */
+    public Optional<NamedExpression> getTopLevelScalarAggFunction() {
+        Plan plan = findTopLevelScalarAgg(queryPlan);
+        if (plan != null) {
+            LogicalAggregate aggregate = (LogicalAggregate) plan;
+            Preconditions.checkState(aggregate.getAggregateFunctions().size() 
== 1,
+                    "agg is not a scalar agg, it's output is ", 
aggregate.getOutputExpressions());

Review Comment:
   We need a better error message that indicates the scope is the subquery. 
A scalar agg means the aggregate returns only one row, so the message should 
not say "not a scalar agg".



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectList.java:
##########
@@ -95,4 +97,9 @@ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C 
context) {
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());

Review Comment:
   ```suggestion
           return new ArrayLiteral(ImmutableList.of(), this.getDataType());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java:
##########
@@ -114,26 +120,56 @@ public Expression visitInSubquery(InSubquery expr, T 
context) {
     @Override
     public Expression visitScalarSubquery(ScalarSubquery scalar, T context) {
         AnalyzedResult analyzedResult = analyzeSubquery(scalar);
-
+        boolean isCorrelated = analyzedResult.isCorrelated();
+        LogicalPlan analyzedSubqueryPlan = analyzedResult.logicalPlan;
+        if (isCorrelated) {
+            if (analyzedSubqueryPlan instanceof LogicalLimit) {
+                if 
(ScalarSubquery.findTopLevelScalarAgg(analyzedResult.logicalPlan) == null) {
+                    throw new AnalysisException("limit is not supported in 
correlated subquery "
+                            + analyzedResult.getLogicalPlan());
+                } else {
+                    analyzedSubqueryPlan = (LogicalPlan) 
analyzedSubqueryPlan.child(0);

Review Comment:
   why just remove limit? what about limit offset? or limit 0?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/PercentileArray.java:
##########
@@ -86,4 +88,9 @@ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C 
context) {
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());

Review Comment:
   ImmutableList.of()



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FoldConstantRuleOnFE.java:
##########
@@ -619,6 +621,21 @@ public Expression visitVersion(Version version, 
ExpressionRewriteContext context
         return new StringLiteral(GlobalVariable.version);
     }
 
+    @Override
+    public Expression visitNvl(Nvl nvl, ExpressionRewriteContext context) {

Review Comment:
    what will happen if we do not fold nvl?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java:
##########
@@ -114,26 +120,56 @@ public Expression visitInSubquery(InSubquery expr, T 
context) {
     @Override
     public Expression visitScalarSubquery(ScalarSubquery scalar, T context) {
         AnalyzedResult analyzedResult = analyzeSubquery(scalar);
-
+        boolean isCorrelated = analyzedResult.isCorrelated();
+        LogicalPlan analyzedSubqueryPlan = analyzedResult.logicalPlan;
+        if (isCorrelated) {
+            if (analyzedSubqueryPlan instanceof LogicalLimit) {
+                if 
(ScalarSubquery.findTopLevelScalarAgg(analyzedResult.logicalPlan) == null) {
+                    throw new AnalysisException("limit is not supported in 
correlated subquery "
+                            + analyzedResult.getLogicalPlan());
+                } else {
+                    analyzedSubqueryPlan = (LogicalPlan) 
analyzedSubqueryPlan.child(0);
+                }
+            }
+            if (analyzedSubqueryPlan instanceof LogicalSort) {
+                // skip useless sort node
+                analyzedResult = new AnalyzedResult((LogicalPlan) 
analyzedSubqueryPlan.child(0),
+                        analyzedResult.correlatedSlots);
+            }
+        }
         checkOutputColumn(analyzedResult.getLogicalPlan());
-        checkHasAgg(analyzedResult);
         checkHasNoGroupBy(analyzedResult);
+        checkNoCorrelatedSlotsInAgg(analyzedResult);
+        checkNoCorrelatedSlotsUnderJoin(analyzedResult);
 
-        // if scalar subquery is like select '2024-02-02 00:00:00'
-        // we can just return the constant expr '2024-02-02 00:00:00'
+        LogicalPlan subqueryPlan = analyzedResult.getLogicalPlan();
         if (analyzedResult.getLogicalPlan() instanceof LogicalProject) {
             LogicalProject project = (LogicalProject) 
analyzedResult.getLogicalPlan();
             if (project.child() instanceof LogicalOneRowRelation
                     && project.getProjects().size() == 1
                     && project.getProjects().get(0) instanceof Alias) {
+                // if scalar subquery is like select '2024-02-02 00:00:00'
+                // we can just return the constant expr '2024-02-02 00:00:00'
                 Alias alias = (Alias) project.getProjects().get(0);
                 if (alias.isConstant()) {
                     return alias.child();
                 }
+            } else if (isCorrelated) {
+                if 
(ExpressionUtils.containsWindowExpression(project.getProjects())) {

Review Comment:
   add a comment here; this logic is not easy to understand at a glance



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java:
##########
@@ -172,6 +197,22 @@ private void checkNoCorrelatedSlotsUnderAgg(AnalyzedResult 
analyzedResult) {
         }
     }
 
+    private void checkNoCorrelatedSlotsUnderJoin(AnalyzedResult 
analyzedResult) {
+        if (analyzedResult.hasCorrelatedSlotsUnderJoin()) {
+            throw new AnalysisException(
+                    String.format("Unsupported accesss outer join's column 
under join operator : %s",
+                    analyzedResult.getCorrelatedSlots()));
+        }
+    }
+
+    private void checkNoCorrelatedSlotsInAgg(AnalyzedResult analyzedResult) {
+        if (analyzedResult.hasCorrelatedSlotsInAgg()) {
+            throw new AnalysisException(String.format(
+                    "outer query's column is not supported in subquery's 
aggregation operator : %s",
+                    analyzedResult.getCorrelatedSlots()));
+        }
+    }

Review Comment:
   add a comment explaining the SQL pattern or plan pattern that these checks 
are meant to catch



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java:
##########
@@ -245,15 +286,51 @@ public boolean hasGroupBy() {
 
         public boolean hasCorrelatedSlotsUnderAgg() {
             return correlatedSlots.isEmpty() ? false
-                    : findAggContainsCorrelatedSlots(logicalPlan, 
ImmutableSet.copyOf(correlatedSlots));
+                    : findCorrelatedSlotsUnderNode(logicalPlan,
+                            ImmutableSet.copyOf(correlatedSlots), 
LogicalAggregate.class);
+        }
+
+        public boolean hasCorrelatedSlotsUnderJoin() {
+            return correlatedSlots.isEmpty() ? false
+                    : findCorrelatedSlotsUnderNode(logicalPlan,
+                            ImmutableSet.copyOf(correlatedSlots), 
LogicalJoin.class);
+        }
+
+        public boolean hasCorrelatedSlotsInAgg() {
+            return correlatedSlots.isEmpty() ? false
+                    : findCorrelatedSlotsInNode(logicalPlan, 
ImmutableSet.copyOf(correlatedSlots),
+                            LogicalAggregate.class);
+        }
+
+        private static <T> boolean findCorrelatedSlotsInNode(Plan rootPlan,

Review Comment:
   ```suggestion
           private static <T> boolean hasCorrelatedSlotsInNode(Plan rootPlan,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to