i assumed top-k of frequencies in one pass. if its top-k by a known sorting/ordering then use a priority queue aggregator instead of spacesaver.
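such an aggregator could look roughly like this (just a rough, untested sketch; the Event case class, the field names and the choice of kryo encoders are placeholders for illustration, and a real bounded priority queue would be more efficient than re-sorting a small Seq, but it keeps the example short):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// hypothetical row type, for illustration only
case class Event(group: String, score: Double)

// keeps the k highest-scoring events per group. the buffer is a small
// sorted Seq truncated to k on every reduce/merge, so per-group state
// stays O(k) and no total sort per group is needed.
class TopKByScore(k: Int) extends Aggregator[Event, Seq[Event], Seq[Event]] {
  private def trim(xs: Seq[Event]): Seq[Event] = xs.sortBy(-_.score).take(k)

  def zero: Seq[Event] = Seq.empty
  def reduce(buf: Seq[Event], e: Event): Seq[Event] = trim(buf :+ e)
  def merge(b1: Seq[Event], b2: Seq[Event]): Seq[Event] = trim(b1 ++ b2)
  def finish(buf: Seq[Event]): Seq[Event] = buf

  // kryo avoids spark-internal encoder apis, at the cost of an opaque buffer column
  def bufferEncoder: Encoder[Seq[Event]] = Encoders.kryo[Seq[Event]]
  def outputEncoder: Encoder[Seq[Event]] = Encoders.kryo[Seq[Event]]
}

object TopKExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("topk-sketch").getOrCreate()
    import spark.implicits._

    val events = Seq(
      Event("a", 1.0), Event("a", 5.0), Event("a", 3.0),
      Event("b", 2.0), Event("b", 4.0)
    ).toDS()

    events.groupByKey(_.group)
      .agg(new TopKByScore(2).toColumn)
      .collect()
      .foreach(println)

    spark.stop()
  }
}

a rough sketch of the spacesaver adapter mentioned in the quoted thread is at the bottom of this mail.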
On Tue, Jan 3, 2017 at 3:11 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i dont know anything about windowing or about not using developer apis...
>
> but a trivial implementation of top-k requires a total sort per group.
> this can be done with dataset. we do this using spark-sorted
> (https://github.com/tresata/spark-sorted) but its not hard to do it
> yourself for datasets either. for rdds its actually a little harder i
> think (if you want to avoid the in memory assumption, which i assume you
> do).
>
> a perhaps more efficient implementation uses an aggregator. it is not
> hard to adapt algebirds topk aggregator (spacesaver) to use as a spark
> aggregator. this requires a simple adapter class. we do this in-house as
> well, although i have to say i would recommend spark 2.1.0 for this.
> spark 2.0.x aggregator codegen is too buggy in my experience.
>
> On Tue, Jan 3, 2017 at 2:09 PM, Andy Dang <nam...@gmail.com> wrote:
>
>> Hi Austin,
>>
>> It's trivial to implement top-k in the RDD world - however I would like
>> to stay in the Dataset API world instead of flip-flopping between the
>> two APIs (consistency, whole-stage codegen etc).
>>
>> The twitter library appears to support only RDDs, and the solution you
>> gave me is very similar to what I did - it doesn't work very well with a
>> skewed dataset :) (it has to perform the sort to work out the row
>> number).
>>
>> I've been toying with the UDAF idea, but the more I write the code the
>> more I see myself digging deeper into developer API land - not very
>> ideal to be honest. Also, UDAF doesn't have any concept of sorting, so
>> it gets messy really fast.
>>
>> -------
>> Regards,
>> Andy
>>
>> On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L <ah6...@att.com> wrote:
>>
>>> Andy,
>>>
>>> You might want to also check out the Algebird libraries from Twitter.
>>> They have topK and a lot of other helpful functions. I’ve used the
>>> Algebird topK successfully on very large data sets.
>>>
>>> You can also use Spark SQL to do a “poor man’s” topK. This depends on
>>> how scrupulous you are about your topKs (I can expound on this, if
>>> needed).
>>>
>>> I obfuscated the field names before pasting this into email – I think I
>>> got them all consistently.
>>>
>>> Here’s the meat of the topK part (found on SO, but I don’t have a
>>> reference) – this one takes the top 4, hence “rowNum <= 4”:
>>>
>>> SELECT time_bucket,
>>>        identifier1,
>>>        identifier2,
>>>        incomingCount
>>> FROM (SELECT time_bucket,
>>>              identifier1,
>>>              identifier2,
>>>              incomingCount,
>>>              ROW_NUMBER() OVER (PARTITION BY time_bucket, identifier1
>>>                                 ORDER BY incomingCount DESC) AS rowNum
>>>       FROM tablename) tmp
>>> WHERE rowNum <= 4
>>> ORDER BY time_bucket, identifier1, rowNum
>>>
>>> The count and order by:
>>>
>>> SELECT time_bucket,
>>>        identifier1,
>>>        identifier2,
>>>        count(identifier2) AS incomingCount
>>> FROM tablename
>>> GROUP BY time_bucket,
>>>          identifier1,
>>>          identifier2
>>> ORDER BY time_bucket,
>>>          identifier1,
>>>          count(identifier2) DESC
>>>
>>> From: Andy Dang <nam...@gmail.com>
>>> Date: Tuesday, January 3, 2017 at 7:06 AM
>>> To: user <user@spark.apache.org>
>>> Subject: top-k function for Window
>>>
>>> Hi all,
>>>
>>> What's the best way to do top-k with Windowing in the Dataset world?
>>>
>>> I have a snippet of code that filters the data to the top-k, but with
>>> skewed keys:
>>>
>>> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
>>> val rank = row_number().over(windowSpec)
>>>
>>> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>>>
>>> The problem with this code is that Spark doesn't know that it can sort
>>> the data locally and get the local rank first. What it ends up doing is
>>> performing a sort by key using the skewed keys, and this blew up the
>>> cluster since the keys are heavily skewed.
>>>
>>> In the RDD world we can do something like:
>>>
>>> rdd.mapPartitions(iterator => topK(iterator))
>>>
>>> but I can't really think of an obvious way to do this in the Dataset
>>> API, especially with the Window function. I guess some
>>> UserDefinedAggregateFunction would do, but I wonder if there's an
>>> obvious way that I missed.
>>>
>>> -------
>>> Regards,
>>> Andy
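ps: the "simple adapter class" around spacesaver mentioned in the quoted mail above might look roughly like this (again just a sketch, not tested; it assumes algebird's SpaceSaver(capacity, item), ++ and topK(k) returning approximate counts, which may differ slightly between algebird versions):

import com.twitter.algebird.SpaceSaver
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// approximate top-k most frequent items, backed by algebird's SpaceSaver
// sketch. capacity bounds the sketch size and should be comfortably larger
// than k.
class ApproxTopKFreq(capacity: Int, k: Int)
    extends Aggregator[String, Option[SpaceSaver[String]], Seq[(String, Long)]] {

  def zero: Option[SpaceSaver[String]] = None

  def reduce(buf: Option[SpaceSaver[String]], item: String): Option[SpaceSaver[String]] = {
    val one = SpaceSaver(capacity, item)
    Some(buf.fold(one)(_ ++ one))
  }

  def merge(b1: Option[SpaceSaver[String]], b2: Option[SpaceSaver[String]]): Option[SpaceSaver[String]] =
    (b1, b2) match {
      case (Some(x), Some(y)) => Some(x ++ y)
      case _                  => b1.orElse(b2)
    }

  def finish(buf: Option[SpaceSaver[String]]): Seq[(String, Long)] =
    buf.toSeq.flatMap(_.topK(k).map { case (item, approx, _) => (item, approx.estimate) })

  // kryo keeps the example off spark-internal encoder apis; the buffer column is opaque binary
  def bufferEncoder: Encoder[Option[SpaceSaver[String]]] = Encoders.kryo[Option[SpaceSaver[String]]]
  def outputEncoder: Encoder[Seq[(String, Long)]] = Encoders.kryo[Seq[(String, Long)]]
}

on a Dataset[String] you can run it globally with words.select(new ApproxTopKFreq(1000, 10).toColumn), or per group via groupByKey(...).agg(...) after parametrizing the input type on your own row class.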