Yes, going to parallelism 1 is another option but you don't have to use a
fake-reduce to enforce sorting.
You can simply do:

DataSet<Tuple3<Integer, String, String>> result = ...
result
  .sortPartition(1, Order.ASCENDING).setParallelism(1) // sort on first String field
  .output(...);
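
To see why the parallelism matters here, the following is a rough sketch in
plain Java. It is a toy model and does not use the Flink API: the class
SortPartitionSketch, the method sortWithinPartitions and the round-robin
split are made up for illustration, and the records are simply the
aggregated rows of the toy example written as strings. Each partition is
sorted on its own, which is all a partition-level sort can guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SortPartitionSketch {

    // Toy model (not Flink API): distribute the records round-robin over
    // `parallelism` partitions, sort every partition independently, and
    // concatenate the partitions in order.
    static List<String> sortWithinPartitions(List<String> records, int parallelism) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            partitions.get(i % parallelism).add(records.get(i));
        }

        List<String> output = new ArrayList<>();
        for (List<String> partition : partitions) {
            Collections.sort(partition); // local sort only
            output.addAll(partition);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a,2,8", "b,1,6", "b,2,14", "a,1,8");

        // Two partitions: each one is sorted, the combined output is not.
        System.out.println(sortWithinPartitions(records, 2));

        // One partition: the local sort is also the global sort.
        System.out.println(sortWithinPartitions(records, 1));
    }
}
```

With a single partition the local order and the global order coincide, which
is what setParallelism(1) on the sort relies on. The price is that all
records pass through one task, so this only makes sense for small results.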

Fabian

2015-07-15 15:32 GMT+02:00 Matthias J. Sax <mj...@informatik.hu-berlin.de>:

> Hi Robert,
>
> global sorting of the final output is currently not supported by Flink
> out-of-the-box. The reason is that a global sort requires all data to
> be processed by a single node (which contradicts data parallelism).
>
> For small output, you could use a final "reduce" with no key (i.e., all
> data goes to a single group) and dop=1, and do the sorting in memory in
> your own UDF.
>
> Hope this helps.
>
> -Matthias
>
> On 07/15/2015 02:56 PM, Robert Schmidtke wrote:
> > Hey everyone,
> >
> > I'm currently trying to implement TPC-H Q1 and that involves ordering of
> > results. Now I'm not too familiar with the transformations yet, however
> > for the life of me I cannot figure out how to get it to work. Consider
> > the following toy example:
> >
> > final ExecutionEnvironment env = ExecutionEnvironment
> > .getExecutionEnvironment();
> > DataSet<Tuple3<String, Integer, Integer>> elems = env.fromElements(
> > new Tuple3<String, Integer, Integer>("a", 2, 1),
> > new Tuple3<String, Integer, Integer>("b", 1, 2),
> > new Tuple3<String, Integer, Integer>("a", 1, 3),
> > new Tuple3<String, Integer, Integer>("b", 1, 4),
> > new Tuple3<String, Integer, Integer>("a", 1, 5),
> > new Tuple3<String, Integer, Integer>("b", 2, 6),
> > new Tuple3<String, Integer, Integer>("a", 2, 7),
> > new Tuple3<String, Integer, Integer>("b", 2, 8));
> > elems.groupBy(0, 1).sum(2).print();
> >
> > I want the output to be:
> > (a,1,8)
> > (a,2,8)
> > (b,1,6)
> > (b,2,14)
> >
> > However the output is:
> > (a,2,8)
> > (b,1,6)
> > (b,2,14)
> > (a,1,8)
> >
> > No matter where I place sorting of partitions or groups transformations
> > (strangely enough I just realized that when I don't add any ordering, the
> > output is as expected; however this is just the case in the toy example
> > and not in my TPC-H Q1). Is it currently not possible to achieve an
> > ordered output in this case? Please bear with me if I overlooked the
> > obvious, but I could not get a clear picture from the documentation.
> >
> > Btw. the code is right here:
> > https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH1Benchmark.java#L137
> > I verified the results with the provided data from TPC-H, apart from the
> > sorting everything is fine.
> >
> > Thanks a bunch in advance,
> >
> > Cheers
> > Robert
> >
> > --
> > My GPG Key ID: 336E2680
>
>
