Hello Hivemind,

Referring to this thread - https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html. I have learnt that we cannot do much with grouped data apart from using the existing aggregate functions. That post was written in May 2015, and I don't know whether things have changed since then. I am using Spark 1.4.
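What I am trying to achieve is something very similar to collect_set in Hive (actually unique, ordered, concatenated values), e.g.

    1,2
    1,3
    2,4
    2,5
    2,4

to

    1, "2,3"
    2, "4,5"

Currently I am achieving this by converting the DataFrame to an RDD, doing the required operations, and converting it back to a DataFrame, as shown below.

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.stream.Collectors;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;

    import scala.Tuple2;

    // SalesColumns (not shown) holds the column names and the output schema.
    public class AvailableSizes implements Serializable {

        public DataFrame calculate(SQLContext ssc, DataFrame salesDataFrame) {
            final JavaRDD<Row> rowJavaRDD = salesDataFrame.toJavaRDD();

            // Key every row by its style so that rows of the same style can be reduced together.
            JavaPairRDD<String, Row> pairs = rowJavaRDD.mapToPair(
                    (PairFunction<Row, String, Row>) row -> {
                        final Object[] objects = {row.getAs(0), row.getAs(1), row.getAs(3)};
                        return new Tuple2<>(row.getAs(SalesColumns.STYLE.name()),
                                new GenericRowWithSchema(objects, SalesColumns.getOutputSchema()));
                    });

            // Merge two rows of the same style into one row carrying the union of their sizes.
            JavaPairRDD<String, Row> withSizeList = pairs.reduceByKey(new Function2<Row, Row, Row>() {
                @Override
                public Row call(Row aRow, Row bRow) {
                    final String uniqueCommaSeparatedSizes = uniqueSizes(aRow, bRow);
                    final Object[] objects = {aRow.getAs(0), aRow.getAs(1), uniqueCommaSeparatedSizes};
                    return new GenericRowWithSchema(objects, SalesColumns.getOutputSchema());
                }

                // The size column may already hold a comma-separated list from an earlier merge,
                // so split both sides and collect everything into a sorted set.
                private String uniqueSizes(Row aRow, Row bRow) {
                    final SortedSet<String> allSizes = new TreeSet<>();
                    final List<String> aSizes = Arrays.asList(((String) aRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
                    final List<String> bSizes = Arrays.asList(((String) bRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
                    allSizes.addAll(aSizes);
                    allSizes.addAll(bSizes);
                    return csvFormat(allSizes);
                }
            });

            final JavaRDD<Row> values = withSizeList.values();
            return ssc.createDataFrame(values, SalesColumns.getOutputSchema());
        }

        public String csvFormat(Collection<String> collection) {
            return collection.stream().map(Object::toString).collect(Collectors.joining(","));
        }
    }

Please suggest if there is a better way of doing this.

Regards,
Abhishek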
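
For reference, the transformation described above (unique, ordered, concatenated values per key) can be pinned down with a small plain-Java sketch that does not use Spark at all. It only illustrates the expected result for the sample pairs in the question; the class name `GroupConcatSketch` and the method `groupConcat` are hypothetical and exist only for this illustration. Values are ordered as strings, which matches the `TreeSet<String>` used in the code above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of Spark: it only shows the desired output shape.
public class GroupConcatSketch {

    // Groups (key, value) pairs by key and joins the distinct values in sorted order.
    public static Map<String, String> groupConcat(List<String[]> pairs) {
        // TreeMap keeps keys sorted; TreeSet drops duplicates and sorts the values.
        Map<String, SortedSet<String>> grouped = new TreeMap<>();
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], k -> new TreeSet<>()).add(pair[1]);
        }

        // Turn each sorted set into a comma-separated string, preserving key order.
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, SortedSet<String>> entry : grouped.entrySet()) {
            result.put(entry.getKey(), String.join(",", entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // The sample pairs from the question.
        List<String[]> sales = Arrays.asList(
                new String[]{"1", "2"},
                new String[]{"1", "3"},
                new String[]{"2", "4"},
                new String[]{"2", "5"},
                new String[]{"2", "4"});

        // Prints:
        // 1 -> 2,3
        // 2 -> 4,5
        groupConcat(sales).forEach((key, sizes) -> System.out.println(key + " -> " + sizes));
    }
}
```

Any DataFrame-level replacement for the RDD round trip would need to produce this same mapping for each key.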