This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new bf02b8c KYLIN-5082,exactly match for percentile
bf02b8c is described below
commit bf02b8c2d33bfd27cc29d4ce8fd5faff92be9c77
Author: leocoder <[email protected]>
AuthorDate: Sat Sep 4 10:19:03 2021 +0800
KYLIN-5082,exactly match for percentile
---
.../measure/percentile/PercentileCounter.java | 7 ++++
.../measure/percentile/PercentileSerializer.java | 4 +++
.../resources/query/sql_exactly_agg/query11.sql | 21 ++++++++++++
.../org/apache/spark/sql/KylinFunctions.scala | 8 ++---
.../catalyst/expressions/KylinExpresssions.scala | 28 ++++++++++++++++
.../sql/catalyst/expressions/ExpressionUtils.scala | 13 ++++++--
.../kylin/query/runtime/plans/AggregatePlan.scala | 37 ++++++++++++++--------
7 files changed, 98 insertions(+), 20 deletions(-)
diff --git
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
index 33433dc..a6115ea 100644
---
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
+++
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
@@ -64,6 +64,13 @@ public class PercentileCounter implements Serializable {
return registers.quantile(quantileRatio);
}
+ /**
+ * Estimates the value at an explicitly supplied quantile ratio.
+ *
+ * Returns a boxed Double so that an empty counter can be reported as null.
+ *
+ * @param quantileRatio quantile ratio to evaluate (e.g. 0.6)
+ * @return the estimate, or null when no values have been recorded (registers is empty)
+ */
+ public Double getResultEstimateWithQuantileRatio(double quantileRatio) {
+ if (registers.size() == 0) {
+ return null;
+ }
+ return registers.quantile(quantileRatio);
+ }
+
public void writeRegisters(ByteBuffer out) {
registers.compress();
registers.asSmallBytes(out);
diff --git
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
index d0ecba7..f83e2d5 100644
---
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
+++
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
@@ -33,6 +33,10 @@ public class PercentileSerializer extends
DataTypeSerializer<PercentileCounter>
this.compression = type.getPrecision();
}
+ /**
+ * Builds a serializer from a raw compression value instead of a DataType.
+ * Sets the same compression field that the DataType-based constructor fills from type.getPrecision().
+ *
+ * @param precision compression used when (de)serializing PercentileCounter instances
+ */
+ public PercentileSerializer(int precision) {
+ this.compression = precision;
+ }
+
@Override
public int peekLength(ByteBuffer in) {
return current().peekLength(in);
diff --git a/kylin-it/src/test/resources/query/sql_exactly_agg/query11.sql
b/kylin-it/src/test/resources/query/sql_exactly_agg/query11.sql
new file mode 100644
index 0000000..1082d5a
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_exactly_agg/query11.sql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- Percentile query expected to be answered directly from cuboid 14336 with exactlyMatched=true
+-- (see the JSON expectation after the terminating semicolon).
+SELECT percentile(price, 0.6) from test_kylin_fact
+group by LSTG_FORMAT_NAME, LSTG_SITE_ID, SLR_SEGMENT_CD
+;{"scanRowCount":300,"scanBytes":0,"scanFiles":1,"cuboidId":[14336],"exactlyMatched":[true]}
\ No newline at end of file
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
index 63c90dc..00fd8db 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
@@ -22,10 +22,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext,
ExprCode}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode,
BinaryExpression,
- DictEncode, Expression, ExpressionInfo, ExpressionUtils,
ImplicitCastInputTypes, In,
- KylinAddMonths, Like, Literal, PreciseCountDistinctDecode, RoundBase,
ScatterSkewData, SplitPart, Sum0,
- TimestampAdd, TimestampDiff, Truncate, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode,
BinaryExpression, DictEncode, Expression, ExpressionInfo, ExpressionUtils,
ImplicitCastInputTypes, In, KylinAddMonths, Like, Literal, PercentileDecode,
PreciseCountDistinctDecode, RoundBase, ScatterSkewData, SplitPart, Sum0,
TimestampAdd, TimestampDiff, Truncate, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount,
PreciseCountDistinct}
@@ -75,6 +72,9 @@ object KylinFunctions {
def approx_count_distinct_decode(column: Column, precision: Int): Column =
Column(ApproxCountDistinctDecode(column.expr, Literal(precision)))
+ /**
+ * Decodes a serialized percentile counter column and evaluates it at quantile `p`.
+ * `precision` is passed to PercentileDecode as a literal (the counter's compression).
+ * The result may be null when the decoded counter holds no values.
+ */
+ def k_percentile_decode(column: Column, p: Column, precision: Int): Column =
+ Column(PercentileDecode(column.expr, p.expr, Literal(precision)))
+
def precise_count_distinct(column: Column): Column =
Column(PreciseCountDistinct(column.expr).toAggregateExpression())
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
index b47ff90..b9a9112 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import com.esotericsoftware.kryo.io.{Input, KryoDataInput}
import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
import org.apache.kylin.measure.hllc.HLLCounter
+import org.apache.kylin.measure.percentile.PercentileSerializer
import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionary}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
@@ -543,4 +544,31 @@ case class ScatterSkewData(left: Expression, right:
Expression) extends BinaryEx
override def dataType: DataType = StringType
override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType,
AnyDataType)
+}
+
+/**
+ * Decodes a serialized percentile counter (read back with [[PercentileSerializer]]) and
+ * evaluates it at the given quantile.
+ *
+ * Used when a query exactly matches a cuboid, so the pre-aggregated percentile column is
+ * projected directly instead of being re-aggregated.
+ *
+ * @param bytes     serialized percentile counter
+ * @param quantile  quantile ratio as a decimal (e.g. 0.6)
+ * @param precision compression handed to [[PercentileSerializer]]
+ */
+case class PercentileDecode(bytes: Expression, quantile: Expression, precision: Expression)
+  extends TernaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, DecimalType, IntegerType)
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val expressionUtils = ExpressionUtils.getClass.getName.stripSuffix("$")
+    val result = ctx.freshName("result")
+    // An empty counter decodes to null: mark the slot null rather than unboxing into a double.
+    nullSafeCodeGen(ctx, ev, (bytes, quantile, precision) =>
+      s"""
+         |java.lang.Double $result = $expressionUtils.percentileDecodeHelper($bytes, $quantile, $precision);
+         |if ($result == null) {
+         |  ${ev.isNull} = true;
+         |} else {
+         |  ${ev.value} = $result;
+         |}
+       """.stripMargin)
+  }
+
+  override protected def nullSafeEval(bytes: Any, quantile: Any, precision: Any): Any = {
+    val serializer = new PercentileSerializer(precision.asInstanceOf[Int])
+    val counter = serializer.deserialize(ByteBuffer.wrap(bytes.asInstanceOf[Array[Byte]]))
+    // Null when the counter holds no values.
+    counter.getResultEstimateWithQuantileRatio(quantile.asInstanceOf[Decimal].toDouble)
+  }
+
+  override def dataType: DataType = DoubleType
+
+  override def prettyName: String = "percentile_decode"
+
+  // The result can be null even for non-null inputs (empty counter), so it must be nullable.
+  override def nullable: Boolean = true
+
+  override def children: Seq[Expression] = Seq(bytes, quantile, precision)
 }
\ No newline at end of file
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
index 232f6cc..31d157a 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
@@ -18,15 +18,17 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.kylin.measure.percentile.PercentileSerializer
import scala.util.{Failure, Success, Try}
import scala.reflect.ClassTag
import org.apache.spark.sql.AnalysisException
import
org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FunctionBuilder,
expressions}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.Decimal
+import java.nio.ByteBuffer
-object
-ExpressionUtils {
+object ExpressionUtils {
def expression[T <: Expression](name: String)
(implicit tag: ClassTag[T]): (String, (ExpressionInfo, FunctionBuilder)) =
{
@@ -109,4 +111,11 @@ ExpressionUtils {
new ExpressionInfo(clazz.getCanonicalName, name)
}
}
+
+  /**
+   * Runtime helper called from generated code: deserializes a percentile counter and evaluates
+   * it at `quantile`.
+   *
+   * @param bytes     serialized counter (`Array[Byte]`)
+   * @param quantile  quantile ratio as a Spark `Decimal`
+   * @param precision compression handed to `PercentileSerializer` (`Int`)
+   * @return the estimate, or null when the counter holds no values
+   */
+  def percentileDecodeHelper(bytes: Any, quantile: Any, precision: Any): java.lang.Double = {
+    val serializer = new PercentileSerializer(precision.asInstanceOf[Int])
+    val counter = serializer.deserialize(ByteBuffer.wrap(bytes.asInstanceOf[Array[Byte]]))
+    // Keep the boxed type: implicitly unboxing a null estimate into scala.Double throws NPE.
+    counter.getResultEstimateWithQuantileRatio(quantile.asInstanceOf[Decimal].toDouble)
+  }
}
diff --git
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
index 6f08e62..ebc1fad 100644
---
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
+++
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
@@ -59,23 +59,30 @@ object AggregatePlan extends LogEx {
// exactly match, skip agg, direct project.
val hash = System.identityHashCode(rel).toString
val aggCols = rel.getRewriteAggCalls.asScala.zipWithIndex.map {
- case (call: KylinAggregateCall, index: Int)
- if OLAPAggregateRel.getAggrFuncName(call).equals("COUNT_DISTINCT") =>
+ case (call: KylinAggregateCall, index: Int) =>
val dataType = call.getFunc.getReturnDataType
val funcName = OLAPAggregateRel.getAggrFuncName(call)
val argNames =
call.getArgList.asScala.map(dataFrame.schema.names.apply(_))
val columnName = argNames.map(col)
val aggName = SchemaProcessor.replaceToAggravateSchemaName(index,
funcName, hash, argNames: _*)
- if (call.isHllCountDistinctFunc) {
- KylinFunctions
- .approx_count_distinct_decode(columnName.head,
dataType.getPrecision)
- .alias(aggName)
- } else if (call.isBitmapCountDistinctFunc) {
- // execute count distinct precisely
-
KylinFunctions.precise_count_distinct_decode(columnName.head).alias(aggName)
- } else {
- throw new IllegalArgumentException(
- s"""Unsupported function name $funcName""")
+ funcName match {
+ case FunctionDesc.FUNC_COUNT_DISTINCT =>
+ if (call.isHllCountDistinctFunc) {
+ KylinFunctions
+ .approx_count_distinct_decode(columnName.head,
dataType.getPrecision)
+ .alias(aggName)
+ } else if (call.isBitmapCountDistinctFunc) {
+ // execute count distinct precisely
+
KylinFunctions.precise_count_distinct_decode(columnName.head).alias(aggName)
+ } else {
+ throw new IllegalArgumentException(
+ s"""Unsupported function name $funcName""")
+ }
+ case FunctionDesc.FUNC_PERCENTILE =>
+ val aggName =
SchemaProcessor.replaceToAggravateSchemaName(index, "PERCENTILE_DECODE", hash,
argNames: _*)
+ KylinFunctions.k_percentile_decode(columnName.head,
columnName(1), dataType.getPrecision).alias(aggName)
+ case _ =>
+ col(schemaNames.apply(call.getArgList.get(0)))
}
case (call: Any, index: Int) =>
col(schemaNames.apply(call.getArgList.get(0)))
@@ -223,7 +230,7 @@ object AggregatePlan extends LogEx {
.isSum0
}
- val exactlyMatchSupportedFunctions = List("SUM", "MIN", "MAX",
"COUNT_DISTINCT")
+ val exactlyMatchSupportedFunctions = List("SUM", "MIN", "MAX",
"COUNT_DISTINCT", "PERCENTILE", "PERCENTILE_APPROX")
def isExactlyCuboidMatched(rel: OLAPAggregateRel, groupByList:
List[Column]): Boolean = {
val olapContext = rel.getContext
@@ -241,7 +248,9 @@ object AggregatePlan extends LogEx {
return false
}
// when using intersect_count and intersect_value
- if (call.getArgList.size() > 1) return false
+ if (call.getArgList.size() > 1 &&
!OLAPAggregateRel.getAggrFuncName(call).startsWith("PERCENTILE")) {
+ return false
+ }
}
val groupByCols = rel.getGroups.asScala.map(_.getIdentity).toSet
if (groupByCols.isEmpty) return false