still not sure how to use this with a DataFrame, assuming i cannot convert
it to a specific Dataset with .as (because i got lots of columns, or
because at compile time these types are simply not known).

i cannot specify the columns these operate on. i can resort to Row
transformations, like this:

scala> val x = List(("a", 1), ("a", 2), ("b", 5)).toDF("k", "v")
scala> x.groupBy("k").agg(typed.sum{ row: Row => row.getInt(1) }).show

note that it currently fails because of a known bug (SPARK-13363
<https://issues.apache.org/jira/browse/SPARK-13363>), but ignoring that, it's
somewhat ugly that i have to resort to Row transformations. instead we
should allow something like the following for any Aggregator (so add an apply
method that takes Column* to indicate which columns to operate on):

scala> x.groupBy("k").agg(typed.sum(col("v")))

any hints on what i need to do to make this happen? i have been going
through Aggregator, AggregateFunction, AggregateExpression,
TypedAggregateExpression and friends trying to get a sense but haven't had
much luck so far.
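
For comparison, a minimal sketch of the column-targeted style asked for above. It assumes a later Spark release (3.0 or newer), where `org.apache.spark.sql.functions.udaf` wraps an Aggregator into a function that takes Columns. The names `IntSum`, `sumInt` and `ColumnAggregatorExample` are made up for illustration and are not part of Spark, and nothing here is available in the version discussed in this thread.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Hypothetical aggregator (not shipped with Spark): sums Int inputs into a Long.
// It knows nothing about Row or about column positions.
object IntSum extends Aggregator[Int, Long, Long] {
  def zero: Long = 0L
  def reduce(acc: Long, v: Int): Long = acc + v
  def merge(a: Long, b: Long): Long = a + b
  def finish(acc: Long): Long = acc
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

object ColumnAggregatorExample {
  def main(args: Array[String]): Unit = {
    // Assumes Spark 3.0+ on the classpath; local mode is only for trying it out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udaf-sketch")
      .getOrCreate()
    import spark.implicits._

    val x = List(("a", 1), ("a", 2), ("b", 5)).toDF("k", "v")

    // udaf turns the Aggregator into a function over Columns,
    // so the input column is picked at the call site.
    val sumInt = udaf(IntSum)
    x.groupBy("k").agg(sumInt(col("v")).as("sum_v")).show()

    spark.stop()
  }
}
```

The aggregator itself stays typed on its input value (Int here) rather than on Row, and the same instance can be applied to a different column by passing a different `col(...)`.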



On Tue, Apr 12, 2016 at 1:50 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Did you see these?
>
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/scala/typed.scala#L70
>
> On Tue, Apr 12, 2016 at 9:46 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i don't really see how Aggregator can be useful for DataFrame unless you
>> can specify what columns it works on. Having to code Aggregators to always
>> use Row and then extract the values yourself breaks the abstraction and
>> makes it not much better than UserDefinedAggregateFunction (well... maybe
>> still better because i have encoders so i can use kryo).
>>
>> On Mon, Apr 11, 2016 at 10:53 PM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>>> saw that, don't think it solves it. i basically want to add some children
>>> to the expression i guess, to indicate what i am operating on? not sure if
>>> that even makes sense
>>>
>>> On Mon, Apr 11, 2016 at 8:04 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> I'll note this interface has changed recently:
>>>> https://github.com/apache/spark/commit/520dde48d0d52dbbbbe1710a3275fdd5355dd69d
>>>>
>>>> I'm not sure that solves your problem though...
>>>>
>>>> On Mon, Apr 11, 2016 at 4:45 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> i like the Aggregator a lot
>>>>> (org.apache.spark.sql.expressions.Aggregator), but i find the way to use it
>>>>> somewhat confusing. I am supposed to simply call aggregator.toColumn, but
>>>>> that doesn't allow me to specify which fields it operates on in a
>>>>> DataFrame.
>>>>>
>>>>> i would basically like to do something like
>>>>> dataFrame
>>>>>   .groupBy("k")
>>>>>   .agg(
>>>>>     myAggregator.on("v1", "v2").toColumn,
>>>>>     myOtherAggregator.on("v3", "v4").toColumn
>>>>>   )
>>>>>
>>>>
>>>>
>>>
>>
>
