This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit e6825b7ed4b1cf8fcdd3948684dd2a5cfe8255b0 Author: Steve Carlin <[email protected]> AuthorDate: Thu Jul 4 15:00:47 2024 -0700 IMPALA-13197: Implement Analytic Exprs for Calcite In Calcite, an analytic expression is represented as a RexNode of type RexOver, which appears within a Project RelNode. If the Project contains any RexOvers, an ImpalaAnalyticRel RelNode is created instead of an ImpalaProjectRel. Only bare-bones test cases are included. Many analytic expressions will not work yet because they depend on logic in the AnalyticExpr.standardize() method, which is not called yet. Another commit will be needed to support general analytic expressions; the existing Impala analytic tests will be used to test it. Change-Id: Iba5060546a7568ba0cd315f546daa78d89b1c3c5 Reviewed-on: http://gerrit.cloudera.org:8080/21565 Reviewed-by: Joe McDonnell <[email protected]> Reviewed-by: Michael Smith <[email protected]> Tested-by: Michael Smith <[email protected]> --- .../calcite/coercenodes/CoerceOperandShuttle.java | 53 ++- .../calcite/functions/AnalyzedAnalyticExpr.java | 55 +++ .../impala/calcite/functions/FunctionResolver.java | 3 +- .../calcite/rel/node/ConvertToImpalaRelRules.java | 15 +- .../impala/calcite/rel/node/ImpalaAnalyticRel.java | 478 +++++++++++++++++++++ .../impala/calcite/rel/util/CreateExprVisitor.java | 3 + .../queries/QueryTest/calcite.test | 62 +++ 7 files changed, 664 insertions(+), 5 deletions(-) diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/coercenodes/CoerceOperandShuttle.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/coercenodes/CoerceOperandShuttle.java index fb73c4bb6..2de3550de 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/coercenodes/CoerceOperandShuttle.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/coercenodes/CoerceOperandShuttle.java @@ -26,6 +26,7 @@ import org.apache.calcite.rex.RexCall; import
org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.sql.SqlKind; @@ -112,7 +113,7 @@ public class CoerceOperandShuttle extends RexShuttle { call); } - RelDataType retType = ImpalaTypeConverter.getRelDataType(fn.getReturnType()); + RelDataType retType = getReturnType(castedOperandsCall, fn.getReturnType()); // This code does not handle changes in the return type when the Calcite // function is not a decimal but the function resolves to a function that @@ -139,6 +140,34 @@ public class CoerceOperandShuttle extends RexShuttle { newOperands); } + @Override + public RexNode visitOver(RexOver over) { + // recursively call all embedded RexCalls first + RexOver castedOver = (RexOver) super.visitOver(over); + + Function fn = FunctionResolver.getSupertypeFunction(castedOver); + + if (fn == null) { + throw new RuntimeException("Could not find a matching signature for call " + + over); + } + + RelDataType retType = getReturnType(castedOver, fn.getReturnType()); + + List<RexNode> newOperands = + getCastedArgTypes(fn, castedOver.getOperands(), factory, rexBuilder); + + return retType.equals(castedOver.getType()) && + newOperands.equals(castedOver.getOperands()) + ? 
castedOver + : (RexOver) rexBuilder.makeOver(retType, castedOver.getAggOperator(), + newOperands, castedOver.getWindow().partitionKeys, + castedOver.getWindow().orderKeys, castedOver.getWindow().getLowerBound(), + castedOver.getWindow().getUpperBound(), castedOver.getWindow().isRows(), + true /*allowPartial*/, false /*nullWhenCountZero*/, castedOver.isDistinct(), + castedOver.ignoreNulls()); + } + @Override public RexNode visitLiteral(RexLiteral literal) { // Coerce CHAR literal types into STRING @@ -166,6 +195,28 @@ public class CoerceOperandShuttle extends RexShuttle { : rexBuilder.makeInputRef(inputRefIndexType, inputRef.getIndex()); } + + private RelDataType getReturnType(RexNode rexNode, Type impalaReturnType) { + + RelDataType retType = ImpalaTypeConverter.getRelDataType(impalaReturnType); + + // This code does not handle changes in the return type when the Calcite + // function is not a decimal but the function resolves to a function that + // returns a decimal type. The Decimal type from the function resolver would + // have to calculate the precision and scale based on operand types. If + // necessary, this code should be added later. + Preconditions.checkState(retType.getSqlTypeName() != SqlTypeName.DECIMAL || + rexNode.getType().getSqlTypeName() == SqlTypeName.DECIMAL); + + // So if the original return type is Decimal and the function resolves to + // decimal, the precision and scale are saved from the original function. + if (retType.getSqlTypeName().equals(SqlTypeName.DECIMAL)) { + retType = rexNode.getType(); + } + + return retType; + } + /** * Handle getting the type of index. If there is only one input, then we * just use the index value to get the type. 
If there are two inputs, diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedAnalyticExpr.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedAnalyticExpr.java new file mode 100644 index 000000000..d4b45d6b4 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedAnalyticExpr.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.functions; + +import java.util.List; +import org.apache.impala.analysis.AnalyticExpr; +import org.apache.impala.analysis.AnalyticWindow; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.FunctionCallExpr; +import org.apache.impala.analysis.OrderByElement; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaException; + +/** + * Analytic expression that is always treated as analyzed; analyzeImpl() is a no-op. + */ +public class AnalyzedAnalyticExpr extends AnalyticExpr { + public AnalyzedAnalyticExpr(FunctionCallExpr fnCall, + List<Expr> partitionExprs, List<OrderByElement> orderByElements, + AnalyticWindow window) throws ImpalaException { + super(fnCall, partitionExprs, orderByElements, window); + this.type_ = fnCall.getType(); + } + + public AnalyzedAnalyticExpr(AnalyzedAnalyticExpr other) { + super(other); + this.type_ = other.type_; + } + + @Override + public Expr clone() { + return new AnalyzedAnalyticExpr(this); + } + + @Override + protected void analyzeImpl(Analyzer analyzer) throws AnalysisException { + // TODO: Will need to call standardize for some of the analytic functions. + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/FunctionResolver.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/FunctionResolver.java index d2bb9d66e..6f776b903 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/FunctionResolver.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/FunctionResolver.java @@ -41,8 +41,7 @@ import org.slf4j.LoggerFactory; /** * The FunctionResolver is a wrapper around the Impala Function Resolver (via the - * (Db.getFunction() method). In this current iteration, only exact matches are - * resolved. 
TODO: IMPALA-13022: change this comment when implicit conversion is handled. + * Db.getFunction() method).
*/ public class FunctionResolver { protected static final Logger LOG = diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java index 349489e90..1a62a0580 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java @@ -19,6 +19,7 @@ package org.apache.impala.calcite.rel.node; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; @@ -27,6 +28,13 @@ import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.logical.LogicalUnion; import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexVisitor; +import org.apache.calcite.rex.RexVisitorImpl; + +import java.util.ArrayList; +import java.util.List; /** * ConvertToImpalaRelRules. Contains the rules used to change the Calcite RelNodes @@ -44,7 +52,11 @@ public class ConvertToImpalaRelRules { @Override public void onMatch(RelOptRuleCall call) { final LogicalProject project = call.rel(0); - call.transformTo(new ImpalaProjectRel(project)); + List<RexOver> rexOvers = ImpalaAnalyticRel.gatherRexOver(project.getProjects()); + Project newProject = rexOvers.size() > 0 + ? 
new ImpalaAnalyticRel(project) + : new ImpalaProjectRel(project); + call.transformTo(newProject); } } @@ -136,5 +148,4 @@ public class ConvertToImpalaRelRules { call.transformTo(new ImpalaJoinRel(join)); } } - } diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java new file mode 100644 index 000000000..c9bcbc3c0 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java @@ -0,0 +1,478 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.rel.node; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexFieldCollation; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexPatternFieldRef; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexVisitor; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.rex.RexWindow; +import org.apache.calcite.rex.RexWindowBound; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlKind; +import org.apache.commons.collections.CollectionUtils; +import org.apache.impala.analysis.AnalyticExpr; +import org.apache.impala.analysis.AnalyticInfo; +import org.apache.impala.analysis.AnalyticWindow; +import org.apache.impala.analysis.AnalyticWindow.Boundary; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.ExprSubstitutionMap; +import org.apache.impala.analysis.FunctionCallExpr; +import org.apache.impala.analysis.FunctionParams; +import org.apache.impala.analysis.OrderByElement; +import 
org.apache.impala.analysis.SlotDescriptor; +import org.apache.impala.analysis.SlotRef; +import org.apache.impala.calcite.functions.AnalyzedAnalyticExpr; +import org.apache.impala.calcite.functions.AnalyzedFunctionCallExpr; +import org.apache.impala.calcite.functions.FunctionResolver; +import org.apache.impala.calcite.functions.RexCallConverter; +import org.apache.impala.calcite.functions.RexLiteralConverter; +import org.apache.impala.calcite.rel.util.CreateExprVisitor; +import org.apache.impala.calcite.rel.util.ExprConjunctsConverter; +import org.apache.impala.calcite.type.ImpalaTypeConverter; +import org.apache.impala.catalog.Function; +import org.apache.impala.catalog.Type; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.planner.AnalyticPlanner; +import org.apache.impala.planner.PlannerContext; +import org.apache.impala.planner.PlanNode; +import org.apache.impala.planner.SelectNode; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Project RelNode with analytic (RexOver) expressions, planned via AnalyticPlanner. + */ +public class ImpalaAnalyticRel extends Project + implements ImpalaPlanRel { + + protected static final Logger LOG = LoggerFactory.getLogger(ImpalaAnalyticRel.class); + + public ImpalaAnalyticRel(Project project) { + super(project.getCluster(), project.getTraitSet(), project.getHints(), + project.getInput(0), project.getProjects(), project.getRowType()); + } + + private ImpalaAnalyticRel(RelOptCluster cluster, RelTraitSet traits, + RelNode input, List<?
extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, new ArrayList<>(), input, projects, rowType); + } + + @Override + public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, + RelDataType rowType) { + return new ImpalaAnalyticRel(getCluster(), traitSet, input, projects, rowType); + } + + /** + * Convert the Project RelNode with analytic exprs into Impala plan nodes. + */ + @Override + public NodeWithExprs getPlanNode(ParentPlanRelContext context) throws ImpalaException { + + List<RexNode> projects = getProjects(); + NodeWithExprs inputNodeWithExprs = getChildPlanNode(context, projects); + ImpalaPlanRel inputRel = (ImpalaPlanRel) getInput(0); + + + // retrieve list of all analytic expressions + List<RexOver> overExprs = gatherRexOver(projects); + + // get the GroupedAnalyticExpr objects. A GroupedAnalyticExpr object will + // contain a unique analytic expr and all the RexOver objects which are + // equivalent to the unique analytic expr. + List<GroupedAnalyticExpr> groupAnalyticExprs = getGroupedAnalyticExprs( + overExprs, context.ctx_, inputRel, inputNodeWithExprs.outputExprs_); + + List<AnalyticExpr> analyticExprs = new ArrayList<>(); + for (GroupedAnalyticExpr g : groupAnalyticExprs) { + analyticExprs.add(g.analyticExpr); + } + + AnalyticInfo analyticInfo = + AnalyticInfo.create(analyticExprs, context.ctx_.getRootAnalyzer()); + AnalyticPlanner analyticPlanner = + new AnalyticPlanner(analyticInfo, context.ctx_.getRootAnalyzer(), context.ctx_); + + PlanNode planNode = analyticPlanner.createSingleNodePlan( + inputNodeWithExprs.planNode_, Collections.emptyList(), new ArrayList<>()); + + // Get a mapping of all expressions to their corresponding Impala Expr objects. The + // non-analytic expressions will have a RexInputRef type RexNode, while the + // analytic expressions will have the RexOver type RexNode.
+ Map<RexNode, Expr> mapping = createRexNodeExprMapping(inputRel, planNode, projects, + inputNodeWithExprs.outputExprs_, groupAnalyticExprs, context.ctx_, analyticInfo); + + List<Expr> outputExprs = + getOutputExprs(mapping, projects, context.ctx_.getRootAnalyzer()); + + NodeWithExprs retNode = new NodeWithExprs(planNode, outputExprs, inputNodeWithExprs); + + // If there is a filter condition, a SelectNode will get added on top + // of the retNode. + return NodeCreationUtils.wrapInSelectNodeIfNeeded(context, retNode, + getCluster().getRexBuilder()); + } + + private NodeWithExprs getChildPlanNode(ParentPlanRelContext context, + List<RexNode> projects) throws ImpalaException { + ImpalaPlanRel relInput = (ImpalaPlanRel) getInput(0); + ParentPlanRelContext.Builder builder = + new ParentPlanRelContext.Builder(context, this); + builder.setFilterCondition(null); + builder.setInputRefs(RelOptUtil.InputFinder.bits(projects, null)); + return relInput.getPlanNode(builder.build()); + } + + /** + * Generates the AnalyticExpr object from the RexOver and the input plan node + * expressions. 
+ */ + private AnalyticExpr getAnalyticExpr(RexOver rexOver, PlannerContext ctx, + ImpalaPlanRel inputRel, List<Expr> inputExprs) throws ImpalaException { + final RexWindow rexWindow = rexOver.getWindow(); + // First parameter is the function call + Function fn = getFunction(rexOver); + Type impalaRetType = ImpalaTypeConverter.createImpalaType(rexOver.getType()); + CreateExprVisitor visitor = new CreateExprVisitor(getCluster().getRexBuilder(), + inputExprs, ctx.getRootAnalyzer()); + + List<Expr> operands = CreateExprVisitor.getExprs(visitor, rexOver.operands); + + FunctionParams params = new FunctionParams(rexOver.isDistinct(), + rexOver.ignoreNulls(), operands); + FunctionCallExpr fnCall = new AnalyzedFunctionCallExpr(fn, params, impalaRetType); + fnCall.analyze(ctx.getRootAnalyzer()); + fnCall.setIsAnalyticFnCall(true); + + // Second parameter contains the partition expressions + List<Expr> partitionExprs = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(rexWindow.partitionKeys)) { + partitionExprs = CreateExprVisitor.getExprs(visitor, rexWindow.partitionKeys); + } + + // Third parameter contains the sort expressions. NOTE(review): check DESC nullsFirst. + List<OrderByElement> orderByElements = new ArrayList<>(); + if (rexWindow.orderKeys != null) { + for (RexFieldCollation ok : rexWindow.orderKeys) { + Expr orderByExpr = CreateExprVisitor.getExpr(visitor, ok.left); + boolean nullsFirst = ok.getDirection() == RelFieldCollation.Direction.ASCENDING + ? ok.right.contains(SqlKind.NULLS_FIRST) + : !ok.right.contains(SqlKind.NULLS_FIRST); + OrderByElement orderByElement = new OrderByElement(orderByExpr, + ok.getDirection() == RelFieldCollation.Direction.ASCENDING, + nullsFirst); + orderByElements.add(orderByElement); + } + } + + // Fourth parameter is the window frame spec. + // TODO: offset functions like LEAD/LAG should skip this and let Impala assign + // the window frame during AnalyticExpr standardization (not yet special-cased).
+ AnalyticWindow window = null; + if ((!partitionExprs.isEmpty() || !orderByElements.isEmpty())) { + Boundary lBoundary = getWindowBoundary(rexWindow.getLowerBound(), + ctx, inputRel, visitor); + Boundary rBoundary = getWindowBoundary(rexWindow.getUpperBound(), + ctx, inputRel, visitor); + window = new AnalyticWindow( + rexWindow.isRows() ? AnalyticWindow.Type.ROWS : AnalyticWindow.Type.RANGE, + lBoundary, rBoundary); + } + + AnalyzedAnalyticExpr retExpr = new AnalyzedAnalyticExpr(fnCall, partitionExprs, + orderByElements, window); + retExpr.analyze(ctx.getRootAnalyzer()); + return retExpr; + } + + private Boundary getWindowBoundary(RexWindowBound wb, PlannerContext ctx, + ImpalaPlanRel inputRel, CreateExprVisitor visitor) throws ImpalaException { + // At this stage, Calcite should have filled in the bound + Preconditions.checkNotNull(wb); + if (wb.isCurrentRow()) { + return new Boundary(AnalyticWindow.BoundaryType.CURRENT_ROW, null); + } else { + if (wb.isPreceding()) { + if (wb.isUnbounded()) { + return new Boundary(AnalyticWindow.BoundaryType.UNBOUNDED_PRECEDING, null); + } + Expr operand = CreateExprVisitor.getExpr(visitor, wb.getOffset()); + return new Boundary(AnalyticWindow.BoundaryType.PRECEDING, operand, + new BigDecimal(RexLiteral.intValue(wb.getOffset()))); + } else { + if (wb.isUnbounded()) { + return new Boundary(AnalyticWindow.BoundaryType.UNBOUNDED_FOLLOWING, null); + } + Expr operand = CreateExprVisitor.getExpr(visitor, wb.getOffset()); + return new Boundary(AnalyticWindow.BoundaryType.FOLLOWING, operand, + new BigDecimal(RexLiteral.intValue(wb.getOffset()))); + } + } + } + + private List<Expr> getOutputExprs(Map<RexNode, Expr> mapping, + List<RexNode> projects, Analyzer analyzer) throws ImpalaException { + + AnalyticRexVisitor visitor = + new AnalyticRexVisitor(mapping, getCluster().getRexBuilder()); + + Map<Integer, Expr> projectExprs = new LinkedHashMap<>(); + List<Expr> outputExprs = new ArrayList<>(); + // Walk through all the projects and grab 
the already created Expr object that exists + // in the "mapping" variable. + for (RexNode rexNode : projects) { + Expr expr = rexNode.accept(visitor); + expr.analyze(analyzer); + outputExprs.add(expr); + } + return outputExprs; + } + + public static List<RexOver> gatherRexOver(List<RexNode> exprs) { + final List<RexOver> result = new ArrayList<>(); + RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) { + public Void visitOver(RexOver over) { + result.add(over); + return super.visitOver(over); + } + }; + for (RexNode expr : exprs) { + expr.accept(visitor); + } + return result; + } + + private Function getFunction(RexOver exp) + throws ImpalaException { + RelDataType retType = exp.getType(); + SqlAggFunction aggFunction = exp.getAggOperator(); + List<RelDataType> operandTypes = Lists.newArrayList(); + for (RexNode operand : exp.operands) { + operandTypes.add(operand.getType()); + } + return FunctionResolver.getExactFunction(aggFunction.getName(), + aggFunction.getKind(), operandTypes); + } + + /** + * Get the analytic Expr objects from the RexOver objects in the Project. + * Impala does not allow duplicate analytic expressions. So if two different + * RexOvers create the same AnalyticExpr object, they get grouped together in + * one GroupedAnalyticExpr object. + */ + private List<GroupedAnalyticExpr> getGroupedAnalyticExprs(List<RexOver> overExprs, + PlannerContext ctx, ImpalaPlanRel inputRel, + List<Expr> inputExprs) throws ImpalaException { + List<AnalyticExpr> analyticExprs = new ArrayList<>(); + List<List<RexOver>> overExprsList = new ArrayList<>(); + for (RexOver over : overExprs) { + AnalyticExpr analyticExpr = + getAnalyticExpr(over, ctx, inputRel, inputExprs); + // check if we've seen this analytic expression before. 
+ int index = analyticExprs.indexOf(analyticExpr); + if (index == -1) { + analyticExprs.add(analyticExpr); + overExprsList.add(Lists.newArrayList(over)); + } else { + overExprsList.get(index).add(over); + } + } + // The total number of unique analytic expressions should match the number + // of RexOver expression lists created. + Preconditions.checkState(analyticExprs.size() == overExprsList.size()); + + // Create the GroupedAnalyticExprs from the corresponding lists + List<GroupedAnalyticExpr> groupedAnalyticExprs = new ArrayList<>(); + for (int i = 0; i < analyticExprs.size(); ++i) { + groupedAnalyticExprs.add( + new GroupedAnalyticExpr(analyticExprs.get(i), overExprsList.get(i))); + } + return groupedAnalyticExprs; + } + + private Map<RexNode, Expr> createRexNodeExprMapping(ImpalaPlanRel inputRel, + PlanNode planNode, List<RexNode> projects, List<Expr> inputExprs, + List<GroupedAnalyticExpr> groupAnalyticExprs, + PlannerContext ctx, AnalyticInfo analyticInfo) { + // Gather mappings from nodes created by analytic planner + ExprSubstitutionMap outputExprMap = planNode.getOutputSmap(); + // We populate the outputs from the expressions + Map<RexNode, Expr> mapping = new LinkedHashMap<>(); + + // All the input references are going to get marked as a RexInputRef and + // will be mapped to its given expression's position number + for (int pos : getInputReferences(projects)) { + // Get the Impala expr after substituting its operands based on the expression map + Expr e = inputExprs.get(pos).substitute(outputExprMap, ctx.getRootAnalyzer(), + /* preserveRootType = */true); + mapping.put(RexInputRef.of(pos, getInput(0).getRowType().getFieldList()), e); + } + + // Map each RexOver to its group's output slot; assumes slot i matches group i.
+ for (int i = 0; i < groupAnalyticExprs.size(); i++) { + GroupedAnalyticExpr g = groupAnalyticExprs.get(i); + SlotDescriptor slotDesc = + analyticInfo.getOutputTupleDesc().getSlots().get(i); + SlotRef logicalOutputSlot = new SlotRef(slotDesc); + for (RexOver over : g.overExprsList) { + mapping.put(over, outputExprMap.get(logicalOutputSlot)); + } + } + + return mapping; + } + + private Set<Integer> getInputReferences(List<RexNode> projects) { + RelOptUtil.InputReferencedVisitor shuttle = new RelOptUtil.InputReferencedVisitor(); + shuttle.apply(projects); + return shuttle.inputPosReferenced; + } + + private static class GroupedAnalyticExpr { + public final AnalyticExpr analyticExpr; + public final List<RexOver> overExprsList; + + public GroupedAnalyticExpr(AnalyticExpr analyticExpr, List<RexOver> overExprsList) { + this.analyticExpr = analyticExpr; + this.overExprsList = overExprsList; + } + } + + /** + * Visitor class that walks through the RexNode from the Project and + * creates an expression. The Expr objects for the input PlanNode are + * created through the AnalyticPlanner, so we use the mapping to help + * extract these Expr objects. 
+ */ + private static class AnalyticRexVisitor extends RexVisitorImpl<Expr> { + + private final Map<RexNode, Expr> exprsMap_; + + private final RexBuilder rexBuilder_; + + public AnalyticRexVisitor(Map<RexNode, Expr> exprsMap, + RexBuilder rexBuilder) { + super(false); + this.exprsMap_ = exprsMap; + this.rexBuilder_ = rexBuilder; + } + + @Override + public Expr visitCall(RexCall rexCall) { + List<Expr> params = Lists.newArrayList(); + for (RexNode operand : rexCall.getOperands()) { + params.add(operand.accept(this)); + } + return RexCallConverter.getExpr(rexCall, params, rexBuilder_); + } + + @Override + public Expr visitLiteral(RexLiteral rexLiteral) { + return RexLiteralConverter.getExpr(rexLiteral); + } + + @Override + public Expr visitInputRef(RexInputRef rexInputRef) { + return exprsMap_.get(rexInputRef); + } + + @Override + public Expr visitOver(RexOver over) { + return exprsMap_.get(over); + } + + @Override + public Expr visitLocalRef(RexLocalRef localRef) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitCorrelVariable(RexCorrelVariable correlVariable) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitDynamicParam(RexDynamicParam dynamicParam) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitRangeRef(RexRangeRef rangeRef) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitFieldAccess(RexFieldAccess fieldAccess) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitSubQuery(RexSubQuery subQuery) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitTableInputRef(RexTableInputRef fieldRef) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitPatternFieldRef(RexPatternFieldRef fieldRef) { + throw new RuntimeException("Not supported"); + } + } + + @Override + public RelNodeType relNodeType() { + return RelNodeType.PROJECT; + 
} +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java index 8d0ee7e9e..64600f2b8 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java @@ -145,6 +145,9 @@ public class CreateExprVisitor extends RexVisitorImpl<Expr> { public static List<Expr> getExprs(CreateExprVisitor visitor, List<RexNode> operands) throws ImpalaException { List<Expr> exprs = new ArrayList<>(); + if (operands == null) { + return exprs; + } for (RexNode operand : operands) { exprs.add(getExpr(visitor, operand)); } diff --git a/testdata/workloads/functional-query/queries/QueryTest/calcite.test b/testdata/workloads/functional-query/queries/QueryTest/calcite.test index 5c29f0639..d33531f65 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/calcite.test +++ b/testdata/workloads/functional-query/queries/QueryTest/calcite.test @@ -768,3 +768,65 @@ select sum(tinyint_col) from functional.alltypestiny; ---- TYPES bigint ==== +---- QUERY +# duplicate test from analytics-fn.test, delete when it is activated. 
+select date_part, +count(date_col) over (partition by date_part), +min(date_col) over (partition by date_part), +max(date_col) over (partition by date_part) +from functional.date_tbl +order by date_part; +---- RESULTS: VERIFY_IS_EQUAL_SORTED +0001-01-01,6,0001-01-01,9999-12-31 +0001-01-01,6,0001-01-01,9999-12-31 +0001-01-01,6,0001-01-01,9999-12-31 +0001-01-01,6,0001-01-01,9999-12-31 +0001-01-01,6,0001-01-01,9999-12-31 +0001-01-01,6,0001-01-01,9999-12-31 +0001-01-01,6,0001-01-01,9999-12-31 +1399-06-27,2,2017-11-28,2018-12-31 +1399-06-27,2,2017-11-28,2018-12-31 +1399-06-27,2,2017-11-28,2018-12-31 +2017-11-27,10,0001-06-21,2017-11-28 +2017-11-27,10,0001-06-21,2017-11-28 +2017-11-27,10,0001-06-21,2017-11-28 +2017-11-27,10,0001-06-21,2017-11-28 +2017-11-27,10,0001-06-21,2017-11-28 +2017-11-27,10,0001-06-21,2017-11-28 +2017-11-27,10,0001-06-21,2017-11-28 +2017-11-27,10,0001-06-21,2017-11-28 +2017-11-27,10,0001-06-21,2017-11-28 +2017-11-27,10,0001-06-21,2017-11-28 +9999-12-31,2,9999-12-01,9999-12-31 +9999-12-31,2,9999-12-01,9999-12-31 +---- TYPES +DATE, BIGINT, DATE, DATE +==== +---- QUERY +# Test ROWS windows with start boundaries +select id, +count(id) over (order by id rows between 3 preceding and 3 preceding) c1, +count(id) over (order by id rows between 3 preceding and 2 preceding) c2, +count(id) over (order by id rows between 3 preceding and 1 preceding) c3, +count(id) over (order by id rows between 3 preceding and current row) c4, +count(id) over (order by id rows between 3 preceding and 1 following) c5, +count(id) over (order by id rows between 3 preceding and 2 following) c6, +count(id) over (order by id rows between 3 preceding and 3 following) c7, +count(id) over (order by id rows between 2 preceding and 3 following) c8, +count(id) over (order by id rows between 1 preceding and 3 following) c9, +count(id) over (order by id rows between current row and 3 following) c10, +count(id) over (order by id rows between 1 following and 3 following) c11, +count(id) over 
(order by id rows between 2 following and 3 following) c12, +count(id) over (order by id rows between 3 following and 3 following) c13 +from functional.alltypes where id < 8 +---- RESULTS: VERIFY_IS_EQUAL_SORTED +0,0,0,0,1,2,3,4,4,4,4,3,2,1 +1,0,0,1,2,3,4,5,5,5,4,3,2,1 +2,0,1,2,3,4,5,6,6,5,4,3,2,1 +3,1,2,3,4,5,6,7,6,5,4,3,2,1 +4,1,2,3,4,5,6,7,6,5,4,3,2,1 +5,1,2,3,4,5,6,6,5,4,3,2,1,0 +6,1,2,3,4,5,5,5,4,3,2,1,0,0 +7,1,2,3,4,4,4,4,3,2,1,0,0,0 +---- TYPES +INT, BIGINT, BIGINT, BIGINT, BIGINT, BIGINT, BIGINT, BIGINT, BIGINT, BIGINT, BIGINT, BIGINT, BIGINT, BIGINT
