Thanks Georg. I have looked at UDAF based on your suggestion. It looks like you can only pass a single column to a UDAF. Is there any way to pass an entire Row to the aggregate function?
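One way to get the whole record into an aggregation (a minimal sketch, assuming Spark 2.x and a case class standing in for the Row; the `Record`/`Agg` names and fields are assumptions, not from the thread) is the typed `Aggregator` API, whose `reduce` receives the entire input object rather than a single column:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical record type standing in for the Row; field names are assumptions.
case class Record(x: Int, y: Int, u: Int, v: Int, z: Int)
case class Agg(u: Long, v: Long, z: Long)

// A typed Aggregator sees the entire Record per input row, not a single column.
object SumAll extends Aggregator[Record, Agg, Agg] {
  def zero: Agg = Agg(0L, 0L, 0L)
  def reduce(b: Agg, r: Record): Agg = Agg(b.u + r.u, b.v + r.v, b.z + r.z)
  def merge(b1: Agg, b2: Agg): Agg = Agg(b1.u + b2.u, b1.v + b2.v, b1.z + b2.z)
  def finish(b: Agg): Agg = b
  def bufferEncoder: Encoder[Agg] = Encoders.product[Agg]
  def outputEncoder: Encoder[Agg] = Encoders.product[Agg]
}

// Usage sketch (requires a SparkSession):
//   import spark.implicits._
//   val ds = Seq(Record(10, 11, 10, 2, 11), Record(10, 11, 10, 2, 11)).toDS()
//   ds.groupByKey(r => (r.x, r.y)).agg(SumAll.toColumn).show()
```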
I want to pass a list of user-defined functions and a given Row object, perform the aggregation, and return an aggregated Row object.

Regards
Sandeep

On Fri, Dec 8, 2017 at 12:47 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:

> You are looking for a UDAF.
>
> Sandip Mehta <sandip.mehta....@gmail.com> schrieb am Fr. 8. Dez. 2017 um 06:20:
>
>> Hi,
>>
>> I want to group on certain columns and then, for every group, apply a custom UDF to it. Currently groupBy only allows adding aggregation functions to GroupedData.
>>
>> For this I was thinking of using groupByKey, which returns a KeyValueGroupedDataset, and then applying the UDF to every group, but I have not really been able to solve this.
>>
>> SM
>>
>> On Fri, Dec 8, 2017 at 10:29 AM Weichen Xu <weichen...@databricks.com> wrote:
>>
>>> You can groupBy multiple columns on a DataFrame, so why do you need such a complicated schema?
>>>
>>> Suppose the df schema is (x, y, u, v, z):
>>>
>>> df.groupBy($"x", $"y").agg(...)
>>>
>>> Is this what you want?
>>>
>>> On Fri, Dec 8, 2017 at 11:51 AM, Sandip Mehta <sandip.mehta....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> During my aggregation I end up having the following schema:
>>>>
>>>> Row(Row(val1, val2), Row(val1, val2, val3, ...))
>>>>
>>>> val values = Seq(
>>>>   (Row(10, 11), Row(10, 2, 11)),
>>>>   (Row(10, 11), Row(10, 2, 11)),
>>>>   (Row(20, 11), Row(10, 2, 11))
>>>> )
>>>>
>>>> The first tuple is used to group the relevant records for aggregation. I have used the following to create the dataset:
>>>>
>>>> val s = StructType(Seq(
>>>>   StructField("x", IntegerType, true),
>>>>   StructField("y", IntegerType, true)
>>>> ))
>>>> val s1 = StructType(Seq(
>>>>   StructField("u", IntegerType, true),
>>>>   StructField("v", IntegerType, true),
>>>>   StructField("z", IntegerType, true)
>>>> ))
>>>>
>>>> val ds = sparkSession.sqlContext.createDataset(
>>>>   sparkSession.sparkContext.parallelize(values)
>>>> )(Encoders.tuple(RowEncoder(s), RowEncoder(s1)))
>>>>
>>>> Is this a correct way of representing this?
>>>>
>>>> How do I create the dataset and row encoder for such a use case for doing groupByKey on this?
>>>>
>>>> Regards
>>>> Sandeep
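For the quoted question, one simpler alternative (a sketch, assuming Spark 2.x; the `Key`/`Value` case classes mirror the two StructTypes from the thread but are my naming, not Sandip's) is to use case classes instead of raw `Row` objects and `RowEncoder`s, letting Spark derive the encoders, and then apply an arbitrary per-group function via `groupByKey` + `mapGroups`:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case classes mirroring the (x, y) and (u, v, z) schemas.
case class Key(x: Int, y: Int)
case class Value(u: Int, v: Int, z: Int)

object GroupByKeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("groupByKey-sketch").getOrCreate()
    import spark.implicits._

    val values = Seq(
      (Key(10, 11), Value(10, 2, 11)),
      (Key(10, 11), Value(10, 2, 11)),
      (Key(20, 11), Value(10, 2, 11))
    )
    val ds = values.toDS() // tuple encoder is derived from the case classes

    // Group on the first element, then run any custom function over each group.
    val aggregated = ds
      .groupByKey(_._1)
      .mapGroups { (key, rows) =>
        val vs = rows.map(_._2).toSeq
        (key, Value(vs.map(_.u).sum, vs.map(_.v).sum, vs.map(_.z).sum))
      }

    aggregated.show()
    spark.stop()
  }
}
```

The per-group lambda inside `mapGroups` can do anything, including emitting a nested result, so it serves the "custom UDF per group" need without a UDAF; the trade-off is that `mapGroups` loses partial aggregation, so a typed `Aggregator` is preferable when the function is reducible.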