This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new f19f328 KYLIN-4760 Optimize TopN measure
f19f328 is described below
commit f19f3286a5a0122c313faa2bf3c5248aae4c726f
Author: rupengwang <[email protected]>
AuthorDate: Wed Sep 16 13:51:10 2020 +0800
KYLIN-4760 Optimize TopN measure
---
.../spark/sql/udaf/NullSafeValueSerializer.scala | 169 ++++++++++++++++
.../scala/org/apache/spark/sql/udaf/TopN.scala | 215 +++++++++++++++++++++
.../kylin/engine/spark/job/CuboidAggregator.scala | 21 +-
3 files changed, 402 insertions(+), 3 deletions(-)
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala
new file mode 100644
index 0000000..5be5626
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.udaf
+
+import java.io.{DataInput, DataOutput}
+import java.nio.charset.StandardCharsets
+
+import org.apache.spark.unsafe.types.UTF8String
+
+@SerialVersionUID(1)
+sealed trait NullSafeValueSerializer {
+ final def serialize(output: DataOutput, value: Any): Unit = {
+ if (value == null) {
+ output.writeInt(0)
+ } else {
+ serialize0(output, value)
+ }
+ }
+
+ @inline protected def serialize0(output: DataOutput, value: Any): Unit
+
+ def deserialize(input: DataInput): Any = {
+ val length = input.readInt()
+ if (length == 0) {
+ null
+ } else {
+ deSerialize0(input, length)
+ }
+ }
+
+ @inline protected def deSerialize0(input: DataInput, length: Int): Any
+}
+
+@SerialVersionUID(1)
+class BooleanSerializer extends NullSafeValueSerializer {
+ override protected def serialize0(output: DataOutput, value: Any): Unit = {
+ output.writeInt(1)
+ output.writeBoolean(value.asInstanceOf[Boolean])
+ }
+
+ override protected def deSerialize0(input: DataInput, length: Int): Any = {
+ input.readBoolean()
+ }
+}
+
+@SerialVersionUID(1)
+class ByteSerializer extends NullSafeValueSerializer {
+ override def serialize0(output: DataOutput, value: Any): Unit = {
+ output.writeInt(1)
+ output.writeByte(value.asInstanceOf[Byte])
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ input.readByte()
+ }
+}
+
+@SerialVersionUID(1)
+class ShortSerializer extends NullSafeValueSerializer {
+ override def serialize0(output: DataOutput, value: Any): Unit = {
+ output.writeInt(2)
+ output.writeShort(value.asInstanceOf[Short])
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ input.readShort()
+ }
+}
+
+@SerialVersionUID(1)
+class IntegerSerializer extends NullSafeValueSerializer {
+ override def serialize0(output: DataOutput, value: Any): Unit = {
+ output.writeInt(4)
+ output.writeInt(value.asInstanceOf[Int])
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ input.readInt()
+ }
+}
+
+@SerialVersionUID(1)
+class FloatSerializer extends NullSafeValueSerializer {
+ override def serialize0(output: DataOutput, value: Any): Unit = {
+ output.writeInt(4)
+ output.writeFloat(value.asInstanceOf[Float])
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ input.readFloat()
+ }
+}
+
+@SerialVersionUID(1)
+class DoubleSerializer extends NullSafeValueSerializer {
+ override def serialize0(output: DataOutput, value: Any): Unit = {
+ output.writeInt(8)
+ output.writeDouble(value.asInstanceOf[Double])
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ input.readDouble()
+ }
+}
+
+@SerialVersionUID(1)
+class LongSerializer extends NullSafeValueSerializer {
+ override def serialize0(output: DataOutput, value: Any): Unit = {
+ output.writeInt(8)
+ output.writeLong(value.asInstanceOf[Long])
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ input.readLong()
+ }
+}
+
+@SerialVersionUID(1)
+class StringSerializer extends NullSafeValueSerializer {
+ override def serialize0(output: DataOutput, value: Any): Unit = {
+ val bytes = value.toString.getBytes(StandardCharsets.UTF_8)
+ output.writeInt(bytes.length)
+ output.write(bytes)
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ val bytes = new Array[Byte](length)
+ input.readFully(bytes)
+ UTF8String.fromBytes(bytes)
+ }
+}
+
+@SerialVersionUID(1)
+class DecimalSerializer extends NullSafeValueSerializer {
+ override def serialize0(output: DataOutput, value: Any): Unit = {
+ val decimal = value.asInstanceOf[BigDecimal]
+ val bytes = decimal.toString().getBytes(StandardCharsets.UTF_8)
+ output.writeInt(1 + 4 + bytes.length) // scale byte + digits-length int + digits
+ output.writeByte(decimal.scale)
+ output.writeInt(bytes.length)
+ output.write(bytes)
+ }
+
+ override def deSerialize0(input: DataInput, length: Int): Any = {
+ val scale = input.readByte()
+ val numBytes = input.readInt()
+ val bytes = new Array[Byte](numBytes)
+ input.readFully(bytes)
+ val decimal = BigDecimal.apply(new String(bytes, StandardCharsets.UTF_8))
+ decimal.setScale(scale)
+ }
+}
+
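
Side note, not part of the commit: each serializer above frames its value with an
int length header, where 0 encodes null (note an empty string also writes a 0
header, so it decodes back as null). A minimal round-trip sketch using only JDK
streams, which implement DataOutput/DataInput:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.spark.sql.udaf.StringSerializer

    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)  // DataOutputStream implements DataOutput
    val ser = new StringSerializer
    ser.serialize(out, "apple")          // int length header + UTF-8 bytes
    ser.serialize(out, null)             // encoded as a single 0 header

    val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    assert(ser.deserialize(in).toString == "apple") // strings come back as UTF8String
    assert(ser.deserialize(in) == null)             // 0 header decodes to null
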
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
new file mode 100644
index 0000000..ad1713f
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.udaf
+
+import com.esotericsoftware.kryo.KryoException
+import com.esotericsoftware.kryo.io.{Input, KryoDataInput, KryoDataOutput, Output}
+import org.apache.kylin.measure.topn.TopNCounter
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+
+import scala.collection.JavaConverters._
+
+@SerialVersionUID(1)
+sealed abstract class BaseTopN(precision: Int,
+ internalSchema: StructType,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0
+ ) extends TypedImperativeAggregate[TopNCounter[Seq[Any]]] with Serializable with Logging {
+ lazy val serializers: Seq[NullSafeValueSerializer] =
+ Seq(new DoubleSerializer) ++ internalSchema.drop(1).map(_.dataType).map {
+ case BooleanType => new BooleanSerializer
+ case ByteType => new ByteSerializer
+ case ShortType => new ShortSerializer
+ case IntegerType => new IntegerSerializer
+ case FloatType => new FloatSerializer
+ case LongType => new LongSerializer
+ case DoubleType => new DoubleSerializer
+ case TimestampType => new LongSerializer
+ case DateType => new IntegerSerializer
+ case StringType => new StringSerializer
+ case dt => throw new UnsupportedOperationException("Unsupported TopN dimension type: " + dt)
+ }
+
+ override def createAggregationBuffer(): TopNCounter[Seq[Any]] = new TopNCounter[Seq[Any]](precision * TopNCounter.EXTRA_SPACE_RATE)
+
+ override def merge(buffer: TopNCounter[Seq[Any]], input: TopNCounter[Seq[Any]]): TopNCounter[Seq[Any]] = {
+ input.getCounterList.asScala.foreach { c =>
+ buffer.offer(c.getItem, c.getCount)
+ }
+ buffer
+ }
+
+ override def eval(buffer: TopNCounter[Seq[Any]]): Any = {
+ buffer.sortAndRetain()
+ val seq = buffer.getCounterList.asScala.map(
+ entry => InternalRow(entry.getCount, InternalRow(entry.getItem: _*))
+ )
+ ArrayData.toArrayData(seq)
+ }
+
+ var array: Array[Byte] = _
+ var output: Output = _
+
+ override def serialize(topNCounter: TopNCounter[Seq[Any]]): Array[Byte] = {
+ try {
+ if (topNCounter != null) {
+ if (array == null) {
+ array = new Array[Byte](1024 * 1024)
+ output = new Output(array)
+ }
+ output.clear()
+ val out = new KryoDataOutput(output)
+ topNCounter.sortAndRetain()
+ val counters = topNCounter.getCounterList
+ out.writeInt(counters.size())
+ counters.asScala.foreach { counter =>
+ val values = Seq(counter.getCount) ++ counter.getItem
+ values.zip(serializers).foreach { case (value, ser) =>
+ ser.serialize(out, value)
+ }
+ }
+ val i = output.position()
+ output.close()
+ array.slice(0, i)
+ } else {
+ Array.empty[Byte]
+ }
+ } catch {
+ case th: KryoException if th.getMessage.contains("Buffer overflow") =>
+ logWarning(s"Resize buffer size to ${array.length * 2}")
+ array = new Array[Byte](array.length * 2)
+ output.setBuffer(array)
+ serialize(topNCounter)
+ case th =>
+ throw th
+ }
+ }
+
+ override def deserialize(bytes: Array[Byte]): TopNCounter[Seq[Any]] = {
+ val topNCounter = new TopNCounter[Seq[Any]](precision * TopNCounter.EXTRA_SPACE_RATE)
+ if (bytes.nonEmpty) {
+ val in = new KryoDataInput(new Input(bytes))
+ val size = in.readInt()
+ for (_ <- 0 until size) {
+ val values = serializers.map(_.deserialize(in))
+ val item = values.drop(1)
+ if (values.head == null) {
+ topNCounter.offer(item, null)
+ } else {
+ topNCounter.offer(item, values.head.asInstanceOf[Double])
+ }
+ }
+ }
+ topNCounter
+ }
+
+ override def nullable: Boolean = false
+
+ override def dataType: DataType = ArrayType(
+ StructType(Seq(
+ StructField("measure", DoubleType),
+ StructField("dim", StructType(internalSchema.fields.drop(1)))
+ )))
+}
+
+case class ReuseTopN(
+ precision: Int,
+ internalSchema: StructType,
+ child: Expression,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0)
+ extends BaseTopN(precision, internalSchema, mutableAggBufferOffset, inputAggBufferOffset) {
+ val dimType: StructType = StructType(internalSchema.fields.drop(1))
+ val innerType: StructType = StructType(Seq(
+ StructField("measure", DoubleType),
+ StructField("dim", dimType)
+ ))
+
+ override def update(buffer: TopNCounter[Seq[Any]], input: InternalRow): TopNCounter[Seq[Any]] = {
+ val datum = child.eval(input).asInstanceOf[ArrayData].toArray[UnsafeRow](innerType)
+ datum.foreach { data =>
+ val value = data.getDouble(0)
+ val dims = data.get(1, dimType).asInstanceOf[InternalRow]
+ val item = dimType.fields.map(_.dataType).zipWithIndex.map {
+ case (StringType, index) =>
+ val result = dims.get(index, StringType)
+ if (null != result) {
+ dims.getString(index)
+ } else {
+ result
+ }
+ case (dataType, index) =>
+ dims.get(index, dataType)
+ }.toSeq
+ buffer.offer(item, value)
+ }
+ buffer
+ }
+
+ override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
+ copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+ override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+ copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+ override def children: Seq[Expression] = child :: Nil
+}
+
+case class EncodeTopN(
+ precision: Int,
+ internalSchema: StructType,
+ measure: Expression,
+ dimensions: Seq[Expression],
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0)
+ extends BaseTopN(precision, internalSchema, mutableAggBufferOffset, inputAggBufferOffset) {
+ override def update(counter: TopNCounter[Seq[Any]], input: InternalRow): TopNCounter[Seq[Any]] = {
+ val m = measure.eval(input)
+ val dims = dimensions.map {
+ case str: Expression if str.dataType.isInstanceOf[StringType] =>
+ val value = str.eval(input)
+ if (value != null) {
+ value.toString
+ } else {
+ null
+ }
+ case str =>
+ str.eval(input)
+ }
+ if (m == null) {
+ counter.offer(dims, null)
+ } else {
+ counter.offer(dims, m.toString.toDouble)
+ }
+ counter
+ }
+
+ override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
+ copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+ override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+ copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+ override def children: Seq[Expression] = Seq(measure) ++ dimensions
+}
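
Side note, not part of the commit: EncodeTopN is the first-build path, folding raw
measure/dimension rows into a TopNCounter, while ReuseTopN re-aggregates a column
that already holds the encoded TopN array. A hedged sketch of wiring EncodeTopN
into a DataFrame aggregation, mirroring how CuboidAggregator uses it below (the
DataFrame df, the column names, and the precision 100 are assumed for
illustration):

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
    import org.apache.spark.sql.udaf.EncodeTopN

    // Schema convention from CuboidAggregator: the measure field first, then dimensions.
    val schema = StructType(Seq(
      StructField("MEASURE_PRICE", DoubleType),
      StructField("DIMENSION_SELLER", StringType)))

    def topNColumn(df: DataFrame): Column =
      new Column(EncodeTopN(100, schema, df.col("price").expr, Seq(df.col("seller").expr))
        .toAggregateExpression())

    // Usage: df.groupBy("cuboid_key").agg(topNColumn(df))
    // evaluates to array<struct<measure: double, dim: struct<...>>>.
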
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
index 1da2af5..54afadf 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
@@ -98,9 +98,24 @@ object CuboidAggregator {
new Column(cdAggregate.toAggregateExpression()).as(id.toString)
}
case "TOP_N" =>
- val schema: StructType = constructTopNSchema(measure.pra)
- val udfName = UdfManager.register(measure.returnType.toKylinDataType, measure.expression, schema, !reuseLayout)
- callUDF(udfName, columns: _*).as(id.toString)
+ // Uses the new TopN aggregate function defined in
+ // kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
+ val schema = StructType(measure.pra.map { col =>
+ val dataType = col.dataType
+ if (col == measure) {
+ StructField(s"MEASURE_${col.columnName}", dataType)
+ } else {
+ StructField(s"DIMENSION_${col.columnName}", dataType)
+ }
+ })
+
+ if (reuseLayout) {
+ new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
+ .toAggregateExpression()).as(id.toString)
+ } else {
+ new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
+ .toAggregateExpression()).as(id.toString)
+ }
case "PERCENTILE_APPROX" =>
val udfName = UdfManager.register(measure.returnType.toKylinDataType, measure.expression, null, !reuseLayout)
if (!reuseLayout) {
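
Closing note on the TOP_N branch above, not part of the commit: a first-layer
build (reuseLayout == false) encodes the raw measure and dimension columns with
EncodeTopN; building from an existing layout (reuseLayout == true) re-aggregates
the single already-encoded TopN column with ReuseTopN. A hedged sketch of the
reuse path (df2 and the column name topn_measure are assumed; schema follows the
same convention as the previous sketch):

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
    import org.apache.spark.sql.udaf.ReuseTopN

    val schema = StructType(Seq(
      StructField("MEASURE_PRICE", DoubleType),
      StructField("DIMENSION_SELLER", StringType)))

    // The child column already holds the array<struct<measure, dim>> produced
    // by a previous EncodeTopN/ReuseTopN evaluation.
    def reuseTopNColumn(df2: DataFrame): Column =
      new Column(ReuseTopN(100, schema, df2.col("topn_measure").expr)
        .toAggregateExpression())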