For the curious, I played around with a UDAF for this (shown below). On the
downside, it assembles a Map of all possible values of the column, which has
to be held in memory somewhere.
I suspect some kind of sorted groupByKey + cogroup could stream values
through instead, though it might not support partial aggregation in that
case. Will try that next.
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * [[UserDefinedAggregateFunction]] for computing the mode of a string column.
 *
 * WARNING: This will assemble a Map of all distinct values of the column in memory.
 *
 * It ignores null values and returns null if all values are null.
 */
class ModeAggregateFunction extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(StructField("value", StringType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("values", MapType(StringType, LongType, valueContainsNull = false)) :: Nil)

  override def dataType: DataType = StringType

  override def deterministic: Boolean = true

  // The initial value for the aggregation buffer: an empty frequency map.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map[String, Long]()
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (input == null || input.isNullAt(0)) {
      return
    }
    val value = input.getString(0)
    val frequencies = buffer.getAs[Map[String, Long]](0)
    val count = frequencies.getOrElse(value, 0L)
    buffer(0) = frequencies + (value -> (count + 1L))
  }

  // Combine two partial frequency maps by summing counts per value.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val frequencies1 = buffer1.getAs[Map[String, Long]](0)
    val frequencies2 = buffer2.getAs[Map[String, Long]](0)
    buffer1(0) = frequencies1 ++ frequencies2.map {
      case (k, v) => k -> (v + frequencies1.getOrElse(k, 0L))
    }
  }

  override def evaluate(buffer: Row): Any = {
    val frequencies = buffer.getAs[Map[String, Long]](0)
    if (frequencies.isEmpty) {
      return null
    }
    frequencies.maxBy(_._2)._1
  }
}
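For reference, applying it to a set of columns known only at runtime could look something like this (untested sketch; assumes a DataFrame `input` keyed by a "creature" column, as in the example below):

```scala
import org.apache.spark.sql.functions.col

// Hypothetical usage sketch: compute the mode of every non-key column in one
// groupBy, with the column list determined at runtime.
val mode = new ModeAggregateFunction()
val modeCols = input.columns.filter(_ != "creature").toSeq  // known only at runtime
val aggs = modeCols.map(c => mode(col(c)).as(c))
val output = input.groupBy(col("creature")).agg(aggs.head, aggs.tail: _*)
```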
On Wed, Apr 26, 2017 at 10:21 AM, Everett Anderson <[email protected]> wrote:
> Hi,
>
> One common situation I run across is that I want to compact my data and
> select the mode (most frequent value) in several columns for each group.
>
> Even calculating mode for one column in SQL is a bit tricky. The ways I've
> seen usually involve a nested sub-select with a group by + count and then a
> window function using rank().
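For one column, that approach might look roughly like this in the DataFrame API (untested sketch, assuming the `input` DataFrame from the example below):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, rank}

// Count each (group, value) pair, rank values by count within each group,
// and keep the top-ranked value per group.
val w = Window.partitionBy("creature").orderBy(col("cnt").desc)
val colorMode = input
  .groupBy("creature", "color").agg(count("*").as("cnt"))
  .withColumn("rnk", rank().over(w))
  .where(col("rnk") === 1)
  .select("creature", "color")
```

Note that rank() produces ties, so a group with two equally frequent values would yield two rows here.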
>
> However, what if you want to calculate the mode for several columns,
> producing a new row with the results? And let's say the set of columns is
> only known at runtime.
>
> In Spark SQL, I start going down a road of many self-joins. The more
> efficient way leads me to either RDD[Row] or Dataset[Row] where I could do
> a groupByKey + flatMapGroups, keeping state as I iterate over the Rows in
> each group.
>
> What's the best way?
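The groupByKey + flatMapGroups route might look roughly like this (untested sketch; assumes the key and all mode columns are strings, and `modesPerGroup` is a hypothetical helper name):

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

def modesPerGroup(df: DataFrame, keyCol: String,
                  modeCols: Seq[String]): Dataset[(String, Seq[String])] = {
  import df.sparkSession.implicits._
  df.groupByKey(_.getAs[String](keyCol))
    .flatMapGroups { (key, rows) =>
      // One pass over the group's rows, counting values for every column at once.
      val counts = modeCols.map(_ -> collection.mutable.Map.empty[String, Long]).toMap
      for (row <- rows; c <- modeCols; v = row.getAs[String](c); if v != null) {
        counts(c)(v) = counts(c).getOrElse(v, 0L) + 1L
      }
      // Emit the most frequent value per column, or null if the column was all-null.
      Iterator(key -> modeCols.map(c =>
        if (counts(c).isEmpty) null else counts(c).maxBy(_._2)._1))
    }
}
```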
>
> Here's a contrived example:
>
> val input = spark.sparkContext.parallelize(Seq(
> ("catosaur", "black", "claws"),
> ("catosaur", "orange", "scales"),
> ("catosaur", "black", "scales"),
> ("catosaur", "orange", "scales"),
> ("catosaur", "black", "spikes"),
> ("bearcopter", "gray", "claws"),
> ("bearcopter", "black", "fur"),
> ("bearcopter", "gray", "flight"),
> ("bearcopter", "gray", "flight")))
> .toDF("creature", "color", "feature")
>
> +----------+------+-------+
> |creature |color |feature|
> +----------+------+-------+
> |catosaur |black |claws |
> |catosaur |orange|scales |
> |catosaur |black |scales |
> |catosaur |orange|scales |
> |catosaur |black |spikes |
> |bearcopter|gray |claws |
> |bearcopter|black |fur |
> |bearcopter|gray |flight |
> |bearcopter|gray |flight |
> +----------+------+-------+
>
> val expectedOutput = spark.sparkContext.parallelize(Seq(
> ("catosaur", "black", "scales"),
> ("bearcopter", "gray", "flight")))
> .toDF("creature", "color", "feature")
>
> +----------+-----+-------+
> |creature |color|feature|
> +----------+-----+-------+
> |catosaur |black|scales |
> |bearcopter|gray |flight |
> +----------+-----+-------+
>