Hi Xingcan,

Are you using parallelism 1 for the test? procTime semantics deals with the objects as they are loaded into the operators. It could be that co-occurring partitioned events (within the same millisecond time frame) are processed in parallel, and the output is then produced in a different order.
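To see why a swap of the two "diaper" rows produces exactly the output reported in the original message, here is a minimal plain-Scala model of the per-key running SUM that `sum(amount) over (partition by product ... rows between unbounded preceding and current row)` computes. This is not Flink code: the object and method names are hypothetical, and it assumes the aggregate emits one result per row, in the order rows reach it.

```scala
// Plain-Scala model (NOT Flink's implementation) of a per-key running SUM,
// i.e. SUM(amount) OVER (PARTITION BY product ... ROWS BETWEEN UNBOUNDED
// PRECEDING AND CURRENT ROW). Names here are hypothetical, for illustration.
object RunningSumOrder {
  case class Order(user: Long, product: String, amount: Int)
  case class Result(product: String, total: Int)

  // Emits one Result per input row, in arrival order, carrying the
  // running total of `amount` for that row's product so far.
  def runningSum(rows: Seq[Order]): Seq[Result] = {
    val totals = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
    rows.map { o =>
      totals(o.product) += o.amount
      Result(o.product, totals(o.product))
    }
  }

  def main(args: Array[String]): Unit = {
    val inOrder = Seq(
      Order(1L, "beer", 1), Order(2L, "diaper", 2),
      Order(3L, "diaper", 3), Order(4L, "rubber", 4))
    // Same rows, but the two "diaper" rows reach the aggregate swapped.
    val swapped = Seq(inOrder(0), inOrder(2), inOrder(1), inOrder(3))

    println(runningSum(inOrder).filter(_.product == "diaper"))
    // List(Result(diaper,2), Result(diaper,5))
    println(runningSum(swapped).filter(_.product == "diaper"))
    // List(Result(diaper,3), Result(diaper,5))
  }
}
```

The final total per key is the same either way; only the intermediate results depend on arrival order, which is why the difference shows up as `Result(diaper,3)` instead of `Result(diaper,2)`.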
I suggest you have a look at the integration test to verify that the configuration of your experiment is correct.

Best,
Stefano

-----Original Message-----
From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Tuesday, April 11, 2017 5:31 AM
To: dev@flink.apache.org
Subject: Question about the process order in stream aggregate

Hi all,

I ran some tests for stream aggregation on rows. The data stream is simply registered as

    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 1),
      Order(2L, "diaper", 2),
      Order(3L, "diaper", 3),
      Order(4L, "rubber", 4)))
    tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)

and the SQL is defined as

    select product, sum(amount) over (partition by product order by procTime() rows between unbounded preceding and current row) from orderA

My expected output is

    2> Result(beer,1)
    2> Result(diaper,2)
    1> Result(rubber,4)
    2> Result(diaper,5)

However, sometimes I get the following output:

    2> Result(beer,1)
    2> Result(diaper,3)
    1> Result(rubber,4)
    2> Result(diaper,5)

It seems that the rows "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)" are processed out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order of these two rows is always preserved.

Thanks,
Xingcan