This is an automated email from the ASF dual-hosted git repository.
jmalkin pushed a commit to branch kll_functions
in repository https://gitbox.apache.org/repos/asf/datasketches-spark.git
The following commit(s) were added to refs/heads/kll_functions by this push:
new 089b526 Add to_string w/o parameters, add tests
089b526 is described below
commit 089b526a825fff4f5ab423eb563930709bca6089
Author: Jon <[email protected]>
AuthorDate: Thu Mar 6 21:49:31 2025 -0800
Add to_string w/o parameters, add tests
---
.../sql/datasketches/kll/KllFunctionRegistry.scala | 2 ++
.../kll/expressions/KllDoublesSketchGetK.scala | 4 +--
.../KllDoublesSketchGetNumRetained.scala | 5 +--
.../expressions/KllDoublesSketchGetPmfCdf.scala | 4 +--
.../KllDoublesSketchIsEstimationMode.scala | 4 +--
...tained.scala => KllDoublesSketchToString.scala} | 34 +++++++++++++-----
.../spark/sql/datasketches/kll/functions.scala | 10 ++++++
.../spark/sql/datasketches/kll/KllTest.scala | 42 ++++++++++++++++++++--
8 files changed, 84 insertions(+), 21 deletions(-)
diff --git
a/src/main/scala/org/apache/spark/sql/datasketches/kll/KllFunctionRegistry.scala
b/src/main/scala/org/apache/spark/sql/datasketches/kll/KllFunctionRegistry.scala
index fa7e3d7..3574adc 100644
---
a/src/main/scala/org/apache/spark/sql/datasketches/kll/KllFunctionRegistry.scala
+++
b/src/main/scala/org/apache/spark/sql/datasketches/kll/KllFunctionRegistry.scala
@@ -28,6 +28,7 @@ import
org.apache.spark.sql.datasketches.kll.expressions.{KllDoublesSketchGetMin
KllDoublesSketchGetCdf,
KllDoublesSketchGetNumRetained,
KllDoublesSketchGetK,
+
KllDoublesSketchToString,
KllDoublesSketchIsEstimationMode}
object KllFunctionRegistry extends DatasketchesFunctionRegistry {
@@ -39,6 +40,7 @@ object KllFunctionRegistry extends
DatasketchesFunctionRegistry {
expression[KllDoublesSketchGetPmf]("kll_sketch_double_get_pmf"),
expression[KllDoublesSketchGetCdf]("kll_sketch_double_get_cdf"),
expression[KllDoublesSketchGetK]("kll_sketch_double_get_k"),
+ expression[KllDoublesSketchToString]("kll_sketch_double_to_string"),
expression[KllDoublesSketchGetNumRetained]("kll_sketch_double_get_num_retained"),
expression[KllDoublesSketchIsEstimationMode]("kll_sketch_double_is_estimation_mode"),
)
diff --git
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetK.scala
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetK.scala
index a944cf2..d607dec 100644
---
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetK.scala
+++
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetK.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression,
UnaryExpression,
ExpectsInputTypes,
NullIntolerant}
-import org.apache.spark.sql.types.{AbstractDataType, DataType, DoubleType}
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeBlock,
CodegenContext, ExprCode}
import org.apache.spark.sql.datasketches.kll.types.KllDoublesSketchType
import org.apache.datasketches.kll.KllDoublesSketch
@@ -54,7 +54,7 @@ case class KllDoublesSketchGetK(sketchExpr: Expression)
override def inputTypes: Seq[AbstractDataType] = Seq(KllDoublesSketchType)
- override def dataType: DataType = DoubleType
+ override def dataType: DataType = IntegerType
override def nullSafeEval(input: Any): Any = {
val bytes = input.asInstanceOf[Array[Byte]]
diff --git
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetNumRetained.scala
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetNumRetained.scala
index 57df5e5..28b2a73 100644
---
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetNumRetained.scala
+++
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetNumRetained.scala
@@ -22,11 +22,12 @@ import
org.apache.spark.sql.catalyst.expressions.{Expression,
UnaryExpression,
ExpectsInputTypes,
NullIntolerant}
-import org.apache.spark.sql.types.{AbstractDataType, DataType, DoubleType}
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeBlock,
CodegenContext, ExprCode}
import org.apache.spark.sql.datasketches.kll.types.KllDoublesSketchType
import org.apache.datasketches.kll.KllDoublesSketch
import org.apache.datasketches.memory.Memory
+import org.apache.spark.sql.types.IntegerType
@ExpressionDescription(
usage = """
@@ -54,7 +55,7 @@ case class KllDoublesSketchGetNumRetained(sketchExpr:
Expression)
override def inputTypes: Seq[AbstractDataType] = Seq(KllDoublesSketchType)
- override def dataType: DataType = DoubleType
+ override def dataType: DataType = IntegerType
override def nullSafeEval(input: Any): Any = {
val bytes = input.asInstanceOf[Array[Byte]]
diff --git
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetPmfCdf.scala
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetPmfCdf.scala
index a041365..4b9498e 100644
---
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetPmfCdf.scala
+++
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetPmfCdf.scala
@@ -211,6 +211,4 @@ case class KllDoublesSketchGetPmfCdf(sketchExpr: Expression,
// getRank(quantile, QuantileSearchCriteria)
// getRanks(quantile[]), QuantileSearchCriteria)
// getNormalizedRankError(bool isPmf)
-// toString(bool, bool) -- already part of the wrapper
-// getK() ?
-// getNumRetained() ?
+// toString(bool, bool)
diff --git
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchIsEstimationMode.scala
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchIsEstimationMode.scala
index ee66824..c06e293 100644
---
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchIsEstimationMode.scala
+++
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchIsEstimationMode.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression,
UnaryExpression,
ExpectsInputTypes,
NullIntolerant}
-import org.apache.spark.sql.types.{AbstractDataType, DataType, DoubleType}
+import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeBlock,
CodegenContext, ExprCode}
import org.apache.spark.sql.datasketches.kll.types.KllDoublesSketchType
import org.apache.datasketches.kll.KllDoublesSketch
@@ -54,7 +54,7 @@ case class KllDoublesSketchIsEstimationMode(sketchExpr:
Expression)
override def inputTypes: Seq[AbstractDataType] = Seq(KllDoublesSketchType)
- override def dataType: DataType = DoubleType
+ override def dataType: DataType = BooleanType
override def nullSafeEval(input: Any): Any = {
val bytes = input.asInstanceOf[Array[Byte]]
diff --git
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetNumRetained.scala
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchToString.scala
similarity index 73%
copy from
src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetNumRetained.scala
copy to
src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchToString.scala
index 57df5e5..5a14ef2 100644
---
a/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchGetNumRetained.scala
+++
b/src/main/scala/org/apache/spark/sql/datasketches/kll/expressions/KllDoublesSketchToString.scala
@@ -22,44 +22,60 @@ import
org.apache.spark.sql.catalyst.expressions.{Expression,
UnaryExpression,
ExpectsInputTypes,
NullIntolerant}
-import org.apache.spark.sql.types.{AbstractDataType, DataType, DoubleType}
+import org.apache.spark.sql.types.{AbstractDataType, DataType, StringType}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeBlock,
CodegenContext, ExprCode}
import org.apache.spark.sql.datasketches.kll.types.KllDoublesSketchType
import org.apache.datasketches.kll.KllDoublesSketch
import org.apache.datasketches.memory.Memory
+import org.apache.spark.unsafe.types.UTF8String
@ExpressionDescription(
usage = """
- _FUNC_(expr) - Returns the number of items retained by the sketch given
the binary representation
+ _FUNC_(expr) - Returns a string with information about the sketch given
the binary representation
of a Datasketches KllDoublesSketch. """,
examples = """
Examples:
> SELECT _FUNC_(kll_sketch_agg(col)) FROM VALUES (1.0), (2.0), (3.0)
tab(col);
- 3
+ ### KLL sketch summary:
+ K : 200
+ min K : 200
+ M : 8
+ N : 3
+ Epsilon : 1.33%
+ Epsilon PMF : 1.65%
+ Empty : false
+ Estimation mode: false
+ Levels : 1
+ Sorted : false
+ Capacity items : 200
+ Retained items : 3
+ Min item : 1
+ Max item : 3
+ ### End sketch summary
"""
//group = "misc_funcs",
)
-case class KllDoublesSketchGetNumRetained(sketchExpr: Expression)
+case class KllDoublesSketchToString(sketchExpr: Expression)
extends UnaryExpression
with ExpectsInputTypes
with NullIntolerant {
override def child: Expression = sketchExpr
- override protected def withNewChildInternal(newChild: Expression):
KllDoublesSketchGetNumRetained = {
+ override protected def withNewChildInternal(newChild: Expression):
KllDoublesSketchToString = {
copy(sketchExpr = newChild)
}
- override def prettyName: String = "kll_sketch_double_get_num_retained"
+ override def prettyName: String = "kll_sketch_double_to_string"
override def inputTypes: Seq[AbstractDataType] = Seq(KllDoublesSketchType)
- override def dataType: DataType = DoubleType
+ override def dataType: DataType = StringType
override def nullSafeEval(input: Any): Any = {
val bytes = input.asInstanceOf[Array[Byte]]
val sketch = KllDoublesSketch.wrap(Memory.wrap(bytes))
- sketch.getNumRetained()
+ UTF8String.fromString(sketch.toString())
}
override protected def nullSafeCodeGen(ctx: CodegenContext, ev: ExprCode, f:
String => String): ExprCode = {
@@ -70,7 +86,7 @@ case class KllDoublesSketchGetNumRetained(sketchExpr:
Expression)
s"""
|${sketchEval.code}
|final org.apache.datasketches.kll.KllDoublesSketch $sketch =
org.apache.spark.sql.datasketches.kll.types.KllDoublesSketchType.wrap(${sketchEval.value});
- |final int ${ev.value} = $sketch.getNumRetained();
+ |final org.apache.spark.unsafe.types.UTF8String ${ev.value} =
org.apache.spark.unsafe.types.UTF8String.fromString($sketch.toString());
|final boolean ${ev.isNull} = ${sketchEval.isNull};
""".stripMargin
ev.copy(code = CodeBlock(Seq(code), Seq.empty))
diff --git
a/src/main/scala/org/apache/spark/sql/datasketches/kll/functions.scala
b/src/main/scala/org/apache/spark/sql/datasketches/kll/functions.scala
index 9bebe3c..b13233f 100644
--- a/src/main/scala/org/apache/spark/sql/datasketches/kll/functions.scala
+++ b/src/main/scala/org/apache/spark/sql/datasketches/kll/functions.scala
@@ -29,6 +29,7 @@ import
org.apache.spark.sql.datasketches.kll.expressions.{KllDoublesSketchGetMin
KllDoublesSketchGetPmfCdf,
KllDoublesSketchGetNumRetained,
KllDoublesSketchGetK,
+
KllDoublesSketchToString,
KllDoublesSketchIsEstimationMode}
object functions extends DatasketchesScalaFunctionBase {
@@ -102,6 +103,15 @@ object functions extends DatasketchesScalaFunctionBase {
kll_sketch_double_is_estimation_mode(Column(columnName))
}
+ // to_string
+ def kll_sketch_double_to_string(expr: Column): Column = withExpr {
+ new KllDoublesSketchToString(expr.expr)
+ }
+
+ def kll_sketch_double_to_string(columnName: String): Column = {
+ kll_sketch_double_to_string(Column(columnName))
+ }
+
// get min
def kll_sketch_double_get_min(expr: Column): Column = withExpr {
new KllDoublesSketchGetMin(expr.expr)
diff --git a/src/test/scala/org/apache/spark/sql/datasketches/kll/KllTest.scala
b/src/test/scala/org/apache/spark/sql/datasketches/kll/KllTest.scala
index 0a17a9d..9f5fec4 100644
--- a/src/test/scala/org/apache/spark/sql/datasketches/kll/KllTest.scala
+++ b/src/test/scala/org/apache/spark/sql/datasketches/kll/KllTest.scala
@@ -26,6 +26,7 @@ import org.apache.datasketches.kll.KllDoublesSketch
import org.apache.spark.sql.datasketches.kll.functions._
import org.apache.spark.sql.datasketches.kll.types.KllDoublesSketchType
import org.apache.spark.sql.datasketches.common.SparkSessionManager
+import org.apache.datasketches.kll.KllSketch
class KllTest extends SparkSessionManager {
import spark.implicits._
@@ -116,6 +117,19 @@ class KllTest extends SparkSessionManager {
val cdf_excl = Array[Double](0.2, 0.49, 1.0, 1.0)
compareArrays(cdf_excl,
pmfCdfResult.getAs[Seq[Double]]("cdf_exclusive").toArray)
+
+ // re-use sketchDf to avoid overloading a single example with too many results
+ val sketchInfo = sketchDf.select(
+ kll_sketch_double_get_k($"sketch").as("k"),
+ kll_sketch_double_is_estimation_mode($"sketch").as("estimation_mode"),
+ kll_sketch_double_get_num_retained($"sketch").as("num_retained"),
+ kll_sketch_double_to_string($"sketch").as("description")
+ ).head()
+
+ assert(sketchInfo.getAs[Integer]("k") == KllSketch.DEFAULT_K)
+ assert(sketchInfo.getAs[Boolean]("estimation_mode") == false)
+ assert(sketchInfo.getAs[Integer]("num_retained") == n)
+ assert(sketchInfo.getAs[String]("description").trim().startsWith("###
Kll"))
}
test("Kll Doubles Sketch via SQL") {
@@ -123,14 +137,15 @@ class KllTest extends SparkSessionManager {
KllFunctionRegistry.registerFunctions(spark)
val n = 100
+ val k = 200
val data = (for (i <- 1 to n) yield i.toDouble).toDF("value")
data.createOrReplaceTempView("data_table")
val kllDf = spark.sql(
s"""
|SELECT
- | kll_sketch_double_get_min(kll_sketch_double_agg_build(value, 200)) AS
min,
- | kll_sketch_double_get_max(kll_sketch_double_agg_build(value, 200)) AS
max
+ | kll_sketch_double_get_min(kll_sketch_double_agg_build(value, ${k}))
AS min,
+ | kll_sketch_double_get_max(kll_sketch_double_agg_build(value, ${k}))
AS max
|FROM
| data_table
""".stripMargin
@@ -150,7 +165,7 @@ class KllTest extends SparkSessionManager {
| kll_sketch_double_get_cdf(t.sketch, ${splitPoints}, false) AS
cdf_exclusive
|FROM
| (SELECT
- | kll_sketch_double_agg_build(value, 200) sketch
+ | kll_sketch_double_agg_build(value, ${k}) sketch
| FROM
| data_table) t
""".stripMargin
@@ -167,6 +182,27 @@ class KllTest extends SparkSessionManager {
val cdf_excl = Array[Double](0.2, 0.49, 1.0, 1.0)
compareArrays(cdf_excl,
pmfCdfResult.getAs[Seq[Double]]("cdf_exclusive").toArray)
+
+ // different query to avoid overloading a single example with too many results
+ val sketchInfo = spark.sql(
+ s"""
+ SELECT
+ kll_sketch_double_get_k(sketch) AS k,
+ kll_sketch_double_is_estimation_mode(sketch) AS estimation_mode,
+ kll_sketch_double_get_num_retained(sketch) AS num_retained,
+ kll_sketch_double_to_string(sketch) AS description
+ FROM
+ (SELECT
+ kll_sketch_double_agg_build(value, ${k}) sketch
+ FROM
+ data_table) t
+ """
+ ).head()
+
+ assert(sketchInfo.getAs[Integer]("k") == KllSketch.DEFAULT_K)
+ assert(sketchInfo.getAs[Boolean]("estimation_mode") == false)
+ assert(sketchInfo.getAs[Integer]("num_retained") == n)
+ assert(sketchInfo.getAs[String]("description").trim().startsWith("###
Kll"))
}
test("KLL Doubles Merge via Scala") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]