Hi @Fabian, @Gabor, and @Aljoscha,

Thank you for your help! It works as expected.

Best regards,
Ivan.

On Tue, 24 Jan 2017 at 17:04 Fabian Hueske <fhue...@gmail.com> wrote:

> Aljoscha, you are right.
> The second mapPartition() needs setParallelism(1), and so does the
> second sortPartition():
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING).setParallelism(1)
>   .mapPartition(new ReturnFirstTen()).setParallelism(1)
>
> Anyway, as Gabor pointed out, this solution is very inefficient.
>
> 2017-01-24 17:52 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> @Fabian, I think there's a typo in your code, shouldn't it be
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen()).setParallelism(1)
>
> i.e. the second MapPartition has to be parallelism=1
>
> On Tue, 24 Jan 2017 at 11:57 Fabian Hueske <fhue...@gmail.com> wrote:
>
> You are of course right, Gabor.
> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
> 10 elements (note that you need to create deep copies if object reuse is
> enabled [1]).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
>
> 2017-01-24 11:49 GMT+01:00 Gábor Gévay <gga...@gmail.com>:
>
> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
>
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
>
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a shuffle
> >   .sortPartition(1, Order.DESCENDING)
> >   .mapPartition(new ReturnFirstTen())
> >   .sortPartition(1, Order.DESCENDING).setParallelism(1)
> >   .mapPartition(new ReturnFirstTen())
> >
> > Best, Fabian
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <ivan.mushke...@gmail.com>:
> >>
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields, ids and ratings, and I need
> >> to find the 10 elements with the highest rating in this dataset. I found
> >> a solution, but I think it's suboptimal and there should be a better way
> >> to do it.
> >>
> >> The best thing that I came up with is to partition the dataset by rating,
> >> sort locally, and write the partitioned dataset to disk:
> >>
> >> dataset
> >>   .partitionCustom(new Partitioner<Double>() {
> >>     @Override
> >>     public int partition(Double key, int numPartitions) {
> >>       return key.intValue() % numPartitions;
> >>     }
> >>   }, 1) // partition by rating
> >>   .setParallelism(5)
> >>   .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >>   .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read the
> >> sorted data from disk and find the N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> >> to disk?
> >>
> >> I tried to use "first(10)" but it seems to give the top 10 items from a
> >> random partition. Is there a way to get the top N elements from every
> >> partition? Then I could locally sort the top values from every partition
> >> and find the top 10 global values.
> >>
> >> Best regards,
> >> Ivan.
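
For reference, the heap-based variant Fabian mentions above (collect the top 10 per partition in a heap, then repeat once over the survivors) might look roughly like the sketch below. It is plain Java and deliberately not wired into Flink: `TopN`, `Rated`, `topN` and `globalTopN` are hypothetical names made up for illustration, and `Rated` merely stands in for the (id, rating) tuple from the question. Each element is copied before it is kept, which is the deep-copy precaution needed when object reuse is enabled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical helper, not part of Flink: bounded min-heap top-N selection. */
final class TopN {

    /** Hypothetical stand-in for the (id, rating) tuple from the question. */
    public static final class Rated {
        public final long id;
        public final double rating;

        public Rated(long id, double rating) {
            this.id = id;
            this.rating = rating;
        }
    }

    /** Returns the n highest-rated elements, best first, without sorting all input. */
    public static List<Rated> topN(Iterable<Rated> values, int n) {
        List<Rated> result = new ArrayList<>();
        if (n <= 0) {
            return result;
        }
        // Min-heap on rating: the head is always the weakest of the current top n.
        PriorityQueue<Rated> heap =
                new PriorityQueue<>(n, Comparator.comparingDouble((Rated r) -> r.rating));
        for (Rated r : values) {
            if (heap.size() < n) {
                // Copy, because with object reuse the caller may recycle r.
                heap.add(new Rated(r.id, r.rating));
            } else if (r.rating > heap.peek().rating) {
                heap.poll(); // evict the current weakest candidate
                heap.add(new Rated(r.id, r.rating));
            }
        }
        result.addAll(heap);
        result.sort(Comparator.comparingDouble((Rated r) -> r.rating).reversed());
        return result;
    }

    /** Two-phase top-N: local top n per partition, then top n of the survivors. */
    public static List<Rated> globalTopN(List<List<Rated>> partitions, int n) {
        List<Rated> candidates = new ArrayList<>();
        for (List<Rated> partition : partitions) {
            candidates.addAll(topN(partition, n));
        }
        return topN(candidates, n);
    }
}
```

In a Flink job the body of `topN` would presumably sit inside the MapPartitionFunction, applied once at the default parallelism and once more with parallelism 1, which replaces both sortPartition() calls from the snippets quoted above. That wiring is an assumption and is not shown here.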