This is an automated email from the ASF dual-hosted git repository. jmalkin pushed a commit to branch restructure in repository https://gitbox.apache.org/repos/asf/datasketches-spark.git
commit 574ed072145453df17c0e5f404cc64d7ce7dfe91 Author: Jon Malkin <[email protected]> AuthorDate: Mon Jan 13 17:06:13 2025 -0800 Restructure repo as a proposal for future manageability --- .../sql/{ => kll}/aggregate/KllAggregate.scala | 0 .../spark/sql/{ => kll}/aggregate/KllMerge.scala | 0 .../sql/{ => kll}/expressions/KllExpressions.scala | 0 .../sql/{ => kll}/types/KllDoublesSketchType.scala | 0 .../registrar/DatasketchesFunctionRegistry.scala | 24 +------- .../DatasketchesScalaFunctionsBase.scala} | 24 +++----- .../KllFunctionRegistry.scala} | 30 +++++----- .../ThetaFunctionRegistry.scala} | 27 ++++----- .../functions_datasketches_kll.scala} | 59 +------------------- .../registrar/functions_datasketches_theta.scala | 65 ++++++++++++++++++++++ .../{ => theta}/aggregate/ThetaSketchBuild.scala | 0 .../sql/{ => theta}/aggregate/ThetaUnion.scala | 0 .../{ => theta}/expressions/ThetaExpressions.scala | 6 +- .../sql/{ => theta}/types/ThetaSketchType.scala | 0 .../sql/{ => theta}/types/ThetaSketchWrapper.scala | 0 src/test/scala/org/apache/spark/sql/KllTest.scala | 12 ++-- .../scala/org/apache/spark/sql/ThetaTest.scala | 8 +-- 17 files changed, 115 insertions(+), 140 deletions(-) diff --git a/src/main/scala/org/apache/spark/sql/aggregate/KllAggregate.scala b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllAggregate.scala similarity index 100% rename from src/main/scala/org/apache/spark/sql/aggregate/KllAggregate.scala rename to src/main/scala/org/apache/spark/sql/kll/aggregate/KllAggregate.scala diff --git a/src/main/scala/org/apache/spark/sql/aggregate/KllMerge.scala b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllMerge.scala similarity index 100% rename from src/main/scala/org/apache/spark/sql/aggregate/KllMerge.scala rename to src/main/scala/org/apache/spark/sql/kll/aggregate/KllMerge.scala diff --git a/src/main/scala/org/apache/spark/sql/expressions/KllExpressions.scala b/src/main/scala/org/apache/spark/sql/kll/expressions/KllExpressions.scala 
similarity index 100% rename from src/main/scala/org/apache/spark/sql/expressions/KllExpressions.scala rename to src/main/scala/org/apache/spark/sql/kll/expressions/KllExpressions.scala diff --git a/src/main/scala/org/apache/spark/sql/types/KllDoublesSketchType.scala b/src/main/scala/org/apache/spark/sql/kll/types/KllDoublesSketchType.scala similarity index 100% rename from src/main/scala/org/apache/spark/sql/types/KllDoublesSketchType.scala rename to src/main/scala/org/apache/spark/sql/kll/types/KllDoublesSketchType.scala diff --git a/src/main/scala/org/apache/spark/sql/registrar/DatasketchesFunctionRegistry.scala b/src/main/scala/org/apache/spark/sql/registrar/DatasketchesFunctionRegistry.scala index 3dd84ed..86095d8 100644 --- a/src/main/scala/org/apache/spark/sql/registrar/DatasketchesFunctionRegistry.scala +++ b/src/main/scala/org/apache/spark/sql/registrar/DatasketchesFunctionRegistry.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.registrar +package org.apache.spark.sql.registrar import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier @@ -25,12 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import scala.reflect.ClassTag -// DataSketches imports -import org.apache.spark.sql.aggregate.{KllDoublesSketchAgg, KllDoublesMergeAgg} -import org.apache.spark.sql.expressions.{KllGetMin, KllGetMax, KllGetPmf, KllGetCdf} -import org.apache.spark.sql.aggregate.{ThetaSketchBuild, ThetaUnion} -import org.apache.spark.sql.expressions.ThetaSketchGetEstimate - // based on org.apache.spark.sql.catalyst.FunctionRegistry trait DatasketchesFunctionRegistry { // override this to define the actual functions @@ -64,19 +58,3 @@ trait DatasketchesFunctionRegistry { (name, (expressionInfo, builder)) } } - -// defines the Map for the Datasketches functions -object DatasketchesFunctionRegistry extends DatasketchesFunctionRegistry { - override val expressions: Map[String, 
(ExpressionInfo, FunctionBuilder)] = Map( - expression[KllDoublesSketchAgg]("kll_sketch_agg"), - expression[KllDoublesMergeAgg]("kll_merge_agg"), - expression[KllGetMin]("kll_get_min"), - expression[KllGetMax]("kll_get_max"), - expression[KllGetPmf]("kll_get_pmf"), - expression[KllGetCdf]("kll_get_cdf"), - - expression[ThetaSketchBuild]("theta_sketch_build"), - expression[ThetaUnion]("theta_union"), - expression[ThetaSketchGetEstimate]("theta_sketch_get_estimate") - ) -} diff --git a/src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala b/src/main/scala/org/apache/spark/sql/registrar/DatasketchesScalaFunctionsBase.scala similarity index 57% copy from src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala copy to src/main/scala/org/apache/spark/sql/registrar/DatasketchesScalaFunctionsBase.scala index e5a5e2c..f0ee23a 100644 --- a/src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala +++ b/src/main/scala/org/apache/spark/sql/registrar/DatasketchesScalaFunctionsBase.scala @@ -15,23 +15,17 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.types +package org.apache.spark.sql -class ThetaSketchType extends UserDefinedType[ThetaSketchWrapper] { - override def sqlType: DataType = DataTypes.BinaryType +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction - override def serialize(wrapper: ThetaSketchWrapper): Array[Byte] = { - wrapper.serialize - } +// this trait provides helper methods for objects that define and map all the variants of each function invocation, analogous +// to the functions object in core Spark's org.apache.spark.sql.functions +trait DatasketchesScalaFunctionBase { + protected def withExpr(expr: => Expression): Column = Column(expr) - override def deserialize(data: Any): ThetaSketchWrapper = { - val bytes = data.asInstanceOf[Array[Byte]] - ThetaSketchWrapper.deserialize(bytes) + protected def withAggregateFunction(func: AggregateFunction): Column = { + Column(func.toAggregateExpression()) } - - override def userClass: Class[ThetaSketchWrapper] = classOf[ThetaSketchWrapper] - - override def catalogString: String = "ThetaSketch" } - -case object ThetaSketchType extends ThetaSketchType diff --git a/src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala b/src/main/scala/org/apache/spark/sql/registrar/KllFunctionRegistry.scala similarity index 50% copy from src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala copy to src/main/scala/org/apache/spark/sql/registrar/KllFunctionRegistry.scala index e5a5e2c..80411bf 100644 --- a/src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala +++ b/src/main/scala/org/apache/spark/sql/registrar/KllFunctionRegistry.scala @@ -15,23 +15,21 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.types +package org.apache.spark.sql.registrar -class ThetaSketchType extends UserDefinedType[ThetaSketchWrapper] { - override def sqlType: DataType = DataTypes.BinaryType +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo} - override def serialize(wrapper: ThetaSketchWrapper): Array[Byte] = { - wrapper.serialize - } +import org.apache.spark.sql.aggregate.{KllDoublesSketchAgg, KllDoublesMergeAgg} +import org.apache.spark.sql.expressions.{KllGetMin, KllGetMax, KllGetPmf, KllGetCdf} - override def deserialize(data: Any): ThetaSketchWrapper = { - val bytes = data.asInstanceOf[Array[Byte]] - ThetaSketchWrapper.deserialize(bytes) - } - - override def userClass: Class[ThetaSketchWrapper] = classOf[ThetaSketchWrapper] - - override def catalogString: String = "ThetaSketch" +object KllFunctionRegistry extends DatasketchesFunctionRegistry { + override val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map( + expression[KllDoublesSketchAgg]("kll_sketch_agg"), + expression[KllDoublesMergeAgg]("kll_merge_agg"), + expression[KllGetMin]("kll_get_min"), + expression[KllGetMax]("kll_get_max"), + expression[KllGetPmf]("kll_get_pmf"), + expression[KllGetCdf]("kll_get_cdf"), + ) } - -case object ThetaSketchType extends ThetaSketchType diff --git a/src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala b/src/main/scala/org/apache/spark/sql/registrar/ThetaFunctionRegistry.scala similarity index 55% copy from src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala copy to src/main/scala/org/apache/spark/sql/registrar/ThetaFunctionRegistry.scala index e5a5e2c..27fa41a 100644 --- a/src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala +++ b/src/main/scala/org/apache/spark/sql/registrar/ThetaFunctionRegistry.scala @@ -15,23 +15,18 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.types +package org.apache.spark.sql.registrar -class ThetaSketchType extends UserDefinedType[ThetaSketchWrapper] { - override def sqlType: DataType = DataTypes.BinaryType +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo} - override def serialize(wrapper: ThetaSketchWrapper): Array[Byte] = { - wrapper.serialize - } +import org.apache.spark.sql.aggregate.{ThetaSketchBuild, ThetaUnion} +import org.apache.spark.sql.expressions.ThetaSketchGetEstimate - override def deserialize(data: Any): ThetaSketchWrapper = { - val bytes = data.asInstanceOf[Array[Byte]] - ThetaSketchWrapper.deserialize(bytes) - } - - override def userClass: Class[ThetaSketchWrapper] = classOf[ThetaSketchWrapper] - - override def catalogString: String = "ThetaSketch" +object ThetaFunctionRegistry extends DatasketchesFunctionRegistry { + override val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map( + expression[ThetaSketchBuild]("theta_sketch_build"), + expression[ThetaUnion]("theta_union"), + expression[ThetaSketchGetEstimate]("theta_sketch_get_estimate") + ) } - -case object ThetaSketchType extends ThetaSketchType diff --git a/src/main/scala/org/apache/spark/sql/functions_ds.scala b/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_kll.scala similarity index 72% rename from src/main/scala/org/apache/spark/sql/functions_ds.scala rename to src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_kll.scala index f8338b3..ea620d7 100644 --- a/src/main/scala/org/apache/spark/sql/functions_ds.scala +++ b/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_kll.scala @@ -17,25 +17,14 @@ package org.apache.spark.sql - -import org.apache.spark.sql.aggregate.{KllDoublesSketchAgg, KllDoublesMergeAgg} -import org.apache.spark.sql.aggregate.{ThetaSketchBuild, ThetaUnion} -import 
org.apache.spark.sql.catalyst.expressions.{Expression, Literal} -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.expressions._ import org.apache.spark.sql.functions.lit import org.apache.spark.sql.types.{ArrayType, BooleanType, DoubleType} -// this class defines and maps all the variants of each function invocation, analagous -// to the functions object in org.apache.spark.sql.functions -object functions_ds { - - private def withExpr(expr: => Expression): Column = Column(expr) +import org.apache.spark.sql.aggregate.{KllDoublesMergeAgg, KllDoublesSketchAgg} +import org.apache.spark.sql.expressions.{KllGetMin, KllGetMax, KllGetPmfCdf} - private def withAggregateFunction(func: AggregateFunction): Column = { - Column(func.toAggregateExpression()) - } +object functions_datasketches_kll extends DatasketchesScalaFunctionBase { // get min def kll_get_min(expr: Column): Column = withExpr { @@ -163,46 +152,4 @@ object functions_ds { def kll_get_cdf(columnName: String, splitPoints: Array[Double]): Column = { kll_get_cdf(Column(columnName), splitPoints) } - - // Theta - - def theta_sketch_build(column: Column, lgk: Int): Column = withAggregateFunction { - new ThetaSketchBuild(column.expr, lgk) - } - - def theta_sketch_build(columnName: String, lgk: Int): Column = { - theta_sketch_build(Column(columnName), lgk) - } - - def theta_sketch_build(column: Column): Column = withAggregateFunction { - new ThetaSketchBuild(column.expr) - } - - def theta_sketch_build(columnName: String): Column = { - theta_sketch_build(Column(columnName)) - } - - def theta_union(column: Column, lgk: Int): Column = withAggregateFunction { - new ThetaUnion(column.expr, lit(lgk).expr) - } - - def theta_union(columnName: String, lgk: Int): Column = withAggregateFunction { - new ThetaUnion(Column(columnName).expr, lit(lgk).expr) - } - - def theta_union(column: Column): Column = 
withAggregateFunction { - new ThetaUnion(column.expr) - } - - def theta_union(columnName: String): Column = withAggregateFunction { - new ThetaUnion(Column(columnName).expr) - } - - def theta_sketch_get_estimate(column: Column): Column = withExpr { - new ThetaSketchGetEstimate(column.expr) - } - - def theta_sketch_get_estimate(columnName: String): Column = { - theta_sketch_get_estimate(Column(columnName)) - } } diff --git a/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_theta.scala b/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_theta.scala new file mode 100644 index 0000000..88dfc8f --- /dev/null +++ b/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_theta.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.functions.lit + +import org.apache.spark.sql.aggregate.{ThetaSketchBuild, ThetaUnion} +import org.apache.spark.sql.expressions.ThetaSketchGetEstimate + +object functions_datasketches_theta extends DatasketchesScalaFunctionBase { + def theta_sketch_build(column: Column, lgk: Int): Column = withAggregateFunction { + new ThetaSketchBuild(column.expr, lgk) + } + + def theta_sketch_build(columnName: String, lgk: Int): Column = { + theta_sketch_build(Column(columnName), lgk) + } + + def theta_sketch_build(column: Column): Column = withAggregateFunction { + new ThetaSketchBuild(column.expr) + } + + def theta_sketch_build(columnName: String): Column = { + theta_sketch_build(Column(columnName)) + } + + def theta_union(column: Column, lgk: Int): Column = withAggregateFunction { + new ThetaUnion(column.expr, lit(lgk).expr) + } + + def theta_union(columnName: String, lgk: Int): Column = withAggregateFunction { + new ThetaUnion(Column(columnName).expr, lit(lgk).expr) + } + + def theta_union(column: Column): Column = withAggregateFunction { + new ThetaUnion(column.expr) + } + + def theta_union(columnName: String): Column = withAggregateFunction { + new ThetaUnion(Column(columnName).expr) + } + + def theta_sketch_get_estimate(column: Column): Column = withExpr { + new ThetaSketchGetEstimate(column.expr) + } + + def theta_sketch_get_estimate(columnName: String): Column = { + theta_sketch_get_estimate(Column(columnName)) + } +} diff --git a/src/main/scala/org/apache/spark/sql/aggregate/ThetaSketchBuild.scala b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchBuild.scala similarity index 100% rename from src/main/scala/org/apache/spark/sql/aggregate/ThetaSketchBuild.scala rename to src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchBuild.scala diff --git a/src/main/scala/org/apache/spark/sql/aggregate/ThetaUnion.scala b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaUnion.scala 
similarity index 100% rename from src/main/scala/org/apache/spark/sql/aggregate/ThetaUnion.scala rename to src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaUnion.scala diff --git a/src/main/scala/org/apache/spark/sql/expressions/ThetaExpressions.scala b/src/main/scala/org/apache/spark/sql/theta/expressions/ThetaExpressions.scala similarity index 90% rename from src/main/scala/org/apache/spark/sql/expressions/ThetaExpressions.scala rename to src/main/scala/org/apache/spark/sql/theta/expressions/ThetaExpressions.scala index 37709a7..f30735c 100644 --- a/src/main/scala/org/apache/spark/sql/expressions/ThetaExpressions.scala +++ b/src/main/scala/org/apache/spark/sql/theta/expressions/ThetaExpressions.scala @@ -20,13 +20,11 @@ package org.apache.spark.sql.expressions import org.apache.datasketches.memory.Memory import org.apache.datasketches.theta.Sketch -import org.apache.spark.sql.catalyst.expressions.{Expression, ExpectsInputTypes, UnaryExpression, BinaryExpression} +import org.apache.spark.sql.catalyst.expressions.{Expression, ExpectsInputTypes, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.NullIntolerant import org.apache.spark.sql.catalyst.expressions.ExpressionDescription -import org.apache.spark.sql.catalyst.expressions.ImplicitCastInputTypes import org.apache.spark.sql.catalyst.expressions.codegen.{CodeBlock, CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.util.GenericArrayData -import org.apache.spark.sql.types.{AbstractDataType, DataType, ArrayType, DoubleType, ThetaSketchType} +import org.apache.spark.sql.types.{AbstractDataType, DataType, DoubleType, ThetaSketchType} @ExpressionDescription( usage = """ diff --git a/src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala b/src/main/scala/org/apache/spark/sql/theta/types/ThetaSketchType.scala similarity index 100% rename from src/main/scala/org/apache/spark/sql/types/ThetaSketchType.scala rename to 
src/main/scala/org/apache/spark/sql/theta/types/ThetaSketchType.scala diff --git a/src/main/scala/org/apache/spark/sql/types/ThetaSketchWrapper.scala b/src/main/scala/org/apache/spark/sql/theta/types/ThetaSketchWrapper.scala similarity index 100% rename from src/main/scala/org/apache/spark/sql/types/ThetaSketchWrapper.scala rename to src/main/scala/org/apache/spark/sql/theta/types/ThetaSketchWrapper.scala diff --git a/src/test/scala/org/apache/spark/sql/KllTest.scala b/src/test/scala/org/apache/spark/sql/KllTest.scala index 1824ef8..54550e3 100644 --- a/src/test/scala/org/apache/spark/sql/KllTest.scala +++ b/src/test/scala/org/apache/spark/sql/KllTest.scala @@ -22,10 +22,10 @@ import org.apache.spark.sql.functions._ import scala.collection.mutable.WrappedArray import org.apache.spark.sql.types.{StructType, StructField, IntegerType, BinaryType} -import org.apache.spark.sql.functions_ds._ +import org.apache.spark.sql.functions_datasketches_kll._ import org.apache.datasketches.kll.KllDoublesSketch import org.apache.spark.sql.types.KllDoublesSketchType -import org.apache.spark.registrar.DatasketchesFunctionRegistry +import org.apache.spark.sql.registrar.KllFunctionRegistry class KllTest extends SparkSessionManager { import spark.implicits._ @@ -119,8 +119,8 @@ class KllTest extends SparkSessionManager { } test("Kll Doubles Sketch via SQL") { - // register Datasketches functions - DatasketchesFunctionRegistry.registerFunctions(spark) + // register KLL functions + KllFunctionRegistry.registerFunctions(spark) val n = 100 val data = (for (i <- 1 to n) yield i.toDouble).toDF("value") @@ -212,8 +212,8 @@ class KllTest extends SparkSessionManager { } test("KLL Doubles Merge via SQL") { - // register Datasketches functions - DatasketchesFunctionRegistry.registerFunctions(spark) + // register KLL functions + KllFunctionRegistry.registerFunctions(spark) val data = generateData().toDF() data.createOrReplaceTempView("data_table") diff --git 
a/src/test/scala/org/apache/spark/sql/ThetaTest.scala b/src/test/scala/org/apache/spark/sql/ThetaTest.scala index f7c207a..8a47658 100644 --- a/src/test/scala/org/apache/spark/sql/ThetaTest.scala +++ b/src/test/scala/org/apache/spark/sql/ThetaTest.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql -import org.apache.spark.sql.functions_ds._ -import org.apache.spark.registrar.DatasketchesFunctionRegistry +import org.apache.spark.sql.functions_datasketches_theta._ +import org.apache.spark.sql.registrar.ThetaFunctionRegistry class ThetaTest extends SparkSessionManager { import spark.implicits._ @@ -34,7 +34,7 @@ class ThetaTest extends SparkSessionManager { } test("Theta Sketch build via SQL default lgk") { - DatasketchesFunctionRegistry.registerFunctions(spark) + ThetaFunctionRegistry.registerFunctions(spark) val n = 100 val data = (for (i <- 1 to n) yield i).toDF("value") @@ -50,7 +50,7 @@ class ThetaTest extends SparkSessionManager { } test("Theta Sketch build via SQL with lgk") { - DatasketchesFunctionRegistry.registerFunctions(spark) + ThetaFunctionRegistry.registerFunctions(spark) val n = 100 val data = (for (i <- 1 to n) yield i).toDF("value") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
