tl;dr: there's no easy way to implement aggregate expressions that require
multiple passes over the data. It's simply not something that's supported, and
adding it would be very costly.
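
To make the cost concrete, here is a rough sketch in plain Python (not Catalyst code, and not necessarily the algorithm linked in the quoted message). It assumes integer data split into in-memory lists standing in for partitions, and finds the lower median by bisecting on the value range. The names `count_at_most` and `multipass_lower_median` are made up for illustration. The point is only that every bisection step needs another full scan of the data:

```python
from typing import Sequence, Tuple


def count_at_most(partitions: Sequence[Sequence[int]], pivot: int) -> int:
    """One full pass: every partition reports only a count, never its rows."""
    return sum(sum(1 for x in part if x <= pivot) for part in partitions)


def multipass_lower_median(partitions: Sequence[Sequence[int]]) -> Tuple[int, int]:
    """Return (lower median, number of full passes) for integer data."""
    # Pass 1: gather row count, minimum and maximum together.
    total, lo, hi = 0, None, None
    for part in partitions:
        for x in part:
            total += 1
            lo = x if lo is None else min(lo, x)
            hi = x if hi is None else max(hi, x)
    if total == 0:
        raise ValueError("median of empty data is undefined")

    rank = (total + 1) // 2  # 1-based rank of the lower median
    passes = 1
    # Invariant: the answer lies in [lo, hi]. Each iteration is one more pass.
    while lo < hi:
        mid = (lo + hi) // 2
        passes += 1
        if count_at_most(partitions, mid) >= rank:
            hi = mid
        else:
            lo = mid + 1
    return lo, passes


# Three hypothetical "partitions" holding the values 1..9 in scrambled order.
median, passes = multipass_lower_median([[5, 1, 9], [3, 7], [2, 8, 6, 4]])
```

Memory use stays small because only counts travel between passes, but the number of scans grows with the logarithm of the value range, which is roughly why this shape does not fit a single-pass aggregate interface.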

Would you be OK using approximate percentile? That's relatively cheap.

On Mon, Dec 13, 2021 at 6:43 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

> 
> No takers here? :)
> 
> 
> I can see now why a median function is not available in most data
> processing systems. It's pretty annoying to implement!
> 
> On Thu, Dec 9, 2021 at 9:25 PM Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
> 
> 
>> I'm trying to create a new aggregate function. It's my first time working
>> with Catalyst, so it's exciting---but I'm also in a bit over my head.
>> 
>> 
>> My goal is to create a function to calculate the median (
>> https://issues.apache.org/jira/browse/SPARK-26589 ).
>> 
>> 
>> As a very simple solution, I could just define median to be an alias of
>> `Percentile(col, 0.5)`. However, the leading comment on the Percentile
>> expression (
>> https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L37-L39
>> ) highlights that it's very memory-intensive and can easily lead to
>> OutOfMemory errors.
>> 
>> 
>> So instead of using Percentile, I'm trying to create an Expression that
>> calculates the median without needing to hold everything in memory at
>> once. I'm considering two different approaches:
>> 
>> 
>> 1. Define Median as a combination of existing expressions: The median can
>> perhaps be built out of the existing expressions for Count (
>> https://github.com/apache/spark/blob/9af338cd685bce26abbc2dd4d077bde5068157b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala#L48
>> ) and NthValue (
>> https://github.com/apache/spark/blob/568ad6aa4435ce76ca3b5d9966e64259ea1f9b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L675
>> ).
>> 
>> 
>> 
>>    I don't see a template I can follow for building a new expression out of
>>    existing expressions (i.e. without having to implement a bunch of methods
>>    for DeclarativeAggregate or ImperativeAggregate). I also don't know how I
>>    would wrap NthValue to make it usable as a regular aggregate function. The
>>    wrapped NthValue would need an implicit window that provides the necessary
>>    ordering.
>> 
>>    Is there any potential to this idea? Any pointers on how to implement it?
>> 
>> 2. Another memory-light approach to calculating the median requires
>> multiple passes over the data to converge on the answer. The approach is
>> described here (
>> https://www.quora.com/Distributed-Algorithms/What-is-the-distributed-algorithm-to-determine-the-median-of-arrays-of-integers-located-on-different-computers
>> ). (I posted a sketch implementation of this approach using Spark's
>> user-level API here (
>> https://issues.apache.org/jira/browse/SPARK-26589?focusedCommentId=17452081&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17452081
>> ).)
>> 
>> 
>> 
>>    I am also struggling to understand how I would build an aggregate function
>>    like this, since it requires multiple passes over the data. From what I
>>    can see, Catalyst's aggregate functions are designed to work with a single
>>    pass over the data.
>> 
>>    We don't seem to have an interface for AggregateFunction that supports
>>    multiple passes over the data. Is there some way to do this?
>> 
>> Again, this is my first serious foray into Catalyst. Any specific
>> implementation guidance is appreciated!
>> 
>> 
>> Nick
>> 
> 
>
