[ 
https://issues.apache.org/jira/browse/HIVE-23462?focusedWorklogId=438469&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-438469
 ]

ASF GitHub Bot logged work on HIVE-23462:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/May/20 21:48
            Start Date: 28/May/20 21:48
    Worklog Time Spent: 10m 
      Work Description: jcamachor commented on a change in pull request #1031:
URL: https://github.com/apache/hive/pull/1031#discussion_r432106650



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
##########
@@ -235,12 +232,21 @@ public String getFunctionName() {
         return Optional.empty();
       } else {
         JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl(new 
HiveTypeSystemImpl());
+        Type type = returnType;

Review comment:
       Is this variable needed? It seems to me it's the same as `returnType`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
##########
@@ -235,12 +232,21 @@ public String getFunctionName() {
         return Optional.empty();
       } else {
         JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl(new 
HiveTypeSystemImpl());
+        Type type = returnType;
+        if (type instanceof ParameterizedType) {
+          ParameterizedType parameterizedType = (ParameterizedType) type;
+          if (parameterizedType.getRawType() == List.class) {
+          final RelDataType componentRelType = 
typeFactory.createType(parameterizedType.getActualTypeArguments()[0]);

Review comment:
       nit. indentation

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +389,216 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form 
usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends 
RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject instanceof Project && ((Project) 
newProject).getChildExps().equals(project.getChildExps())) {
+        return;
+      } else {
+        call.transformTo(newProject);
+      }
+    }
+
+    protected abstract VbuilderPAP buildProcessor(RelOptRuleCall call);
+
+
+    protected static abstract class VbuilderPAP {
+      private final String sketchClass;
+      protected final RelBuilder relBuilder;
+      protected final RexBuilder rexBuilder;
+
+      protected VbuilderPAP(String sketchClass, RelBuilder relBuilder) {
+        this.sketchClass = sketchClass;
+        this.relBuilder = relBuilder;
+        rexBuilder = relBuilder.getRexBuilder();
+      }
+
+      final class ProcessShuttle extends RexShuttle {
+        public RexNode visitOver(RexOver over) {
+          return processCall(over);
+        }
+      };
+
+      protected RelNode processProject(Project project) {
+        relBuilder.push(project.getInput());
+        RexShuttle shuttle = new ProcessShuttle();
+        List<RexNode> newProjects = new ArrayList<RexNode>();
+        for (RexNode expr : project.getChildExps()) {
+                newProjects.add(expr.accept(shuttle));
+        }
+        relBuilder.project(newProjects);
+        return relBuilder.build();
+      }
+
+      private final RexNode processCall(RexNode expr) {
+        if (expr instanceof RexOver) {
+          RexOver over = (RexOver) expr;
+          if (isApplicable(over)) {
+            return rewrite(over);
+          }
+        }
+        return expr;
+      }
+
+      protected final SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor fn = 
DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (!fn.getCalciteFunction().isPresent()) {
+          throw new RuntimeException(fn.toString() + " doesn't have a Calcite 
function associated with it");
+        }
+        return fn.getCalciteFunction().get();
+      }
+
+      abstract RexNode rewrite(RexOver expr);
+
+      abstract boolean isApplicable(RexOver expr);
+
+    }
+
+  }
+
+  public static class CumeDistRewrite extends 
WindowingToProjectAggregateJoinProject {
+
+    public CumeDistRewrite(String sketchType) {
+      super(sketchType);
+    }
+
+    @Override
+    protected VbuilderPAP buildProcessor(RelOptRuleCall call) {
+      return new VB(sketchType, call.builder());
+    }
+
+    private static class VB extends VbuilderPAP {
+
+      protected VB(String sketchClass, RelBuilder relBuilder) {
+        super(sketchClass, relBuilder);
+      }
+
+      @Override
+      boolean isApplicable(RexOver over) {
+        SqlAggFunction aggOp = over.getAggOperator();
+        RexWindow window = over.getWindow();
+        if (aggOp.getName().equalsIgnoreCase("cume_dist") && 
window.orderKeys.size() == 1
+            && window.getLowerBound().isUnbounded() && 
window.getUpperBound().isUnbounded()) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      RexNode rewrite(RexOver over) {
+

Review comment:
       nit. newline

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
##########
@@ -235,12 +232,21 @@ public String getFunctionName() {
         return Optional.empty();
       } else {
         JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl(new 
HiveTypeSystemImpl());
+        Type type = returnType;
+        if (type instanceof ParameterizedType) {
+          ParameterizedType parameterizedType = (ParameterizedType) type;
+          if (parameterizedType.getRawType() == List.class) {
+          final RelDataType componentRelType = 
typeFactory.createType(parameterizedType.getActualTypeArguments()[0]);
+          return Optional
+              
.of(typeFactory.createArrayType(typeFactory.createTypeWithNullability(componentRelType,
 true), -1));
+          }
+        }
         return Optional.of(typeFactory.createType(returnType));
       }

Review comment:
       I have a general comment about the approach we are taking in this 
method to infer the return type.
   I think we should rethink inferring the return type from the Java object 
returned by 'evaluate', and we could possibly take a step back.
   One option could be to create the necessary `SqlReturnTypeInference` strategies 
to be able to return the correct type depending on the functions. If the 
inference is simple, we could hardcode some of those return types. This is the 
general approach taken by Calcite functions. I think that will help simplify 
this code a lot.
   What do you think? Do you have any other ideas?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +389,216 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form 
usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends 
RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+

Review comment:
       nit. newline

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +389,216 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form 
usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends 
RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject instanceof Project && ((Project) 
newProject).getChildExps().equals(project.getChildExps())) {
+        return;
+      } else {
+        call.transformTo(newProject);
+      }
+    }
+
+    protected abstract VbuilderPAP buildProcessor(RelOptRuleCall call);
+
+
+    protected static abstract class VbuilderPAP {
+      private final String sketchClass;
+      protected final RelBuilder relBuilder;
+      protected final RexBuilder rexBuilder;
+
+      protected VbuilderPAP(String sketchClass, RelBuilder relBuilder) {
+        this.sketchClass = sketchClass;
+        this.relBuilder = relBuilder;
+        rexBuilder = relBuilder.getRexBuilder();
+      }
+
+      final class ProcessShuttle extends RexShuttle {
+        public RexNode visitOver(RexOver over) {
+          return processCall(over);
+        }
+      };
+
+      protected RelNode processProject(Project project) {
+        relBuilder.push(project.getInput());
+        RexShuttle shuttle = new ProcessShuttle();
+        List<RexNode> newProjects = new ArrayList<RexNode>();
+        for (RexNode expr : project.getChildExps()) {
+                newProjects.add(expr.accept(shuttle));
+        }
+        relBuilder.project(newProjects);
+        return relBuilder.build();
+      }
+
+      private final RexNode processCall(RexNode expr) {
+        if (expr instanceof RexOver) {
+          RexOver over = (RexOver) expr;
+          if (isApplicable(over)) {
+            return rewrite(over);
+          }
+        }
+        return expr;
+      }
+
+      protected final SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor fn = 
DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (!fn.getCalciteFunction().isPresent()) {
+          throw new RuntimeException(fn.toString() + " doesn't have a Calcite 
function associated with it");
+        }
+        return fn.getCalciteFunction().get();
+      }
+
+      abstract RexNode rewrite(RexOver expr);
+
+      abstract boolean isApplicable(RexOver expr);
+
+    }
+
+  }
+
+  public static class CumeDistRewrite extends 
WindowingToProjectAggregateJoinProject {
+
+    public CumeDistRewrite(String sketchType) {
+      super(sketchType);
+    }
+
+    @Override
+    protected VbuilderPAP buildProcessor(RelOptRuleCall call) {
+      return new VB(sketchType, call.builder());
+    }
+
+    private static class VB extends VbuilderPAP {
+
+      protected VB(String sketchClass, RelBuilder relBuilder) {
+        super(sketchClass, relBuilder);
+      }
+
+      @Override
+      boolean isApplicable(RexOver over) {
+        SqlAggFunction aggOp = over.getAggOperator();
+        RexWindow window = over.getWindow();
+        if (aggOp.getName().equalsIgnoreCase("cume_dist") && 
window.orderKeys.size() == 1
+            && window.getLowerBound().isUnbounded() && 
window.getUpperBound().isUnbounded()) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      RexNode rewrite(RexOver over) {
+
+        over.getOperands();
+        RexWindow w = over.getWindow();
+
+        RexFieldCollation orderKey = w.orderKeys.get(0);
+        // we don't really support nulls in aggregate/etc...they are actually 
ignored
+        // so some hack will be needed for NULLs anyway..
+        ImmutableList<RexNode> partitionKeys = w.partitionKeys;
+
+        relBuilder.push(relBuilder.peek());

Review comment:
       Is this needed?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +389,216 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form 
usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends 
RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject instanceof Project && ((Project) 
newProject).getChildExps().equals(project.getChildExps())) {
+        return;
+      } else {
+        call.transformTo(newProject);
+      }
+    }
+
+    protected abstract VbuilderPAP buildProcessor(RelOptRuleCall call);
+
+
+    protected static abstract class VbuilderPAP {
+      private final String sketchClass;
+      protected final RelBuilder relBuilder;
+      protected final RexBuilder rexBuilder;
+
+      protected VbuilderPAP(String sketchClass, RelBuilder relBuilder) {
+        this.sketchClass = sketchClass;
+        this.relBuilder = relBuilder;
+        rexBuilder = relBuilder.getRexBuilder();
+      }
+
+      final class ProcessShuttle extends RexShuttle {
+        public RexNode visitOver(RexOver over) {
+          return processCall(over);
+        }
+      };
+
+      protected RelNode processProject(Project project) {
+        relBuilder.push(project.getInput());
+        RexShuttle shuttle = new ProcessShuttle();
+        List<RexNode> newProjects = new ArrayList<RexNode>();
+        for (RexNode expr : project.getChildExps()) {
+                newProjects.add(expr.accept(shuttle));
+        }
+        relBuilder.project(newProjects);
+        return relBuilder.build();
+      }
+
+      private final RexNode processCall(RexNode expr) {
+        if (expr instanceof RexOver) {
+          RexOver over = (RexOver) expr;
+          if (isApplicable(over)) {
+            return rewrite(over);
+          }
+        }
+        return expr;
+      }
+
+      protected final SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor fn = 
DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (!fn.getCalciteFunction().isPresent()) {
+          throw new RuntimeException(fn.toString() + " doesn't have a Calcite 
function associated with it");
+        }
+        return fn.getCalciteFunction().get();
+      }
+
+      abstract RexNode rewrite(RexOver expr);
+
+      abstract boolean isApplicable(RexOver expr);
+
+    }
+
+  }
+
+  public static class CumeDistRewrite extends 
WindowingToProjectAggregateJoinProject {
+
+    public CumeDistRewrite(String sketchType) {
+      super(sketchType);
+    }
+
+    @Override
+    protected VbuilderPAP buildProcessor(RelOptRuleCall call) {
+      return new VB(sketchType, call.builder());
+    }
+
+    private static class VB extends VbuilderPAP {
+
+      protected VB(String sketchClass, RelBuilder relBuilder) {
+        super(sketchClass, relBuilder);
+      }
+
+      @Override
+      boolean isApplicable(RexOver over) {
+        SqlAggFunction aggOp = over.getAggOperator();
+        RexWindow window = over.getWindow();
+        if (aggOp.getName().equalsIgnoreCase("cume_dist") && 
window.orderKeys.size() == 1
+            && window.getLowerBound().isUnbounded() && 
window.getUpperBound().isUnbounded()) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      RexNode rewrite(RexOver over) {
+
+        over.getOperands();
+        RexWindow w = over.getWindow();
+
+        RexFieldCollation orderKey = w.orderKeys.get(0);
+        // we don't really support nulls in aggregate/etc...they are actually 
ignored
+        // so some hack will be needed for NULLs anyway..
+        ImmutableList<RexNode> partitionKeys = w.partitionKeys;
+
+        relBuilder.push(relBuilder.peek());
+        // the CDF function utilizes the '<' operator;
+        // negating the input will mirror the values on the x axis
+        // by using 1-CDF(-x) we could get a <= operator
+        RexNode key = orderKey.getKey();
+        key = rexBuilder.makeCall(SqlStdOperatorTable.UNARY_MINUS, key);
+        key = rexBuilder.makeCast(getFloatType(), key);
+
+        ImmutableList<RexNode> projExprs = 
ImmutableList.<RexNode>builder().addAll(partitionKeys).add(key).build();
+        relBuilder.project(projExprs);
+        ImmutableBitSet groupSets = 
ImmutableBitSet.range(partitionKeys.size());
+
+        SqlAggFunction aggFunction = (SqlAggFunction) 
getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH);
+        boolean distinct = false;
+        boolean approximate = true;
+        boolean ignoreNulls = true;
+        List<Integer> argList = Lists.newArrayList(partitionKeys.size());
+        int filterArg = -1;
+        RelCollation collation = RelCollations.EMPTY;
+        RelDataType type = rexBuilder.deriveReturnType(aggFunction, 
Collections.emptyList());
+        String name = aggFunction.getName();
+        AggregateCall newAgg = AggregateCall.create(aggFunction, distinct, 
approximate, ignoreNulls, argList, filterArg,
+                      collation, type, name);
+
+        RelNode agg = HiveRelFactories.HIVE_AGGREGATE_FACTORY.createAggregate(
+            relBuilder.build(),
+            groupSets, ImmutableList.of(groupSets),
+            Lists.newArrayList(newAgg));
+        relBuilder.push(agg);
+
+        List<RexNode> joinConditions;
+        joinConditions = Ord.zip(partitionKeys).stream().map(o -> {
+          RexNode f = relBuilder.field(2, 1, o.i);
+          return rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, o.e, f);
+        }).collect(Collectors.toList());
+        relBuilder.join(JoinRelType.INNER, joinConditions);
+
+        int sketchFieldIndex = relBuilder.peek().getRowType().getFieldCount() 
- 1;
+        RexInputRef sketchInputRef = relBuilder.field(sketchFieldIndex);
+        SqlOperator projectOperator = 
getSqlOperator(DataSketchesFunctions.GET_CDF);
+
+        // NULLs will be replaced by this value - to be before / after the 
other values
+        // note: the sketch will ignore NULLs entirely but they will be placed 
at 0.0 or 1.0
+        final RexNode nullReplacement =
+            relBuilder.literal(orderKey.getNullDirection() == 
NullDirection.FIRST ? Float.MAX_VALUE : -Float.MAX_VALUE);
+
+        // long story short: CAST(1.0f-CDF(CAST(COALESCE(-X, nullReplacement) 
AS FLOAT))[0] AS targetType)
+        RexNode projRex = key;
+        projRex = rexBuilder.makeCall(SqlStdOperatorTable.COALESCE, key, 
nullReplacement);
+        projRex = rexBuilder.makeCast(getFloatType(), projRex);
+        projRex = rexBuilder.makeCall(projectOperator, 
ImmutableList.of(sketchInputRef, projRex));
+        projRex = makeItemCall(projRex, relBuilder.literal(0));
+        projRex = rexBuilder.makeCall(SqlStdOperatorTable.MINUS, 
relBuilder.literal(1.0f), projRex);
+        projRex = rexBuilder.makeCast(over.getType(), projRex);
+
+        return projRex;
+      }
+
+      private RexNode makeItemCall(RexNode arr, RexNode offset) {
+
+        if(getClass().desiredAssertionStatus()) {
+          try {
+            SqlKind.class.getField("ITEM");
+            throw new RuntimeException("bind SqlKind.ITEM instead of this 
workaround - C1.23 a02155a70a");

Review comment:
       ??

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
##########
@@ -672,6 +673,15 @@ public static SqlAggFunction getCalciteAggFn(String 
hiveUdfName, boolean isDisti
             udfInfo.operandTypeInference,
             udfInfo.operandTypeChecker);
         break;
+      case "cume_dist":

Review comment:
       Should this be removed?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -68,25 +82,33 @@
  *       ⇒ SELECT ds_kll_quantile(ds_kll_sketch(CAST(id AS FLOAT)), 0.2) FROM 
sketch_input;
  *    </pre>
  *  </li>
+ *  <li>{@code cume_dist() over (order by id)}
+ *    <pre>
+ *     SELECT id, CUME_DIST() OVER (ORDER BY id) FROM sketch_input;
+ *       ⇒ SELECT id, CUME_DIST() OVER (ORDER BY id),

Review comment:
       Probably you want to delete `CUME_DIST() OVER (ORDER BY id),` in L88, 
since we are providing the equivalent rewriting.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelBuilder.java
##########
@@ -157,6 +159,12 @@ public static SqlAggFunction getRollup(SqlAggFunction 
aggregation) {
     return null;
   }
 
+  @Override
+  public AggCall aggregateCall(SqlAggFunction aggFunction, boolean distinct, 
boolean approximate, boolean ignoreNulls,

Review comment:
       Why are we overriding this on the Hive side? Could we leave a comment?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +389,216 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form 
usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends 
RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject instanceof Project && ((Project) 
newProject).getChildExps().equals(project.getChildExps())) {
+        return;
+      } else {
+        call.transformTo(newProject);
+      }
+    }
+
+    protected abstract VbuilderPAP buildProcessor(RelOptRuleCall call);
+
+
+    protected static abstract class VbuilderPAP {
+      private final String sketchClass;
+      protected final RelBuilder relBuilder;
+      protected final RexBuilder rexBuilder;
+
+      protected VbuilderPAP(String sketchClass, RelBuilder relBuilder) {
+        this.sketchClass = sketchClass;
+        this.relBuilder = relBuilder;
+        rexBuilder = relBuilder.getRexBuilder();
+      }
+
+      final class ProcessShuttle extends RexShuttle {
+        public RexNode visitOver(RexOver over) {
+          return processCall(over);
+        }
+      };
+
+      protected RelNode processProject(Project project) {
+        relBuilder.push(project.getInput());
+        RexShuttle shuttle = new ProcessShuttle();
+        List<RexNode> newProjects = new ArrayList<RexNode>();
+        for (RexNode expr : project.getChildExps()) {
+                newProjects.add(expr.accept(shuttle));
+        }
+        relBuilder.project(newProjects);
+        return relBuilder.build();
+      }
+
+      private final RexNode processCall(RexNode expr) {
+        if (expr instanceof RexOver) {
+          RexOver over = (RexOver) expr;
+          if (isApplicable(over)) {
+            return rewrite(over);
+          }
+        }
+        return expr;
+      }
+
+      protected final SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor fn = 
DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (!fn.getCalciteFunction().isPresent()) {
+          throw new RuntimeException(fn.toString() + " doesn't have a Calcite 
function associated with it");
+        }
+        return fn.getCalciteFunction().get();
+      }
+
+      abstract RexNode rewrite(RexOver expr);
+
+      abstract boolean isApplicable(RexOver expr);
+
+    }
+
+  }
+
+  public static class CumeDistRewrite extends 
WindowingToProjectAggregateJoinProject {
+
+    public CumeDistRewrite(String sketchType) {
+      super(sketchType);
+    }
+
+    @Override
+    protected VbuilderPAP buildProcessor(RelOptRuleCall call) {
+      return new VB(sketchType, call.builder());
+    }
+
+    private static class VB extends VbuilderPAP {
+
+      protected VB(String sketchClass, RelBuilder relBuilder) {
+        super(sketchClass, relBuilder);
+      }
+
+      @Override
+      boolean isApplicable(RexOver over) {
+        SqlAggFunction aggOp = over.getAggOperator();
+        RexWindow window = over.getWindow();
+        if (aggOp.getName().equalsIgnoreCase("cume_dist") && 
window.orderKeys.size() == 1

Review comment:
       We should add a new type to Calcite for this function too.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelBuilder.java
##########
@@ -157,6 +159,12 @@ public static SqlAggFunction getRollup(SqlAggFunction 
aggregation) {
     return null;
   }
 
+  @Override
+  public AggCall aggregateCall(SqlAggFunction aggFunction, boolean distinct, 
boolean approximate, boolean ignoreNulls,
+      RexNode filter, ImmutableList<RexNode> orderKeys, String alias, 
ImmutableList<RexNode> operands) {
+    return super.aggregateCall(aggFunction, distinct, approximate, 
ignoreNulls, filter, orderKeys, alias, operands);

Review comment:
       I did not know `AggregateCall` already had an `approximate` flag. Do you 
know what this is used for in Calcite?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +389,216 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form 
usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends 
RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject instanceof Project && ((Project) 
newProject).getChildExps().equals(project.getChildExps())) {
+        return;
+      } else {
+        call.transformTo(newProject);
+      }
+    }
+
+    protected abstract VbuilderPAP buildProcessor(RelOptRuleCall call);
+
+
+    protected static abstract class VbuilderPAP {
+      private final String sketchClass;
+      protected final RelBuilder relBuilder;
+      protected final RexBuilder rexBuilder;
+
+      protected VbuilderPAP(String sketchClass, RelBuilder relBuilder) {
+        this.sketchClass = sketchClass;
+        this.relBuilder = relBuilder;
+        rexBuilder = relBuilder.getRexBuilder();
+      }
+
+      final class ProcessShuttle extends RexShuttle {
+        public RexNode visitOver(RexOver over) {
+          return processCall(over);
+        }
+      };
+
+      protected RelNode processProject(Project project) {
+        relBuilder.push(project.getInput());
+        RexShuttle shuttle = new ProcessShuttle();
+        List<RexNode> newProjects = new ArrayList<RexNode>();
+        for (RexNode expr : project.getChildExps()) {
+                newProjects.add(expr.accept(shuttle));

Review comment:
       nit. Indentation

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
##########
@@ -235,12 +232,21 @@ public String getFunctionName() {
         return Optional.empty();
       } else {
         JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl(new 
HiveTypeSystemImpl());
+        Type type = returnType;
+        if (type instanceof ParameterizedType) {
+          ParameterizedType parameterizedType = (ParameterizedType) type;
+          if (parameterizedType.getRawType() == List.class) {
+          final RelDataType componentRelType = 
typeFactory.createType(parameterizedType.getActualTypeArguments()[0]);
+          return Optional
+              
.of(typeFactory.createArrayType(typeFactory.createTypeWithNullability(componentRelType,
 true), -1));
+          }

Review comment:
       What if it is a parameterized type but it is not a list? Should we throw 
an exception just to make sure?
   

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +389,216 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form 
usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends 
RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject instanceof Project && ((Project) 
newProject).getChildExps().equals(project.getChildExps())) {
+        return;
+      } else {
+        call.transformTo(newProject);
+      }
+    }
+
+    protected abstract VbuilderPAP buildProcessor(RelOptRuleCall call);
+
+
+    protected static abstract class VbuilderPAP {
+      private final String sketchClass;
+      protected final RelBuilder relBuilder;
+      protected final RexBuilder rexBuilder;
+
+      protected VbuilderPAP(String sketchClass, RelBuilder relBuilder) {
+        this.sketchClass = sketchClass;
+        this.relBuilder = relBuilder;
+        rexBuilder = relBuilder.getRexBuilder();
+      }
+
+      final class ProcessShuttle extends RexShuttle {
+        public RexNode visitOver(RexOver over) {
+          return processCall(over);
+        }
+      };
+
+      protected RelNode processProject(Project project) {
+        relBuilder.push(project.getInput());
+        RexShuttle shuttle = new ProcessShuttle();
+        List<RexNode> newProjects = new ArrayList<RexNode>();
+        for (RexNode expr : project.getChildExps()) {
+                newProjects.add(expr.accept(shuttle));
+        }
+        relBuilder.project(newProjects);
+        return relBuilder.build();
+      }
+
+      private final RexNode processCall(RexNode expr) {
+        if (expr instanceof RexOver) {
+          RexOver over = (RexOver) expr;
+          if (isApplicable(over)) {
+            return rewrite(over);
+          }
+        }
+        return expr;
+      }
+
+      protected final SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor fn = 
DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (!fn.getCalciteFunction().isPresent()) {
+          throw new RuntimeException(fn.toString() + " doesn't have a Calcite 
function associated with it");
+        }
+        return fn.getCalciteFunction().get();
+      }
+
+      abstract RexNode rewrite(RexOver expr);
+
+      abstract boolean isApplicable(RexOver expr);
+
+    }
+
+  }
+
+  public static class CumeDistRewrite extends 
WindowingToProjectAggregateJoinProject {
+
+    public CumeDistRewrite(String sketchType) {
+      super(sketchType);
+    }
+
+    @Override
+    protected VbuilderPAP buildProcessor(RelOptRuleCall call) {
+      return new VB(sketchType, call.builder());
+    }
+
+    private static class VB extends VbuilderPAP {
+
+      protected VB(String sketchClass, RelBuilder relBuilder) {
+        super(sketchClass, relBuilder);
+      }
+
+      @Override
+      boolean isApplicable(RexOver over) {
+        SqlAggFunction aggOp = over.getAggOperator();
+        RexWindow window = over.getWindow();
+        if (aggOp.getName().equalsIgnoreCase("cume_dist") && 
window.orderKeys.size() == 1
+            && window.getLowerBound().isUnbounded() && 
window.getUpperBound().isUnbounded()) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      RexNode rewrite(RexOver over) {
+
+        over.getOperands();
+        RexWindow w = over.getWindow();
+
+        RexFieldCollation orderKey = w.orderKeys.get(0);
+        // we don't really support nulls in aggregate/etc...they are actually 
ignored
+        // so some hack will be needed for NULLs anyway..
+        ImmutableList<RexNode> partitionKeys = w.partitionKeys;
+
+        relBuilder.push(relBuilder.peek());
+        // the CDF function utilizes the '<' operator;
+        // negating the input will mirror the values on the x axis
+        // by using 1-CDF(-x) we could get a <= operator
+        RexNode key = orderKey.getKey();
+        key = rexBuilder.makeCall(SqlStdOperatorTable.UNARY_MINUS, key);
+        key = rexBuilder.makeCast(getFloatType(), key);
+
+        ImmutableList<RexNode> projExprs = 
ImmutableList.<RexNode>builder().addAll(partitionKeys).add(key).build();
+        relBuilder.project(projExprs);
+        ImmutableBitSet groupSets = 
ImmutableBitSet.range(partitionKeys.size());
+
+        SqlAggFunction aggFunction = (SqlAggFunction) 
getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH);
+        boolean distinct = false;
+        boolean approximate = true;
+        boolean ignoreNulls = true;
+        List<Integer> argList = Lists.newArrayList(partitionKeys.size());
+        int filterArg = -1;
+        RelCollation collation = RelCollations.EMPTY;
+        RelDataType type = rexBuilder.deriveReturnType(aggFunction, 
Collections.emptyList());
+        String name = aggFunction.getName();
+        AggregateCall newAgg = AggregateCall.create(aggFunction, distinct, 
approximate, ignoreNulls, argList, filterArg,
+                      collation, type, name);
+
+        RelNode agg = HiveRelFactories.HIVE_AGGREGATE_FACTORY.createAggregate(
+            relBuilder.build(),
+            groupSets, ImmutableList.of(groupSets),
+            Lists.newArrayList(newAgg));
+        relBuilder.push(agg);
+
+        List<RexNode> joinConditions;
+        joinConditions = Ord.zip(partitionKeys).stream().map(o -> {
+          RexNode f = relBuilder.field(2, 1, o.i);
+          return rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, o.e, f);
+        }).collect(Collectors.toList());
+        relBuilder.join(JoinRelType.INNER, joinConditions);
+
+        int sketchFieldIndex = relBuilder.peek().getRowType().getFieldCount() 
- 1;
+        RexInputRef sketchInputRef = relBuilder.field(sketchFieldIndex);
+        SqlOperator projectOperator = 
getSqlOperator(DataSketchesFunctions.GET_CDF);
+
+        // NULLs will be replaced by this value - to be before / after the 
other values
+        // note: the sketch will ignore NULLs entirely but they will be placed 
at 0.0 or 1.0
+        final RexNode nullReplacement =
+            relBuilder.literal(orderKey.getNullDirection() == 
NullDirection.FIRST ? Float.MAX_VALUE : -Float.MAX_VALUE);
+
+        // long story short: CAST(1.0f-CDF(CAST(COALESCE(-X, nullReplacement) 
AS FLOAT))[0] AS targetType)
+        RexNode projRex = key;
+        projRex = rexBuilder.makeCall(SqlStdOperatorTable.COALESCE, key, 
nullReplacement);
+        projRex = rexBuilder.makeCast(getFloatType(), projRex);
+        projRex = rexBuilder.makeCall(projectOperator, 
ImmutableList.of(sketchInputRef, projRex));
+        projRex = makeItemCall(projRex, relBuilder.literal(0));
+        projRex = rexBuilder.makeCall(SqlStdOperatorTable.MINUS, 
relBuilder.literal(1.0f), projRex);
+        projRex = rexBuilder.makeCast(over.getType(), projRex);
+
+        return projRex;
+      }
+
+      private RexNode makeItemCall(RexNode arr, RexNode offset) {
+

Review comment:
       nit: unnecessary blank line after the method signature.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +389,216 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form 
usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends 
RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject instanceof Project && ((Project) 
newProject).getChildExps().equals(project.getChildExps())) {

Review comment:
       I think this check could be done more efficiently, without comparing 
all the expressions of the two projects.
   
   We need to make sure we only create a new project when the RexNode 
expressions were actually rewritten:
   1) In the `for` loop in `processProject`, check for each expression whether 
it was modified by ProcessShuttle (reference comparison with `==`).
   2) Only create a new Project operator if any of the expressions changed; 
otherwise return the original `project`.
   3) The check here then becomes simply `newProject == project`.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +389,216 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form 
usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends 
RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject instanceof Project && ((Project) 
newProject).getChildExps().equals(project.getChildExps())) {
+        return;
+      } else {
+        call.transformTo(newProject);
+      }
+    }
+
+    protected abstract VbuilderPAP buildProcessor(RelOptRuleCall call);
+
+
+    protected static abstract class VbuilderPAP {
+      private final String sketchClass;
+      protected final RelBuilder relBuilder;
+      protected final RexBuilder rexBuilder;
+
+      protected VbuilderPAP(String sketchClass, RelBuilder relBuilder) {
+        this.sketchClass = sketchClass;
+        this.relBuilder = relBuilder;
+        rexBuilder = relBuilder.getRexBuilder();
+      }
+
+      final class ProcessShuttle extends RexShuttle {
+        public RexNode visitOver(RexOver over) {
+          return processCall(over);
+        }
+      };
+
+      protected RelNode processProject(Project project) {
+        relBuilder.push(project.getInput());
+        RexShuttle shuttle = new ProcessShuttle();
+        List<RexNode> newProjects = new ArrayList<RexNode>();
+        for (RexNode expr : project.getChildExps()) {
+                newProjects.add(expr.accept(shuttle));
+        }
+        relBuilder.project(newProjects);
+        return relBuilder.build();
+      }
+
+      private final RexNode processCall(RexNode expr) {
+        if (expr instanceof RexOver) {
+          RexOver over = (RexOver) expr;
+          if (isApplicable(over)) {
+            return rewrite(over);
+          }
+        }
+        return expr;
+      }
+
+      protected final SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor fn = 
DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (!fn.getCalciteFunction().isPresent()) {
+          throw new RuntimeException(fn.toString() + " doesn't have a Calcite 
function associated with it");
+        }
+        return fn.getCalciteFunction().get();
+      }
+
+      abstract RexNode rewrite(RexOver expr);
+
+      abstract boolean isApplicable(RexOver expr);
+
+    }
+
+  }
+
+  public static class CumeDistRewrite extends 
WindowingToProjectAggregateJoinProject {
+
+    public CumeDistRewrite(String sketchType) {
+      super(sketchType);
+    }
+
+    @Override
+    protected VbuilderPAP buildProcessor(RelOptRuleCall call) {
+      return new VB(sketchType, call.builder());
+    }
+
+    private static class VB extends VbuilderPAP {
+
+      protected VB(String sketchClass, RelBuilder relBuilder) {
+        super(sketchClass, relBuilder);
+      }
+
+      @Override
+      boolean isApplicable(RexOver over) {
+        SqlAggFunction aggOp = over.getAggOperator();
+        RexWindow window = over.getWindow();
+        if (aggOp.getName().equalsIgnoreCase("cume_dist") && 
window.orderKeys.size() == 1
+            && window.getLowerBound().isUnbounded() && 
window.getUpperBound().isUnbounded()) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      RexNode rewrite(RexOver over) {
+
+        over.getOperands();
+        RexWindow w = over.getWindow();
+
+        RexFieldCollation orderKey = w.orderKeys.get(0);
+        // we don't really support nulls in aggregate/etc...they are actually 
ignored
+        // so some hack will be needed for NULLs anyway..
+        ImmutableList<RexNode> partitionKeys = w.partitionKeys;
+
+        relBuilder.push(relBuilder.peek());
+        // the CDF function utilizes the '<' operator;
+        // negating the input will mirror the values on the x axis
+        // by using 1-CDF(-x) we could get a <= operator
+        RexNode key = orderKey.getKey();
+        key = rexBuilder.makeCall(SqlStdOperatorTable.UNARY_MINUS, key);
+        key = rexBuilder.makeCast(getFloatType(), key);
+
+        ImmutableList<RexNode> projExprs = 
ImmutableList.<RexNode>builder().addAll(partitionKeys).add(key).build();
+        relBuilder.project(projExprs);
+        ImmutableBitSet groupSets = 
ImmutableBitSet.range(partitionKeys.size());
+
+        SqlAggFunction aggFunction = (SqlAggFunction) 
getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH);
+        boolean distinct = false;
+        boolean approximate = true;
+        boolean ignoreNulls = true;
+        List<Integer> argList = Lists.newArrayList(partitionKeys.size());
+        int filterArg = -1;
+        RelCollation collation = RelCollations.EMPTY;
+        RelDataType type = rexBuilder.deriveReturnType(aggFunction, 
Collections.emptyList());
+        String name = aggFunction.getName();
+        AggregateCall newAgg = AggregateCall.create(aggFunction, distinct, 
approximate, ignoreNulls, argList, filterArg,
+                      collation, type, name);
+
+        RelNode agg = HiveRelFactories.HIVE_AGGREGATE_FACTORY.createAggregate(

Review comment:
       Maybe it would have been easier to rely on the agg function in the 
relBuilder?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 438469)
    Time Spent: 0.5h  (was: 20m)

> Add option to rewrite NTILE to sketch functions
> -----------------------------------------------
>
>                 Key: HIVE-23462
>                 URL: https://issues.apache.org/jira/browse/HIVE-23462
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Zoltan Haindrich
>            Assignee: Zoltan Haindrich
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23462.01.patch, HIVE-23462.02.patch, 
> HIVE-23462.03.patch, HIVE-23462.04.patch
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to