great: by adding a little implicit wrapper i can use algebird's MonoidAggregator, which gives me the equivalent of GroupedDataset.mapValues (via Aggregator.composePrepare).
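for reference, this is roughly the kind of wrapper i mean. just a minimal sketch, assuming the spark 1.6 Aggregator API (zero/reduce/merge/finish plus toColumn) and algebird's MonoidAggregator; the names AlgebirdAggregator and algebirdToSparkColumn are made up for illustration:

import scala.language.implicitConversions
import com.twitter.algebird.{Aggregator, MonoidAggregator}
import org.apache.spark.sql.{Encoder, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator => SparkAggregator}

// adapt an algebird MonoidAggregator to spark's typed Aggregator
class AlgebirdAggregator[A, B, C](agg: MonoidAggregator[A, B, C])
    extends SparkAggregator[A, B, C] {
  def zero: B = agg.monoid.zero                                  // the Monoid is needed here: empty groups need a zero
  def reduce(b: B, a: A): B = agg.monoid.plus(b, agg.prepare(a)) // prepare the input, then fold it into the buffer
  def merge(b1: B, b2: B): B = agg.monoid.plus(b1, b2)
  def finish(b: B): C = agg.present(b)
}

// the implicit wrapper: any MonoidAggregator becomes a TypedColumn usable in GroupedDataset.agg
implicit def algebirdToSparkColumn[A, B, C](agg: MonoidAggregator[A, B, C])(
    implicit bEnc: Encoder[B], cEnc: Encoder[C]): TypedColumn[A, C] =
  new AlgebirdAggregator(agg).toColumn

// mapValues-equivalent via composePrepare: drop the key before the values ever hit the monoid
val sumValues = Aggregator.fromMonoid[Int].composePrepare[(String, Int)](_._2)
// ds.groupBy(_._1).agg(sumValues) should then behave like groupBy(_._1).mapValues(_._2) followed by a sum

the composePrepare step is what does the mapValues part: the aggregator only ever sees the value side of the pair.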
i am a little surprised you require a monoid and not just a semigroup, but that is probably the right choice given possibly empty datasets.

i do seem to be running into SPARK-12696 <https://issues.apache.org/jira/browse/SPARK-12696> for some aggregators, so i will wait for spark 1.6.1.

also, i am having no luck using the aggregators with DataFrame instead of Dataset. for example:

lazy val ds1 = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3))).toDS
ds1.toDF.groupBy($"_1").agg(Aggregator.toList[(String, Int)]).show

gives me:

[info] org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [_1#50], [_1#50,(AggregatorAdapter(),mode=Complete,isDistinct=false) AS AggregatorAdapter()#61];
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:203)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
[info] at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
[info] at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
[info] at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)

my motivation for trying the DataFrame version is that it takes any number of aggregators via:

GroupedData.agg(expr: Column, exprs: Column*): DataFrame

there is no equivalent in GroupedDataset. (a quick sketch of the contrast is at the bottom of this mail, below the quoted thread.)

On Sun, Feb 14, 2016 at 12:31 AM, Michael Armbrust <mich...@databricks.com> wrote:

> Instead of grouping with a lambda function, you can do it with a column
> expression to avoid materializing an unnecessary tuple:
>
> df.groupBy($"_1")
>
> Regarding the mapValues, you can do something similar using an Aggregator
> <https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>,
> but I agree that we should consider something lighter weight like the
> mapValues you propose.
>
> On Sat, Feb 13, 2016 at 1:35 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i have a Dataset[(K, V)]
>> i would like to group by k and then reduce V using a function (V, V) => V
>> how do i do this?
>>
>> i would expect something like:
>>
>> val ds = Dataset[(K, V)]
>> ds.groupBy(_._1).mapValues(_._2).reduce(f)
>>
>> or better:
>>
>> ds.grouped.reduce(f)  # grouped only works on Dataset[(_, _)] and i dont
>> care about java api
>>
>> but there is no mapValues or grouped. ds.groupBy(_._1) gives me a
>> GroupedDataset[(K, (K, V))] which is inconvenient. i could carry the key
>> through the reduce operation but that seems ugly and inefficient.
>>
>> any thoughts?
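p.s. to make the DataFrame-vs-Dataset agg contrast concrete, a rough sketch: the untyped side just uses the built-in sql functions as stand-ins, and the typed side reuses the hypothetical algebirdToSparkColumn wrapper and sumValues aggregator sketched earlier (assumes sqlContext.implicits._ is in scope, as in the example above):

import org.apache.spark.sql.functions.{count, max, sum}

// untyped: GroupedData.agg(expr: Column, exprs: Column*) takes any number of aggregations at once
ds1.toDF.groupBy($"_1").agg(sum($"_2"), max($"_2"), count($"_2")).show()

// typed: GroupedDataset.agg only has a handful of fixed-arity overloads over TypedColumn,
// here one algebird aggregator lifted to a TypedColumn by the implicit wrapper from the earlier sketch
ds1.groupBy(_._1).agg(algebirdToSparkColumn(sumValues)).show()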