Thanks for all the suggestions.

So after toying with a bunch of examples I ended up taking the following
approach:
- Convert dataset to RDD
- Key by the columns I wanted
- Use aggregateByKey() to aggregate data into a serializable structure
backed by a bounded priority queue
- flatMap the result to get back an RDD<Row>
- Convert the RDD to Dataset<> since it's got the same schema
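
For reference, here is a rough Scala sketch of that shape. It is only an
illustration: a plain sorted list stands in for the bounded priority queue,
and the key/ordering column names (keyCols, orderCol) are placeholders.

import org.apache.spark.sql.{DataFrame, Row}

// keyCols = grouping columns, orderCol = a Long column to rank by (placeholders)
def topKPerKey(df: DataFrame, keyCols: Seq[String], orderCol: String, k: Int): DataFrame = {
  val schema = df.schema
  val desc = Ordering.by[Row, Long](_.getAs[Long](orderCol)).reverse   // largest first

  val topK = df.rdd
    .keyBy(row => keyCols.map(c => row.getAs[Any](c)))                 // key by the chosen columns
    .aggregateByKey(List.empty[Row])(
      (acc, row) => (row :: acc).sorted(desc).take(k),                 // seqOp: keep at most k rows
      (a, b) => (a ++ b).sorted(desc).take(k))                         // combOp: merge partial top-k lists
    .flatMap { case (_, rows) => rows }                                // back to RDD[Row]

  df.sparkSession.createDataFrame(topK, schema)                        // same schema as the input
}

A real bounded priority queue avoids the repeated sorts, but the overall data
flow is the same.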

I tried the UDAF approach, but it appears that in order to use ArrayType I
have to store the rows in InternalRow format. It's not obvious to me how to
convert that InternalRow format back to the original Row, which the
evaluate() method needs (so I can explode the data back to the original
schema). I actually got it working for a flat schema, but scrapped it in the
end since it's not really what I wanted.
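
For what it's worth, the flat-schema variant that does work looks roughly
like the sketch below (a reconstruction, not the actual code): top-k of a
single Long column, with the buffer held as a plain ArrayType. This is why
it stays simple for a flat schema and gets messy once you want whole rows
back out of evaluate().

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Keeps the k largest values of one Long column
class TopKLongUDAF(k: Int) extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("value", LongType)
  override def bufferSchema: StructType = new StructType().add("topk", ArrayType(LongType))
  override def dataType: DataType = ArrayType(LongType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Long]

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      buffer(0) = (buffer.getSeq[Long](0) :+ input.getLong(0))
        .sorted(Ordering[Long].reverse).take(k)
    }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = (buffer1.getSeq[Long](0) ++ buffer2.getSeq[Long](0))
      .sorted(Ordering[Long].reverse).take(k)

  override def evaluate(buffer: Row): Any = buffer.getSeq[Long](0)
}

// usage, e.g.: df.groupBy($"key").agg(new TopKLongUDAF(10)($"value"))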

I'm not really a big fan of the RDD approach, but if anyone's got the UDAF
approach working then please let me know :)

-------
Regards,
Andy

On Wed, Jan 4, 2017 at 5:29 PM, Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

> What about https://github.com/myui/hivemall/wiki/Efficient-Top-k-computation-on-Apache-Hive-using-Hivemall-UDTF
>
> Koert Kuipers <ko...@tresata.com> schrieb am Mi. 4. Jan. 2017 um 16:11:
>
>> i assumed topk of frequencies in one pass. if its topk by known
>> sorting/ordering then use priority queue aggregator instead of spacesaver.
>>
>> On Tue, Jan 3, 2017 at 3:11 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>> i dont know anything about windowing or about not using developer apis...
>>
>> but
>>
>> but a trivial implementation of top-k requires a total sort per group.
>> this can be done with dataset. we do this using spark-sorted (
>> https://github.com/tresata/spark-sorted) but its not hard to do it
>> yourself for datasets either. for rdds its actually a little harder i think
>> (if you want to avoid in memory assumption, which i assume you do)..
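>>
>> (For illustration, one way to spell that idea directly in the Dataset API,
>> without assuming a whole group fits in memory, might be the following
>> sketch; Record, key and score are hypothetical names:)
>>
>> import org.apache.spark.sql.Dataset
>>
>> case class Record(key: String, score: Long, payload: String)  // hypothetical row type
>>
>> def topKPerKey(ds: Dataset[Record], k: Int): Dataset[Record] = {
>>   val spark = ds.sparkSession
>>   import spark.implicits._
>>   ds.repartition($"key")                           // co-locate each group in one partition
>>     .sortWithinPartitions($"key", $"score".desc)   // external sort, so a group may exceed memory
>>     .mapPartitions { rows =>
>>       var currentKey: String = null
>>       var emitted = 0
>>       rows.filter { r =>                           // stream the sorted partition, keep k per key
>>         if (r.key != currentKey) { currentKey = r.key; emitted = 0 }
>>         emitted += 1
>>         emitted <= k
>>       }
>>     }
>> }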
>>
>> a perhaps more efficient implementation uses an aggregator. it is not
>> hard to adapt algebirds topk aggregator (spacesaver) to use as a spark
>> aggregator. this requires a simple adapter class. we do this in-house as
>> well. although i have to say i would recommend spark 2.1.0 for this. spark
>> 2.0.x aggregator codegen is too buggy in my experience.
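>>
>> (A sketch of the adapter shape, with a plain bounded sorted buffer standing
>> in for algebird's SpaceSaver; names like TopKBuffer, Record and score are
>> placeholders:)
>>
>> import org.apache.spark.sql.{Encoder, Encoders}
>> import org.apache.spark.sql.expressions.Aggregator
>>
>> case class TopKBuffer(values: Seq[Long])
>>
>> // keeps the k largest scores per group; swap the Seq for SpaceSaver (or a
>> // real priority queue) to get the adapter described above
>> class TopKAgg[IN](k: Int, score: IN => Long) extends Aggregator[IN, TopKBuffer, TopKBuffer] {
>>   def zero: TopKBuffer = TopKBuffer(Seq.empty)
>>   def reduce(b: TopKBuffer, a: IN): TopKBuffer =
>>     TopKBuffer((b.values :+ score(a)).sorted(Ordering[Long].reverse).take(k))
>>   def merge(b1: TopKBuffer, b2: TopKBuffer): TopKBuffer =
>>     TopKBuffer((b1.values ++ b2.values).sorted(Ordering[Long].reverse).take(k))
>>   def finish(r: TopKBuffer): TopKBuffer = r
>>   def bufferEncoder: Encoder[TopKBuffer] = Encoders.product[TopKBuffer]
>>   def outputEncoder: Encoder[TopKBuffer] = Encoders.product[TopKBuffer]
>> }
>>
>> // usage, e.g.: ds.groupByKey(_.key).agg(new TopKAgg[Record](10, _.score).toColumn)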
>>
>> On Tue, Jan 3, 2017 at 2:09 PM, Andy Dang <nam...@gmail.com> wrote:
>>
>> Hi Austin,
>>
>> It's trivial to implement top-k in the RDD world - however I would like to
>> stay in the Dataset API world instead of flip-flopping between the two APIs
>> (consistency, whole-stage codegen, etc.).
>>
>> The Twitter library appears to only support RDDs, and the solution you gave
>> me is very similar to what I did - it doesn't work very well with a skewed
>> dataset :) (it has to perform the sort to work out the row number).
>>
>> I've been toying with the UDAF idea, but the more I write the code the more
>> I see myself digging deeper into developer API land - not ideal to be
>> honest. Also, UDAFs don't have any concept of sorting, so it gets messy
>> really fast.
>>
>> -------
>> Regards,
>> Andy
>>
>> On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L <ah6...@att.com> wrote:
>>
>> Andy,
>>
>>
>>
>> You might want to also checkout the Algebird libraries from Twitter. They
>> have topK and a lot of other helpful functions. I’ve used the Algebird topk
>> successfully on very large data sets.
>>
>>
>>
>> You can also use Spark SQL to do a “poor man’s” topK. This depends on how
>> scrupulous you are about your TopKs (I can expound on this, if needed).
>>
>>
>>
>> I obfuscated the field names before pasting this into the email – I think I
>> got them all consistently.
>>
>>
>>
>> Here’s the meat of the TopK part (found on SO, but I don’t have a
>> reference) – this one takes the top 4, hence “rowNum <= 4”:
>>
>>
>>
>> SELECT time_bucket,
>>        identifier1,
>>        identifier2,
>>        incomingCount
>>   FROM (SELECT time_bucket,
>>                identifier1,
>>                identifier2,
>>                incomingCount,
>>                ROW_NUMBER() OVER (PARTITION BY time_bucket,
>>                                                identifier1
>>                                       ORDER BY incomingCount DESC) as rowNum
>>           FROM tablename) tmp
>>  WHERE rowNum <= 4
>>  ORDER BY time_bucket, identifier1, rowNum
>>
>>
>>
>> The count and order by:
>>
>>
>>
>>
>>
>> SELECT time_bucket,
>>        identifier1,
>>        identifier2,
>>        count(identifier2) as myCount
>>   FROM table
>>  GROUP BY time_bucket,
>>           identifier1,
>>           identifier2
>>  ORDER BY time_bucket,
>>           identifier1,
>>           count(identifier2) DESC
>>
>>
>>
>>
>>
>> *From: *Andy Dang <nam...@gmail.com>
>> *Date: *Tuesday, January 3, 2017 at 7:06 AM
>> *To: *user <user@spark.apache.org>
>> *Subject: *top-k function for Window
>>
>>
>>
>> Hi all,
>>
>>
>>
>> What's the best way to do top-k with windowing in the Dataset world?
>>
>>
>>
>> I have a snippet of code that filters the data to the top-k, but with
>> skewed keys:
>>
>>
>>
>> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
>>
>> val rank = row_number().over(windowSpec)
>>
>>
>>
>> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>>
>>
>>
>> The problem with this code is that Spark doesn't know that it can sort the
>> data locally and take the local rank first. What it ends up doing is
>> performing a sort by the skewed keys, and this blew up the cluster since
>> the keys are heavily skewed.
>>
>>
>>
>> In the RDD world we can do something like:
>>
>> rdd.mapPartitions(iterator -> topK(iterator))
>>
>> but I can't really think of an obvious way to do this in the Dataset API,
>> especially with Window functions. I guess some UserDefinedAggregateFunction
>> would do, but I wonder if there's an obvious way that I missed.
>>
>>
>>
>> -------
>> Regards,
>> Andy
>>
>>
>>
>>
>>
