Hi all,
I ran some tests on stream aggregation over rows. The data stream is simply
registered as
val orderA: DataStream[Order] = env.fromCollection(Seq(
Order(1L, "beer", 1),
Order(2L, "diaper", 2),
Order(3L, "diaper", 3),
Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),
and the SQL is defined as
select product, sum(amount) over (partition by product order by procTime()
rows between unbounded preceding and current row) from orderA.
The output I expect is
2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).
However, I sometimes get the following output instead
2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).
It seems that the rows "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are processed out of order. Is that normal?
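To make the arithmetic concrete, here is a minimal plain-Scala sketch (no Flink involved) of the per-product running sum for the two arrival orders. The object `RunningSumSketch` and the helper `runningSums` are hypothetical names used only for illustration, and the sketch assumes the aggregate is a simple cumulative sum per product. It emits results in input order, so the interleaving across subtasks (the `1>` / `2>` prefixes) is not modelled.

```scala
// Plain Scala, no Flink: simulates sum(amount) over an
// "unbounded preceding .. current row" window, partitioned by product.
case class Order(user: Long, product: String, amount: Int)
case class Result(product: String, total: Int)

object RunningSumSketch {
  // Hypothetical helper (not a Flink API): one running sum per input row,
  // computed in the order the rows are handed in.
  def runningSums(orders: Seq[Order]): Seq[Result] = {
    val totals = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
    orders.map { o =>
      totals(o.product) += o.amount
      Result(o.product, totals(o.product))
    }
  }

  // The rows in the order they were added to the collection.
  val inOrder: Seq[Order] = Seq(
    Order(1L, "beer", 1),
    Order(2L, "diaper", 2),
    Order(3L, "diaper", 3),
    Order(4L, "rubber", 4))

  // The same rows with the two "diaper" rows swapped.
  val swapped: Seq[Order] = Seq(
    Order(1L, "beer", 1),
    Order(3L, "diaper", 3),
    Order(2L, "diaper", 2),
    Order(4L, "rubber", 4))

  def main(args: Array[String]): Unit = {
    runningSums(inOrder).foreach(println)  // diaper totals: 2, then 5
    println("---")
    runningSums(swapped).foreach(println)  // diaper totals: 3, then 5
  }
}
```

With the rows in their original order the diaper totals are 2 and then 5, which is what I expect. With the two diaper rows swapped they are 3 and then 5, which matches the output I sometimes see.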
BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, their order
is always preserved.
Thanks,
Xingcan