Hello Hivemind,

Referring to this thread -
https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html.
I have learnt that we cannot do much with grouped data apart from using the
existing aggregate functions. That blog post was written in May 2015, and I
don't know whether things have changed since then. I am using Spark 1.4.

What I am trying to achieve is something very similar to collect_set in Hive
(actually, unique ordered concatenated values), e.g.

1,2
1,3
2,4
2,5
2,4

to
1, "2,3"
2, "4,5"
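To make the intended semantics concrete outside of Spark, here is a minimal plain-Java sketch of the same aggregation (the class and method names are just for illustration): group by key, deduplicate and sort the values, then comma-join them.

```java
import java.util.*;
import java.util.stream.Collectors;

public class UniqueConcat {
    // Groups (key, value) pairs, keeping the unique values per key in
    // sorted order and joining them with commas.
    public static Map<Integer, String> concat(List<int[]> pairs) {
        Map<Integer, SortedSet<Integer>> grouped = new TreeMap<>();
        for (int[] p : pairs) {
            grouped.computeIfAbsent(p[0], k -> new TreeSet<>()).add(p[1]);
        }
        Map<Integer, String> result = new TreeMap<>();
        grouped.forEach((k, vs) -> result.put(k,
                vs.stream().map(String::valueOf).collect(Collectors.joining(","))));
        return result;
    }

    public static void main(String[] args) {
        List<int[]> input = Arrays.asList(
                new int[]{1, 2}, new int[]{1, 3},
                new int[]{2, 4}, new int[]{2, 5}, new int[]{2, 4});
        System.out.println(concat(input)); // {1=2,3, 2=4,5}
    }
}
```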

Currently I am achieving this by converting the DataFrame to an RDD, doing
the required operations, and converting it back to a DataFrame, as shown below.

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;

import scala.Tuple2;

public class AvailableSizes implements Serializable {

    public DataFrame calculate(SQLContext ssc, DataFrame salesDataFrame) {
        final JavaRDD<Row> rowJavaRDD = salesDataFrame.toJavaRDD();

        // Key each row by STYLE, carrying along the columns needed downstream.
        JavaPairRDD<String, Row> pairs = rowJavaRDD.mapToPair(
                (PairFunction<Row, String, Row>) row -> {
                    final Object[] objects = {row.getAs(0), row.getAs(1), row.getAs(3)};
                    return new Tuple2<>(row.getAs(SalesColumns.STYLE.name()),
                            new GenericRowWithSchema(objects, SalesColumns.getOutputSchema()));
                });

        // Merge the rows for each key, accumulating the unique sorted sizes.
        JavaPairRDD<String, Row> withSizeList = pairs.reduceByKey(
                new Function2<Row, Row, Row>() {
                    @Override
                    public Row call(Row aRow, Row bRow) {
                        final String uniqueCommaSeparatedSizes = uniqueSizes(aRow, bRow);
                        final Object[] objects =
                                {aRow.getAs(0), aRow.getAs(1), uniqueCommaSeparatedSizes};
                        return new GenericRowWithSchema(objects, SalesColumns.getOutputSchema());
                    }

                    private String uniqueSizes(Row aRow, Row bRow) {
                        final SortedSet<String> allSizes = new TreeSet<>();
                        final List<String> aSizes = Arrays.asList(
                                ((String) aRow.getAs(SalesColumns.SIZE.name())).split(","));
                        final List<String> bSizes = Arrays.asList(
                                ((String) bRow.getAs(SalesColumns.SIZE.name())).split(","));
                        allSizes.addAll(aSizes);
                        allSizes.addAll(bSizes);
                        return csvFormat(allSizes);
                    }
                });

        final JavaRDD<Row> values = withSizeList.values();

        return ssc.createDataFrame(values, SalesColumns.getOutputSchema());
    }

    public String csvFormat(Collection<String> collection) {
        return collection.stream().map(Object::toString).collect(Collectors.joining(","));
    }
}

Please suggest if there is a better way of doing this.

Regards,
Abhishek
