Hi Robert, there are two issues involved here.

1) Flink does not support totally ordered parallel output out of the box. Fully sorting data in parallel requires range partitioning, which in turn requires some knowledge of the data (the distribution of the key values) to produce balanced partitions. Flink does not collect statistics to determine the key distribution, which is why it does not offer a range partition operation yet. However, Flink does support sorting local partitions and custom partitioners. If you know the value distribution of the key, you can implement a custom partitioner and locally sort the partitions to obtain a fully sorted result.

2) print() collects all partitions in arbitrary order, so any order across partitions is destroyed (the order within partitions should not be affected).

Best, Fabian

2015-07-15 14:56 GMT+02:00 Robert Schmidtke <ro.schmid...@gmail.com>:

> Hey everyone,
>
> I'm currently trying to implement TPC-H Q1 and that involves ordering of
> results. Now I'm not too familiar with the transformations yet, however for
> the life of me I cannot figure out how to get it to work. Consider the
> following toy example:
>
> final ExecutionEnvironment env = ExecutionEnvironment
>     .getExecutionEnvironment();
> DataSet<Tuple3<String, Integer, Integer>> elems = env.fromElements(
>     new Tuple3<String, Integer, Integer>("a", 2, 1),
>     new Tuple3<String, Integer, Integer>("b", 1, 2),
>     new Tuple3<String, Integer, Integer>("a", 1, 3),
>     new Tuple3<String, Integer, Integer>("b", 1, 4),
>     new Tuple3<String, Integer, Integer>("a", 1, 5),
>     new Tuple3<String, Integer, Integer>("b", 2, 6),
>     new Tuple3<String, Integer, Integer>("a", 2, 7),
>     new Tuple3<String, Integer, Integer>("b", 2, 8));
> elems.groupBy(0, 1).sum(2).print();
>
> I want the output to be:
> (a,1,8)
> (a,2,8)
> (b,1,6)
> (b,2,14)
>
> However the output is:
> (a,2,8)
> (b,1,6)
> (b,2,14)
> (a,1,8)
>
> No matter where I place sorting of partitions or groups transformations
> (strange enough I just realized that when I don't add any ordering, the
> output is as expected; however this is just the case in the toy example and
> not in my TPC-H Q1). Is it currently not possible to achieve an ordered
> output in this case? Please bear with me if I overlooked the obvious, but I
> could not get a clear picture from the documentation.
>
> Btw. the code is right here:
> https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH1Benchmark.java#L137
> I verified the results with the provided data from TPC-H, apart from the
> sorting everything is fine.
>
> Thanks a bunch in advance,
>
> Cheers
> Robert
>
> --
> My GPG Key ID: 336E2680
>
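
P.S. To make the idea of a custom range partitioner followed by a local sort more concrete, here is a minimal sketch in plain Java. It does not use Flink at all, so it runs without a cluster. The class name, the method names and the split point "b" are made up for illustration, and the split point assumes you already know how the keys are distributed. In a Flink job, the two steps would roughly correspond to a custom partitioner (partitionCustom) and a local sort (sortPartition) on the DataSet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RangePartitionSortSketch {

    // Hypothetical range partitioner: partition i receives all keys that are
    // smaller than splitPoints[i]; the last partition receives the rest.
    // The split points are an assumption about the key distribution.
    static int partitionFor(String key, String[] splitPoints) {
        for (int i = 0; i < splitPoints.length; i++) {
            if (key.compareTo(splitPoints[i]) < 0) {
                return i;
            }
        }
        return splitPoints.length;
    }

    static List<String> totalSort(List<String> records, String[] splitPoints) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i <= splitPoints.length; i++) {
            partitions.add(new ArrayList<String>());
        }

        // Step 1: range partition, so every key in partition i is smaller
        // than every key in partition i + 1.
        for (String record : records) {
            partitions.get(partitionFor(record, splitPoints)).add(record);
        }

        // Step 2: sort each partition locally, then read the partitions in
        // partition order. Reading them in arbitrary order (as print() does)
        // would destroy the order across partitions again.
        List<String> result = new ArrayList<>();
        for (List<String> partition : partitions) {
            Collections.sort(partition);
            result.addAll(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        // Group keys from the toy example, written as single strings.
        List<String> records = Arrays.asList("a2", "b1", "b2", "a1");
        System.out.println(totalSort(records, new String[] {"b"}));
    }
}
```

With the split point "b", all keys starting with "a" end up in the first partition and all keys starting with "b" in the second, so concatenating the locally sorted partitions gives a total order. A badly chosen split point would still give a correct result, but the partitions would be unbalanced.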