Thanks Georg. I have looked at UADF based on your suggestion. Looks like
you can only pass single column to UADF. Is there any way you can pass
entire Row to aggregate function?

I want to list of user defined function and given row object. Perform the
aggregation and return aggregated Row object.

Regards
Sandeep

On Fri, Dec 8, 2017 at 12:47 PM Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

> You are looking for an UADF.
> Sandip Mehta <sandip.mehta....@gmail.com> schrieb am Fr. 8. Dez. 2017 um
> 06:20:
>
>> Hi,
>>
>> I want to group on certain columns and then for every group wants to
>> apply custom UDF function to it. Currently groupBy only allows to add
>> aggregation function to GroupData.
>>
>> For this was thinking to use groupByKey which will return KeyValueDataSet
>> and then apply UDF for every group but really not been able solve this.
>>
>> SM
>>
>> On Fri, Dec 8, 2017 at 10:29 AM Weichen Xu <weichen...@databricks.com>
>> wrote:
>>
>>> You can groupBy multiple columns on dataframe, so why you need so
>>> complicated schema ?
>>>
>>> suppose df schema: (x, y, u, v, z)
>>>
>>> df.groupBy($"x", $"y").agg(...)
>>>
>>> Is this you want ?
>>>
>>> On Fri, Dec 8, 2017 at 11:51 AM, Sandip Mehta <
>>> sandip.mehta....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> During my aggregation I end up having following schema.
>>>>
>>>> Row(Row(val1,val2), Row(val1,val2,val3...))
>>>>
>>>> val values = Seq(
>>>>     (Row(10, 11), Row(10, 2, 11)),
>>>>     (Row(10, 11), Row(10, 2, 11)),
>>>>     (Row(20, 11), Row(10, 2, 11))
>>>>   )
>>>>
>>>>
>>>> 1st tuple is used to group the relevant records for aggregation. I have
>>>> used following to create dataset.
>>>>
>>>> val s = StructType(Seq(
>>>>   StructField("x", IntegerType, true),
>>>>   StructField("y", IntegerType, true)
>>>> ))
>>>> val s1 = StructType(Seq(
>>>>   StructField("u", IntegerType, true),
>>>>   StructField("v", IntegerType, true),
>>>>   StructField("z", IntegerType, true)
>>>> ))
>>>>
>>>> val ds = 
>>>> sparkSession.sqlContext.createDataset(sparkSession.sparkContext.parallelize(values))(Encoders.tuple(RowEncoder(s),
>>>>  RowEncoder(s1)))
>>>>
>>>> Is this correct way of representing this?
>>>>
>>>> How do I create dataset and row encoder for such use case for doing
>>>> groupByKey on this?
>>>>
>>>>
>>>>
>>>> Regards
>>>> Sandeep
>>>>
>>>
>>>

Reply via email to