dtenedor commented on code in PR #53548:
URL: https://github.com/apache/spark/pull/53548#discussion_r2674230528


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala:
##########
@@ -449,6 +449,350 @@ case class KllSketchAggDouble(
   }
 }
 
+/**
+ * The KllMergeAggBigint function merges multiple Apache DataSketches 
KllLongsSketch instances
+ * that have been serialized to binary format. This is useful for combining 
sketches created
+ * in separate aggregations (e.g., from different partitions or time windows).
+ * It outputs the merged binary representation of the KllLongsSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/KLL/KLLSketch.html]] for more 
information.
+ *
+ * @param child
+ *   child expression containing binary KllLongsSketch representations to merge
+ * @param kExpr
+ *   optional expression for the k parameter from the Apache DataSketches 
library that controls
+ *   the size and accuracy of the sketch. Must be a constant integer between 8 
and 65535.
+ *   If not specified, the merged sketch adopts the k value from the first 
input sketch.
+ *   If specified, the value is used to initialize the aggregation buffer. The 
merge operation
+ *   can handle input sketches with different k values. Larger k values 
provide more accurate
+ *   estimates but result in larger, slower sketches.
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, k]) - Merges binary KllLongsSketch representations and 
returns the merged sketch.
+      The input expression should contain binary sketch representations (e.g., 
from kll_sketch_agg_bigint).
+      The optional k parameter controls the size and accuracy of the merged 
sketch (range 8-65535).
+      If k is not specified, the merged sketch adopts the k value from the 
first input sketch.
+  """,
+  examples = """
+    Examples:
+      > SELECT kll_sketch_get_n_bigint(_FUNC_(sketch)) FROM (SELECT 
kll_sketch_agg_bigint(col) as sketch FROM VALUES (1), (2), (3) tab(col) UNION 
ALL SELECT kll_sketch_agg_bigint(col) as sketch FROM VALUES (4), (5), (6) 
tab(col)) t;
+       6
+  """,
+  group = "agg_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class KllMergeAggBigint(
+    child: Expression,
+    kExpr: Option[Expression] = None,
+    override val mutableAggBufferOffset: Int = 0,
+    override val inputAggBufferOffset: Int = 0)
+    extends KllMergeAggBase[KllLongsSketch] {
+  def this(child: Expression) = this(child, None, 0, 0)
+  def this(child: Expression, kExpr: Expression) = this(child, Some(kExpr), 0, 
0)
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): KllMergeAggBigint =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): KllMergeAggBigint =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): KllMergeAggBigint = {
+    if (newChildren.length == 1) {
+      copy(child = newChildren(0), kExpr = None)
+    } else {
+      copy(child = newChildren(0), kExpr = Some(newChildren(1)))
+    }
+  }
+
+  override def prettyName: String = "kll_merge_agg_bigint"
+
+  // Factory method implementations
+  protected def newHeapInstance(k: Int): KllLongsSketch = 
KllLongsSketch.newHeapInstance(k)
+  protected def wrapSketch(bytes: Array[Byte]): KllLongsSketch =
+    KllLongsSketch.wrap(Memory.wrap(bytes))
+  protected def heapifySketch(bytes: Array[Byte]): KllLongsSketch =
+    KllLongsSketch.heapify(Memory.wrap(bytes))
+  protected def toByteArray(sketch: KllLongsSketch): Array[Byte] = 
sketch.toByteArray
+}
+
+/**
+ * The KllMergeAggFloat function merges multiple Apache DataSketches 
KllFloatsSketch instances
+ * that have been serialized to binary format. This is useful for combining 
sketches created
+ * in separate aggregations (e.g., from different partitions or time windows).
+ * It outputs the merged binary representation of the KllFloatsSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/KLL/KLLSketch.html]] for more 
information.
+ *
+ * @param child
+ *   child expression containing binary KllFloatsSketch representations to 
merge
+ * @param kExpr
+ *   optional expression for the k parameter from the Apache DataSketches 
library that controls
+ *   the size and accuracy of the sketch. Must be a constant integer between 8 
and 65535.
+ *   If not specified, the merged sketch adopts the k value from the first 
input sketch.
+ *   If specified, the value is used to initialize the aggregation buffer. The 
merge operation
+ *   can handle input sketches with different k values. Larger k values 
provide more accurate
+ *   estimates but result in larger, slower sketches.
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, k]) - Merges binary KllFloatsSketch representations and 
returns the merged sketch.
+      The input expression should contain binary sketch representations (e.g., 
from kll_sketch_agg_float).
+      The optional k parameter controls the size and accuracy of the merged 
sketch (range 8-65535).
+      If k is not specified, the merged sketch adopts the k value from the 
first input sketch.
+  """,
+  examples = """
+    Examples:
+      > SELECT kll_sketch_get_n_float(_FUNC_(sketch)) FROM (SELECT 
kll_sketch_agg_float(col) as sketch FROM VALUES (CAST(1.0 AS FLOAT)), (CAST(2.0 
AS FLOAT)), (CAST(3.0 AS FLOAT)) tab(col) UNION ALL SELECT 
kll_sketch_agg_float(col) as sketch FROM VALUES (CAST(4.0 AS FLOAT)), (CAST(5.0 
AS FLOAT)), (CAST(6.0 AS FLOAT)) tab(col)) t;
+       6
+  """,
+  group = "agg_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class KllMergeAggFloat(
+    child: Expression,
+    kExpr: Option[Expression] = None,
+    override val mutableAggBufferOffset: Int = 0,
+    override val inputAggBufferOffset: Int = 0)
+    extends KllMergeAggBase[KllFloatsSketch] {
+  def this(child: Expression) = this(child, None, 0, 0)
+  def this(child: Expression, kExpr: Expression) = this(child, Some(kExpr), 0, 
0)
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): KllMergeAggFloat =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): KllMergeAggFloat =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): KllMergeAggFloat = {
+    if (newChildren.length == 1) {
+      copy(child = newChildren(0), kExpr = None)
+    } else {
+      copy(child = newChildren(0), kExpr = Some(newChildren(1)))
+    }
+  }
+
+  override def prettyName: String = "kll_merge_agg_float"
+
+  // Factory method implementations
+  protected def newHeapInstance(k: Int): KllFloatsSketch = 
KllFloatsSketch.newHeapInstance(k)
+  protected def wrapSketch(bytes: Array[Byte]): KllFloatsSketch =
+    KllFloatsSketch.wrap(Memory.wrap(bytes))
+  protected def heapifySketch(bytes: Array[Byte]): KllFloatsSketch =
+    KllFloatsSketch.heapify(Memory.wrap(bytes))
+  protected def toByteArray(sketch: KllFloatsSketch): Array[Byte] = 
sketch.toByteArray
+}
+
+/**
+ * The KllMergeAggDouble function merges multiple Apache DataSketches 
KllDoublesSketch instances
+ * that have been serialized to binary format. This is useful for combining 
sketches created
+ * in separate aggregations (e.g., from different partitions or time windows).
+ * It outputs the merged binary representation of the KllDoublesSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/KLL/KLLSketch.html]] for more 
information.
+ *
+ * @param child
+ *   child expression containing binary KllDoublesSketch representations to 
merge
+ * @param kExpr
+ *   optional expression for the k parameter from the Apache DataSketches 
library that controls
+ *   the size and accuracy of the sketch. Must be a constant integer between 8 
and 65535.
+ *   If not specified, the merged sketch adopts the k value from the first 
input sketch.
+ *   If specified, the value is used to initialize the aggregation buffer. The 
merge operation
+ *   can handle input sketches with different k values. Larger k values 
provide more accurate
+ *   estimates but result in larger, slower sketches.
+ * @param mutableAggBufferOffset
+ *   offset for mutable aggregation buffer
+ * @param inputAggBufferOffset
+ *   offset for input aggregation buffer
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, k]) - Merges binary KllDoublesSketch representations and 
returns the merged sketch.
+      The input expression should contain binary sketch representations (e.g., 
from kll_sketch_agg_double).
+      The optional k parameter controls the size and accuracy of the merged 
sketch (range 8-65535).
+      If k is not specified, the merged sketch adopts the k value from the 
first input sketch.
+  """,
+  examples = """
+    Examples:
+      > SELECT kll_sketch_get_n_double(_FUNC_(sketch)) FROM (SELECT 
kll_sketch_agg_double(col) as sketch FROM VALUES (CAST(1.0 AS DOUBLE)), 
(CAST(2.0 AS DOUBLE)), (CAST(3.0 AS DOUBLE)) tab(col) UNION ALL SELECT 
kll_sketch_agg_double(col) as sketch FROM VALUES (CAST(4.0 AS DOUBLE)), 
(CAST(5.0 AS DOUBLE)), (CAST(6.0 AS DOUBLE)) tab(col)) t;
+       6
+  """,
+  group = "agg_funcs",
+  since = "4.1.0")
+// scalastyle:on line.size.limit
+case class KllMergeAggDouble(
+    child: Expression,
+    kExpr: Option[Expression] = None,
+    override val mutableAggBufferOffset: Int = 0,
+    override val inputAggBufferOffset: Int = 0)
+    extends KllMergeAggBase[KllDoublesSketch] {
+  def this(child: Expression) = this(child, None, 0, 0)
+  def this(child: Expression, kExpr: Expression) = this(child, Some(kExpr), 0, 
0)
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): KllMergeAggDouble =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  override def withNewInputAggBufferOffset(
+      newInputAggBufferOffset: Int): KllMergeAggDouble =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): KllMergeAggDouble = {
+    if (newChildren.length == 1) {
+      copy(child = newChildren(0), kExpr = None)
+    } else {
+      copy(child = newChildren(0), kExpr = Some(newChildren(1)))
+    }
+  }
+
+  override def prettyName: String = "kll_merge_agg_double"
+
+  // Factory method implementations
+  protected def newHeapInstance(k: Int): KllDoublesSketch = 
KllDoublesSketch.newHeapInstance(k)
+  protected def wrapSketch(bytes: Array[Byte]): KllDoublesSketch =
+    KllDoublesSketch.wrap(Memory.wrap(bytes))
+  protected def heapifySketch(bytes: Array[Byte]): KllDoublesSketch =
+    KllDoublesSketch.heapify(Memory.wrap(bytes))
+  protected def toByteArray(sketch: KllDoublesSketch): Array[Byte] = 
sketch.toByteArray
+}
+
+/**
+ * Base abstract class for KLL merge aggregate functions that provides common 
implementation
+ * for merging serialized KLL sketches with optional k parameter.
+ *
+ * @tparam T The KLL sketch type (KllLongsSketch, KllFloatsSketch, or 
KllDoublesSketch)
+ */
+abstract class KllMergeAggBase[T <: KllSketch]
+    extends TypedImperativeAggregate[Option[T]]
+    with KllSketchAggBase
+    with ExpectsInputTypes {
+
+  def child: Expression
+
+  // Abstract factory methods for sketch-specific instantiation
+  protected def newHeapInstance(k: Int): T
+  protected def wrapSketch(bytes: Array[Byte]): T
+  protected def heapifySketch(bytes: Array[Byte]): T
+  protected def toByteArray(sketch: T): Array[Byte]
+
+  // Common implementations for all merge aggregates
+  override def children: Seq[Expression] = child +: kExpr.toSeq
+
+  override def dataType: DataType = BinaryType
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    val baseTypes = Seq(BinaryType)
+    if (kExpr.isDefined) baseTypes :+ IntegerType else baseTypes
+  }
+
+  override def nullable: Boolean = false
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      return defaultCheck

Review Comment:
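   Let's avoid `return` in Scala and use an expression-oriented style instead (e.g., an `if`/`else` or `match` whose value is the result). A `return` inside a lambda becomes a non-local return implemented by throwing an exception, which can cause surprising behavior in other contexts (this applies here and elsewhere).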



##########
docs/sql-ref-sketch-aggregates.md:
##########
@@ -480,6 +481,60 @@ FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col);
 
 ---
 
+### kll_merge_agg_*
+
+Aggregates multiple KLL sketches of the same type by merging them together. This is useful for combining sketches created in separate aggregations (e.g., from different partitions or time windows).

Review Comment:
   ```suggestion
   Aggregates multiple KLL sketches of the same type by merging them together. This is useful for combining sketches created in separate aggregations (e.g., from different partitions or time windows). These are aggregate functions.
   ```



##########
docs/sql-ref-sketch-aggregates.md:
##########
@@ -480,6 +481,60 @@ FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col);
 
 ---
 
+### kll_merge_agg_*
+
+Aggregates multiple KLL sketches of the same type by merging them together. This is useful for combining sketches created in separate aggregations (e.g., from different partitions or time windows).

Review Comment:
   Also, let's update the description at L585 below to add the sentence "These are scalar functions."



##########
docs/sql-ref-sketch-aggregates.md:
##########
@@ -480,6 +481,60 @@ FROM VALUES (1), (2), (3), (4), (5), (6), (7) tab(col);
 
 ---
 
+### kll_merge_agg_*
+
+Aggregates multiple KLL sketches of the same type by merging them together. This is useful for combining sketches created in separate aggregations (e.g., from different partitions or time windows).
+
+**Syntax:**
+```sql
+kll_merge_agg_bigint(sketch [, k])
+kll_merge_agg_float(sketch [, k])
+kll_merge_agg_double(sketch [, k])
+```
+
+| Argument | Type | Description |
+|----------|------|-------------|
+| `sketch` | BINARY | A KLL sketch in binary format (e.g., from `kll_sketch_agg_*`) |
+| `k` | INT (optional) | Controls the accuracy and size of the merged sketch. Range: 8-65535. If not specified, the merged sketch adopts the k value from the first input sketch. |
+
+Returns a BINARY containing the merged KLL sketch.
+
+**Examples:**
+```sql
+-- Merge sketches from different partitions
+SELECT kll_sketch_get_quantile_bigint(
+  kll_merge_agg_bigint(sketch),
+  0.5
+)
+FROM (
+  SELECT kll_sketch_agg_bigint(col) as sketch
+  FROM VALUES (1), (2), (3) tab(col)
+  UNION ALL
+  SELECT kll_sketch_agg_bigint(col) as sketch
+  FROM VALUES (4), (5), (6) tab(col)
+);
+-- Result: 3
+
+-- Get the total count from merged sketches
+SELECT kll_sketch_get_n_bigint(kll_merge_agg_bigint(sketch))
+FROM (
+  SELECT kll_sketch_agg_bigint(col) as sketch
+  FROM VALUES (1), (2), (3) tab(col)
+  UNION ALL
+  SELECT kll_sketch_agg_bigint(col) as sketch
+  FROM VALUES (4), (5), (6) tab(col)
+);
+-- Result: 6
+```
+
+**Notes:**
+- When `k` is not specified, the merged sketch adopts the k value from the first input sketch.
+- The merge operation can handle input sketches with different k values.
+- NULL values are ignored during aggregation.
+- Use this function when you need to merge multiple sketches in an aggregation context. For merging exactly two sketches, use the scalar `kll_sketch_merge_*` function instead.

Review Comment:
   Maybe also add a similar note to the docs for `kll_sketch_merge_*` that points here, since these aggregate functions are probably what users really want anyway.



##########
sql/core/src/test/resources/sql-tests/inputs/kllquantiles.sql:
##########
@@ -160,6 +160,105 @@ SELECT
   )[1] AS result
 FROM t_byte_1_5_through_7_11;
 
+-- Tests for KllMergeAgg* aggregate functions

Review Comment:
   Other test ideas:
   
   * GROUP BY with the aggregate functions :) possibly with a HAVING clause
   * Empty aggregation: test with zero input rows, e.g. SELECT kll_merge_agg_bigint(sketch) FROM empty_table
   * Wrong sketch type: test merging a sketch of the wrong type (e.g., a bigint sketch binary passed to kll_merge_agg_float)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to