This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4289fa1a166de6a77d4a41e7387340b18f9fc08c Author: Steve Carlin <[email protected]> AuthorDate: Fri Oct 4 11:12:48 2024 -0700 IMPALA-13197: (part 2) Added Analytic Expressions to Calcite Planner This commit contains fixes on top of the analytic expressions work; these fix some of the tests in analytic-fns.test. The fixes include: - The AnalyzedAnalyticExpr object now calls "standardize" on AnalyticExpr, which mutates AnalyticExpr into its final compiled form. - Added handling for sum_init_zero, which is produced by Calcite. Note: this is only supported in Impala for BIGINT. An implementation is needed for Decimal and double (IMPALA-13435). - CastExpr needs to be analyzed. There is a quirk in the current Impala implementation in that the parameters for CastExpr are not re-analyzed, so an explicit analyze is done when a CastExpr is encountered. - AnalyticExprs allow "count" with zero parameters. - Certain analytic expressions use default window functions. The Calcite window operations will be ignored for these functions. 
Change-Id: I56529b13c545cdc9f96dd1c3bea9ef676e8c2755 Reviewed-on: http://gerrit.cloudera.org:8080/21897 Reviewed-by: Michael Smith <[email protected]> Tested-by: Michael Smith <[email protected]> --- .../calcite/functions/AnalyzedAnalyticExpr.java | 22 +++++++++- .../functions/AnalyzedFunctionCallExpr.java | 49 ++++++++++++++++++++++ .../impala/calcite/functions/FunctionResolver.java | 33 +++++++++++++-- .../impala/calcite/functions/RexCallConverter.java | 14 +++++-- .../operators/ImpalaCustomOperatorTable.java | 5 +++ .../impala/calcite/rel/node/ImpalaAnalyticRel.java | 30 +++++++++---- .../impala/calcite/rel/util/CreateExprVisitor.java | 6 ++- 7 files changed, 143 insertions(+), 16 deletions(-) diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedAnalyticExpr.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedAnalyticExpr.java index d4b45d6b4..b9b452eb0 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedAnalyticExpr.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedAnalyticExpr.java @@ -18,6 +18,7 @@ package org.apache.impala.calcite.functions; import java.util.List; +import com.google.common.base.Preconditions; import org.apache.impala.analysis.AnalyticExpr; import org.apache.impala.analysis.AnalyticWindow; import org.apache.impala.analysis.Analyzer; @@ -50,6 +51,25 @@ public class AnalyzedAnalyticExpr extends AnalyticExpr { @Override protected void analyzeImpl(Analyzer analyzer) throws AnalysisException { - // TODO: Will need to call standardize for some of the analytic functions. + // Analytic functions need to be standardized into canonical forms that are + // supported by Impala. For example, LAG(c1) is standardized to + // LAG(c1, 1, NULL). 
+ FunctionCallExpr origFuncExpr = getFnCall(); + this.standardize(analyzer); + // If the function expr has changed, make relevant adjustments + if (getFnCall() != origFuncExpr) { + // need to "setChildren" if expression was standardized + setChildren(); + if (isOffsetFn(getFnCall().getFn())) { + try { + // Since standardization may change the function signature, we need to find + // the new matching function in the function registry + Preconditions.checkArgument(getFnCall() instanceof AnalyzedFunctionCallExpr); + ((AnalyzedFunctionCallExpr) getFnCall()).resetAnalyticOffsetFn(); + } catch (ImpalaException e) { + throw new AnalysisException(e); + } + } + } } } diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedFunctionCallExpr.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedFunctionCallExpr.java index c9031ec60..0dd0e0157 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedFunctionCallExpr.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/AnalyzedFunctionCallExpr.java @@ -17,14 +17,19 @@ package org.apache.impala.calcite.functions; +import com.google.common.base.Preconditions; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.FunctionCallExpr; import org.apache.impala.analysis.FunctionParams; +import org.apache.impala.calcite.type.ImpalaTypeConverter; +import org.apache.impala.catalog.AggregateFunction; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaException; +import java.util.Arrays; import java.util.List; /** @@ -90,4 +95,48 @@ public class AnalyzedFunctionCallExpr extends FunctionCallExpr { this.type_); } + /** + * Resets lag and lead function after it has been standardized by the + * AnalyzedAnalyticExpr wrapper. 
Because of the current architecture, + * this fix is a bit hacky as we are mutating the object. Also, this + * is a bit specific to analytic expressions and does not belong within + * the general FunctionCallExpr, but this is the way analytic expressions + * currently work. + */ + public void resetAnalyticOffsetFn() throws ImpalaException { + if (fn_ instanceof AggregateFunction) { + // since the function lookup is based on exact match of data types (unlike Impala's + // builtin db which does allow matches based on implicit cast), we need to adjust + // one or more operands based on the function type + List<Type> operandTypes = Arrays.asList(collectChildReturnTypes()); + String funcName = getFnName().getFunction().toLowerCase(); + switch (funcName) { + case "lag": + case "lead": + // at this point the function should have been standardized into + // a 3 operand function and the second operand is the 'offset' which + // should be an integer (tinyint/smallint/int/bigint) type + Preconditions.checkArgument(operandTypes.size() == 3 && + operandTypes.get(1).isIntegerType()); + // upcast the second argument (offset) since it must always be BIGINT + if (operandTypes.get(1) != Type.BIGINT) { + operandTypes.set(1, Type.BIGINT); + uncheckedCastChild(Type.BIGINT, 1); + } + // Last argument could be NULL with TYPE_NULL but since Impala BE expects + // a concrete type, we cast it to the type of the first argument + if (operandTypes.get(2) == Type.NULL) { + Preconditions.checkArgument(operandTypes.get(0) != Type.NULL); + operandTypes.set(2, operandTypes.get(0)); + uncheckedCastChild(operandTypes.get(0), 2); + } + fn_ = FunctionResolver.getExactFunction(getFnName().getFunction(), + ImpalaTypeConverter.createRelDataTypes(operandTypes)); + break; + default: + throw new AnalysisException("Unsupported aggregate function."); + } + } + } + } diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/FunctionResolver.java 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/FunctionResolver.java index 6f776b903..014107630 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/FunctionResolver.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/FunctionResolver.java @@ -25,6 +25,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.impala.analysis.FunctionName; import org.apache.impala.calcite.type.ImpalaTypeConverter; import org.apache.impala.catalog.BuiltinsDb; @@ -64,6 +65,7 @@ public class FunctionResolver { .put(SqlKind.MINUS, "subtract") .put(SqlKind.TIMES, "multiply") .put(SqlKind.DIVIDE, "divide") + .put(SqlKind.SUM0, "sum_init_zero") .build(); public static Set<SqlKind> ARITHMETIC_TYPES = @@ -109,12 +111,10 @@ public class FunctionResolver { private static Function getFunction(String name, SqlKind kind, List<RelDataType> argTypes, boolean exactMatch) { + // Some names in Calcite don't map exactly to their corresponding Impala // functions, so we get the right mapping from the HashMap table. - String mappedName = CALCITE_KIND_TO_IMPALA_FUNC.get(kind); - return mappedName == null - ? getFunction(name, argTypes, exactMatch) - : getFunction(mappedName, argTypes, exactMatch); + return getFunction(getMappedName(name, kind, argTypes), argTypes, exactMatch); } private static Function getFunction(String name, List<RelDataType> argTypes, @@ -139,6 +139,31 @@ public class FunctionResolver { return fn; } + /** + * For most Calcite operators, the function name within Calcite matches the + * Impala function name. This method handles the exceptions to that rule. 
+ */ + private static String getMappedName(String name, SqlKind kind, + List<RelDataType> argTypes) { + + // First check if any special mappings exist from Calcite SqlKinds to + // Impala functions. + String mappedName = CALCITE_KIND_TO_IMPALA_FUNC.get(kind); + if (mappedName != null) { + // IMPALA-13435: for sum_init_zero, there is support for BIGINT arguments, + // but not for DECIMAL or FLOAT. + if (mappedName.equals("sum_init_zero")) { + if (!argTypes.get(0).getSqlTypeName().equals(SqlTypeName.BIGINT)) { + return "sum"; + } + } + return mappedName; + } + + // If reached here, use the function name as given, no special mapping needed. + return name.toLowerCase(); + } + private static List<Type> getArgTypes(String name, List<RelDataType> argTypes, boolean exactMatch) { // Case statement is special because the function signature only contains the diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/RexCallConverter.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/RexCallConverter.java index 468abec5b..7bbf01a02 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/RexCallConverter.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/functions/RexCallConverter.java @@ -25,6 +25,7 @@ import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; +import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BinaryPredicate; import org.apache.impala.analysis.CaseWhenClause; import org.apache.impala.analysis.CompoundPredicate; @@ -32,6 +33,7 @@ import org.apache.impala.analysis.Expr; import org.apache.impala.calcite.type.ImpalaTypeConverter; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.Type; +import org.apache.impala.common.ImpalaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +63,8 @@ public 
class RexCallConverter { /* * Returns the Impala Expr object for RexCallConverter. */ - public static Expr getExpr(RexCall rexCall, List<Expr> params, RexBuilder rexBuilder) { + public static Expr getExpr(RexCall rexCall, List<Expr> params, RexBuilder rexBuilder, + Analyzer analyzer) throws ImpalaException { // Some functions are known just based on their RexCall signature. switch (rexCall.getOperator().getKind()) { @@ -69,7 +72,7 @@ public class RexCallConverter { case AND: return createCompoundExpr(rexCall, params); case CAST: - return createCastExpr(rexCall, params); + return createCastExpr(rexCall, params, analyzer); } String funcName = rexCall.getOperator().getName().toLowerCase(); @@ -121,7 +124,8 @@ public class RexCallConverter { return null; } - private static Expr createCastExpr(RexCall call, List<Expr> params) { + private static Expr createCastExpr(RexCall call, List<Expr> params, Analyzer analyzer) + throws ImpalaException { Type impalaRetType = ImpalaTypeConverter.createImpalaType(call.getType()); if (params.get(0).getType() == Type.NULL) { return new AnalyzedNullLiteral(impalaRetType); @@ -132,6 +136,10 @@ public class RexCallConverter { return params.get(0); } + // Small hack: Most cast expressions have "isImplicit" set to true. If this + // is the case, then it blocks "analyze" from working through the cast. We + // need to analyze the expression before creating the cast around it. 
+ params.get(0).analyze(analyzer); return new AnalyzedCastExpr(impalaRetType, params.get(0)); } diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaCustomOperatorTable.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaCustomOperatorTable.java index db29c7c54..a3f39cf78 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaCustomOperatorTable.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaCustomOperatorTable.java @@ -20,10 +20,12 @@ package org.apache.impala.calcite.operators; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.ExplicitOperatorBinding; +import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlBinaryOperator; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.fun.SqlMonotonicBinaryOperator; +import org.apache.calcite.sql.fun.SqlCountAggFunction; import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.SqlReturnTypeInference; @@ -195,6 +197,9 @@ public class ImpalaCustomOperatorTable extends ReflectiveSqlOperatorTable { null, OperandTypes.NUMERIC_NUMERIC); + public static final SqlAggFunction COUNT = + new SqlCountAggFunction("COUNT", OperandTypes.VARIADIC); + public static ImpalaCustomOperatorTable instance() { return INSTANCE.get(); } diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java index c9bcbc3c0..f348c1299 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java @@ 
-218,7 +218,8 @@ public class ImpalaAnalyticRel extends Project // For offset functions like LEAD/LAG, we skip this and let Impala assign // the window frame as part of the AnalyticExpr's standardization. AnalyticWindow window = null; - if ((!partitionExprs.isEmpty() || !orderByElements.isEmpty())) { + if ((!partitionExprs.isEmpty() || !orderByElements.isEmpty()) && + !skipWindowGeneration(fn)) { Boundary lBoundary = getWindowBoundary(rexWindow.getLowerBound(), ctx, inputRel, visitor); Boundary rBoundary = getWindowBoundary(rexWindow.getUpperBound(), @@ -234,6 +235,15 @@ public class ImpalaAnalyticRel extends Project return retExpr; } + /** + * Skip window generation on certain functions. These functions explicitly + * set the window within AnalyticExpr.standardize + */ + private boolean skipWindowGeneration(Function fn) { + return fn.functionName().equals("lag") || fn.functionName().equals("lead") || + fn.functionName().equals("row_number"); + } + private Boundary getWindowBoundary(RexWindowBound wb, PlannerContext ctx, ImpalaPlanRel inputRel, CreateExprVisitor visitor) throws ImpalaException { // At this stage, Calcite should have filled in the bound @@ -262,8 +272,8 @@ public class ImpalaAnalyticRel extends Project private List<Expr> getOutputExprs(Map<RexNode, Expr> mapping, List<RexNode> projects, Analyzer analyzer) throws ImpalaException { - AnalyticRexVisitor visitor = - new AnalyticRexVisitor(mapping, getCluster().getRexBuilder()); + AnalyticRexVisitor visitor = new AnalyticRexVisitor(mapping, + getCluster().getRexBuilder(), analyzer); Map<Integer, Expr> projectExprs = new LinkedHashMap<>(); List<Expr> outputExprs = new ArrayList<>(); @@ -291,8 +301,7 @@ public class ImpalaAnalyticRel extends Project return result; } - private Function getFunction(RexOver exp) - throws ImpalaException { + private Function getFunction(RexOver exp) throws ImpalaException { RelDataType retType = exp.getType(); SqlAggFunction aggFunction = exp.getAggOperator(); 
List<RelDataType> operandTypes = Lists.newArrayList(); @@ -399,11 +408,14 @@ public class ImpalaAnalyticRel extends Project private final RexBuilder rexBuilder_; + private final Analyzer analyzer_; + public AnalyticRexVisitor(Map<RexNode, Expr> exprsMap, - RexBuilder rexBuilder) { + RexBuilder rexBuilder, Analyzer analyzer) { super(false); this.exprsMap_ = exprsMap; this.rexBuilder_ = rexBuilder; + this.analyzer_ = analyzer; } @Override @@ -412,7 +424,11 @@ public class ImpalaAnalyticRel extends Project for (RexNode operand : rexCall.getOperands()) { params.add(operand.accept(this)); } - return RexCallConverter.getExpr(rexCall, params, rexBuilder_); + try { + return RexCallConverter.getExpr(rexCall, params, rexBuilder_, analyzer_); + } catch (ImpalaException e) { + throw new RuntimeException(e); + } } @Override diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java index 64600f2b8..447a06056 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java @@ -74,7 +74,11 @@ public class CreateExprVisitor extends RexVisitorImpl<Expr> { for (RexNode operand : rexCall.getOperands()) { params.add(operand.accept(this)); } - return RexCallConverter.getExpr(rexCall, params, rexBuilder_); + try { + return RexCallConverter.getExpr(rexCall, params, rexBuilder_, analyzer_); + } catch (ImpalaException e) { + throw new RuntimeException(e); + } } @Override
