I assumed top-k of frequencies in one pass. If it's top-k by a known
sorting/ordering, then use a priority-queue aggregator instead of SpaceSaver.
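
A rough sketch of what such a priority-queue-style aggregator could look like
as a typed Dataset Aggregator (the Long value type, k, and all names here are
my own illustration, not code from this thread; a real version would keep a
bounded heap instead of re-sorting a small Seq):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// keeps the k largest values seen in each group
class TopKByOrdering(k: Int) extends Aggregator[Long, Seq[Long], Seq[Long]] {
  def zero: Seq[Long] = Seq.empty
  def reduce(buf: Seq[Long], x: Long): Seq[Long] =
    (buf :+ x).sorted(Ordering[Long].reverse).take(k)
  def merge(a: Seq[Long], b: Seq[Long]): Seq[Long] =
    (a ++ b).sorted(Ordering[Long].reverse).take(k)
  def finish(buf: Seq[Long]): Seq[Long] = buf
  def bufferEncoder: Encoder[Seq[Long]] = Encoders.kryo[Seq[Long]]
  def outputEncoder: Encoder[Seq[Long]] = Encoders.kryo[Seq[Long]]
}

// hypothetical usage on a Dataset[(String, Long)] of (key, value), Spark 2.1+:
//   ds.groupByKey(_._1).mapValues(_._2).agg(new TopKByOrdering(10).toColumn)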

On Tue, Jan 3, 2017 at 3:11 PM, Koert Kuipers <ko...@tresata.com> wrote:

> I don't know anything about windowing or about not using developer APIs...
>
> But a trivial implementation of top-k requires a total sort per group.
> This can be done with a Dataset. We do this using spark-sorted (
> https://github.com/tresata/spark-sorted), but it's not hard to do it
> yourself for Datasets either. For RDDs it's actually a little harder, I
> think (if you want to avoid the in-memory assumption, which I assume you do).
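>
> For illustration, a do-it-yourself sketch of that for Datasets (this is not
> the spark-sorted code; the Rec schema, the name "input" and k = 10 are made
> up): shuffle by key, sort within partitions, then stream through each
> partition keeping the first k rows per key, so no group has to fit in memory:
>
> import spark.implicits._                       // assumes a SparkSession named spark
> case class Rec(key: String, value: Long)       // assumed schema
> val k = 10
> val topK = input.as[Rec]                       // input: DataFrame with key/value columns
>   .repartition($"key")                         // all rows of a key land in one partition
>   .sortWithinPartitions($"key", $"value".desc)
>   .mapPartitions { it =>
>     var current: String = null
>     var seen = 0
>     it.filter { r =>                           // iterator is consumed in sorted order
>       if (r.key != current) { current = r.key; seen = 0 }
>       seen += 1
>       seen <= k
>     }
>   }
>
> This mirrors the repartition-and-sort-within-partitions (secondary sort)
> pattern that spark-sorted provides for RDDs.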
>
> A perhaps more efficient implementation uses an aggregator. It is not hard
> to adapt Algebird's top-k aggregator (SpaceSaver) for use as a Spark
> aggregator; this requires a simple adapter class. We do this in-house as
> well, although I have to say I would recommend Spark 2.1.0 for this: the
> Spark 2.0.x aggregator codegen is too buggy in my experience.
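>
> For illustration, such an adapter could look roughly like this (a sketch
> only, not our in-house code; it assumes Algebird's SpaceSaver API of
> SpaceSaver(capacity, item), ++ and topK, and kryo-encodes the buffer):
>
> import com.twitter.algebird.SpaceSaver
> import org.apache.spark.sql.{Encoder, Encoders}
> import org.apache.spark.sql.expressions.Aggregator
>
> // approximate top-k most frequent strings per group
> class TopKFrequent(capacity: Int, k: Int)
>     extends Aggregator[String, Option[SpaceSaver[String]], Seq[String]] {
>   def zero: Option[SpaceSaver[String]] = None
>   def reduce(b: Option[SpaceSaver[String]], x: String): Option[SpaceSaver[String]] = {
>     val one = SpaceSaver(capacity, x)
>     Some(b.map(_ ++ one).getOrElse(one))
>   }
>   def merge(a: Option[SpaceSaver[String]], b: Option[SpaceSaver[String]]): Option[SpaceSaver[String]] =
>     (a, b) match {
>       case (Some(x), Some(y)) => Some(x ++ y)
>       case _                  => a.orElse(b)
>     }
>   def finish(b: Option[SpaceSaver[String]]): Seq[String] =
>     b.toSeq.flatMap(_.topK(k).map(_._1))       // keep the items, drop the approximate counts
>   def bufferEncoder: Encoder[Option[SpaceSaver[String]]] = Encoders.kryo
>   def outputEncoder: Encoder[Seq[String]] = Encoders.kryo
> }
>
> // hypothetical usage, Spark 2.1+:
> //   events.groupByKey(_.key).mapValues(_.item).agg(new TopKFrequent(1000, 10).toColumn)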
>
> On Tue, Jan 3, 2017 at 2:09 PM, Andy Dang <nam...@gmail.com> wrote:
>
>> Hi Austin,
>>
>> It's trivial to implement top-k in the RDD world - however, I would like
>> to stay in the Dataset API world instead of flip-flopping between the two
>> APIs (consistency, whole-stage codegen, etc.).
>>
>> The Twitter library appears to support only RDDs, and the solution you
>> gave me is very similar to what I did - it doesn't work very well with a
>> skewed dataset :) (it has to perform the sort to work out the row number).
>>
>> I've been toying with the UDAF idea, but the more code I write, the deeper
>> I see myself digging into developer API land - not very ideal, to be
>> honest. Also, a UDAF doesn't have any concept of sorting, so it gets messy
>> really fast.
>>
>> -------
>> Regards,
>> Andy
>>
>> On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L <ah6...@att.com> wrote:
>>
>>> Andy,
>>>
>>>
>>>
>>> You might want to also check out the Algebird libraries from Twitter.
>>> They have topK and a lot of other helpful functions. I've used the
>>> Algebird topK successfully on very large data sets.
>>>
>>>
>>>
>>> You can also use Spark SQL to do a “poor man’s” topK. This depends on
>>> how scrupulous you are about your TopKs (I can expound on this, if needed).
>>>
>>>
>>>
>>> I obfuscated the field names before pasting this into email – I think I
>>> got them all consistently.
>>>
>>>
>>>
>>> Here’s the meat of the TopK part (found on SO, but I don’t have a
>>> reference) – this one takes the top 4, hence “rowNum <= 4”:
>>>
>>>
>>>
>>> SELECT time_bucket,
>>>        identifier1,
>>>        identifier2,
>>>        incomingCount
>>>   FROM (SELECT time_bucket,
>>>                identifier1,
>>>                identifier2,
>>>                incomingCount,
>>>                ROW_NUMBER() OVER (PARTITION BY time_bucket,
>>>                                                identifier1
>>>                                       ORDER BY incomingCount DESC) AS rowNum
>>>           FROM tablename) tmp
>>>  WHERE rowNum <= 4
>>>  ORDER BY time_bucket, identifier1, rowNum
>>>
>>>
>>>
>>> The count and order by:
>>>
>>>
>>>
>>>
>>>
>>> SELECT time_bucket,
>>>        identifier1,
>>>        identifier2,
>>>        count(identifier2) AS myCount
>>>   FROM tablename
>>>  GROUP BY time_bucket,
>>>           identifier1,
>>>           identifier2
>>>  ORDER BY time_bucket,
>>>           identifier1,
>>>           count(identifier2) DESC
>>>
>>>
>>>
>>>
>>>
>>> From: Andy Dang <nam...@gmail.com>
>>> Date: Tuesday, January 3, 2017 at 7:06 AM
>>> To: user <user@spark.apache.org>
>>> Subject: top-k function for Window
>>>
>>>
>>>
>>> Hi all,
>>>
>>>
>>>
>>> What's the best way to do top-k with windowing in the Dataset world?
>>>
>>>
>>>
>>> I have a snippet of code that filters the data to the top-k, but with
>>> skewed keys:
>>>
>>>
>>>
>>> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
>>>
>>> val rank = row_number().over(windowSpec)
>>>
>>>
>>>
>>> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>>>
>>>
>>>
>>> The problem with this code is that Spark doesn't know that it can sort
>>> the data locally and compute the local rank first. What it ends up doing
>>> is performing a sort by key using the skewed keys, and this blew up the
>>> cluster since the keys are heavily skewed.
>>>
>>>
>>>
>>> In the RDD world we can do something like:
>>>
>>> rdd.mapPartitions(iterator -> topK(iterator))
>>>
>>> but I can't really think of an obvious way to do this in the Dataset API,
>>> especially with Window functions. I guess some UserDefinedAggregateFunction
>>> would do, but I wonder if there's an obvious way that I missed.
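>>>
>>> (For illustration, the per-key version of that idea in the RDD API might
>>> look roughly like the following, aggregating locally before the shuffle so
>>> skewed keys never need a full sort; "pairs" and k are made-up names, not my
>>> real code:)
>>>
>>> val k = 10
>>> val topKPerKey = pairs                  // pairs: RDD[(String, Long)], hypothetical
>>>   .aggregateByKey(List.empty[Long])(
>>>     (buf, v) => (v :: buf).sorted(Ordering[Long].reverse).take(k), // map-side, per key
>>>     (a, b) => (a ++ b).sorted(Ordering[Long].reverse).take(k)      // merge partial top-ks
>>>   )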
>>>
>>>
>>>
>>> -------
>>> Regards,
>>> Andy
>>>
>>
>>
>
