Hi all,

I run some tests for stream aggregation on rows. The data stream is simply
registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 1),
      Order(2L, "diaper", 2),
      Order(3L, "diaper", 3),
      Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime()
rows between unbounded preceding and current row from orderA).

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).

It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order
for them can always be preserved.

Thanks,
Xingcan

Reply via email to