Hi @Radu, @Stefano, sorry that I misunderstood it before; we were considering the problem from different viewpoints. I agree that (ingestion) timestamp injection could be a good solution to this problem in some scenarios. Thanks.
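As an editorial aside, the ingestion-timestamp idea agreed on above can be sketched without Flink. Below is a minimal pure-Scala simulation, assuming arrival order equals collection order; the `ingest` helper and the stamped-tuple shape are illustrative assumptions, not Flink API.

```scala
// Sketch of "(ingestion) timestamp injection": stamp each element with the
// wall clock at the moment it arrives, mimicking what an
// assignTimestampsAndWatermarks extractor that returns
// System.currentTimeMillis() would do. `ingest` is a hypothetical helper.
case class Order(user: Long, product: String, amount: Int)

def ingest(orders: Seq[Order]): Seq[(Long, Order)] =
  orders.map(o => (System.currentTimeMillis(), o))

val stamped = ingest(Seq(
  Order(1L, "beer", 1), Order(2L, "diaper", 2),
  Order(3L, "diaper", 3), Order(4L, "rubber", 4)))

// Injected timestamps are non-decreasing in arrival order, so a stable
// sort on this "rowtime" reproduces ingestion order even if downstream
// operators shuffle the elements.
val ts = stamped.map(_._1)
assert(ts.zip(ts.tail).forall { case (a, b) => a <= b })
```

It is not "real row time" (as Stefano notes below), but ordering by the injected timestamp recovers arrival order.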
@Fabian, thanks for your explanation. That makes sense.

Best,
Xingcan

On Thu, Apr 13, 2017 at 2:41 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Xingcan,
>
> the 0L timestamp literal is an artifact of how the Calcite query is
> translated by Flink. It represents the value of the procTime() function
> that is logically used to sort the data. Calcite expects this attribute
> in the schema, but Flink's OVER operator actually processes the data
> based on the local wall-clock time of the operator.
>
> So this is unnecessary overhead at the moment, which hopefully will be
> resolved before the 1.3 release.
>
> Best, Fabian
>
> 2017-04-12 9:45 GMT+02:00 Stefano Bortoli <stefano.bort...@huawei.com>:
>
> > I'm afraid that to keep order you either have to process the stream
> > serially (parallelism 1) or provide an element that allows the objects
> > to be sorted when they are processed in parallel (i.e., rowTime). When
> > you distribute the computation, as Fabian explained, you get a
> > round-robin assignment to the different process functions, which may
> > not respect the original input order in the output.
> >
> > ProcessTime means that you don't care much about time as a sorting
> > reference for the computation of the result.
> >
> > What Radu suggested is to inject the timestamp into your DataStream
> > before processing, and then use rowTime semantics. It won't be "real
> > row time" because your function will inject the timestamp of
> > "arrival", but it will produce sorted output as you "order by
> > rowTime". Hope it helps.
> >
> > Best,
> > Stefano
> >
> > -----Original Message-----
> > From: Xingcan Cui [mailto:xingc...@gmail.com]
> > Sent: Wednesday, April 12, 2017 8:11 AM
> > To: dev@flink.apache.org
> > Subject: Re: Question about the process order in stream aggregate
> >
> > Hi everybody,
> >
> > thank you all for your help.
> >
> > @Fabian, I also checked the DataStream translated from the query and
> > tried to figure out what happens in each step.
> > The results are as follows (please correct me if there's something
> > wrong):
> >
> > Source -> Map (Order to Row3) -> FlatMap (do project and extract
> > timestamp?) -> Partition (partition by product) -> BoundedOverAggregate
> > (aggregate) -> FlatMap (Row5 to Row2) -> Sink
> >
> > @Stefano, it's indeed impossible to keep the order unless we set the
> > parallelism of the first MapFunc to 1 (which is impractical) or execute
> > the partition step in advance (which seems impractical too).
> >
> > Anyway, procTime itself is a "blurred concept" full of uncertainty,
> > right? Now I think it's better to use rowTime instead if the
> > application needs order preservation.
> >
> > @Radu, the assignTimestampsAndWatermarks method seems to have no
> > effect here; maybe it only affects rowTime?
> >
> > There's another question. I found the following code in the generated
> > FlatMap function (step 3, project and extract timestamp):
> >
> > ...
> > java.sql.Timestamp result$16;
> > if (false) {
> >   result$16 = null;
> > } else {
> >   result$16 =
> >     org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(0L);
> > }
> >
> > if (false) {
> >   out.setField(2, null);
> > } else {
> >   out.setField(2, result$16);
> > }
> > ...
> >
> > Could you please explain what the 0L timestamp means?
> >
> > Best,
> > Xingcan
> >
> > On Tue, Apr 11, 2017 at 8:40 PM, Radu Tudoran <radu.tudo...@huawei.com>
> > wrote:
> >
> > > Hi Xingcan,
> > >
> > > If you need to guarantee the order also in the case of procTime, a
> > > trick you can do is to set the time characteristic of the environment
> > > to processing time and to assign the proctime to the incoming stream.
> > > You can do this via
> > > assignTimestampsAndWatermarks(new ...)
> > > and override
> > >
> > > override def extractTimestamp(
> > >     element: ...,
> > >     previousElementTimestamp: Long): Long = {
> > >   System.currentTimeMillis()
> > > }
> > >
> > > Alternatively, you can play around with the stream source and control
> > > the time when the events come.
> > >
> > > Dr. Radu Tudoran
> > > Senior Research Engineer - Big Data Expert, IT R&D Division
> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > German Research Center, Munich Office
> > > Riesstrasse 25, 80992 München
> > >
> > > -----Original Message-----
> > > From: fhue...@gmail.com [mailto:fhue...@gmail.com]
> > > Sent: Tuesday, April 11, 2017 2:24 PM
> > > To: Stefano Bortoli; dev@flink.apache.org
> > > Subject: AW: Question about the process order in stream aggregate
> > >
> > > Resending to dev@f.a.o
> > >
> > > Hi Xingcan,
> > >
> > > This is expected behavior. In general, it is not possible to
> > > guarantee result order for processing time.
> > > Your query is translated as follows:
> > >
> > > CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n)
> > > -fwd-> MapFunc(n) -fwd-> Sink(n)
> > >
> > > The order of records changes because of the connection between the
> > > source and the first map function. Here, records are distributed
> > > round-robin to increase the parallelism from 1 to n. The parallel
> > > instances of the map might forward the records in a different order
> > > to the ProcessFunction that computes the aggregation.
> > >
> > > Hope this helps,
> > > Fabian
> > >
> > > From: Stefano Bortoli
> > > Sent: Tuesday, April 11, 2017 2:10 PM
> > > To: dev@flink.apache.org
> > > Subject: RE: Question about the process order in stream aggregate
> > >
> > > Hi Xingcan,
> > >
> > > Are you using parallelism 1 for the test? procTime semantics deals
> > > with the objects as they are loaded into the operators. It could be
> > > that co-occurring partitioned events (in the same millisecond time
> > > frame) are processed in parallel and the output is then produced in
> > > a different order.
> > >
> > > I suggest you have a look at the integration tests to verify that
> > > the configuration of your experiment is correct.
> > >
> > > Best,
> > > Stefano
> > >
> > > -----Original Message-----
> > > From: Xingcan Cui [mailto:xingc...@gmail.com]
> > > Sent: Tuesday, April 11, 2017 5:31 AM
> > > To: dev@flink.apache.org
> > > Subject: Question about the process order in stream aggregate
> > >
> > > Hi all,
> > >
> > > I ran some tests for stream aggregation on rows.
> > > The data stream is simply registered as
> > >
> > > val orderA: DataStream[Order] = env.fromCollection(Seq(
> > >   Order(1L, "beer", 1),
> > >   Order(2L, "diaper", 2),
> > >   Order(3L, "diaper", 3),
> > >   Order(4L, "rubber", 4)))
> > > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
> > >
> > > and the SQL is defined as
> > >
> > > SELECT product, SUM(amount) OVER (PARTITION BY product ORDER BY
> > > procTime() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
> > > FROM OrderA
> > >
> > > My expected output is
> > >
> > > 2> Result(beer,1)
> > > 2> Result(diaper,2)
> > > 1> Result(rubber,4)
> > > 2> Result(diaper,5)
> > >
> > > However, sometimes I get the following output:
> > >
> > > 2> Result(beer,1)
> > > 2> Result(diaper,3)
> > > 1> Result(rubber,4)
> > > 2> Result(diaper,5)
> > >
> > > It seems that the rows Order(2L, "diaper", 2) and Order(3L, "diaper", 3)
> > > are out of order. Is that normal?
> > >
> > > BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the
> > > order is always preserved.
> > >
> > > Thanks,
> > > Xingcan
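As an editorial aside, the reordering Fabian and Stefano describe can be reproduced without Flink. The sketch below is a pure-Scala simulation, assuming a 2-way round-robin split and one particular interleaving of the map instances; the names (`workers`, `arrival`) and the parallelism are illustrative assumptions, not part of the original thread.

```scala
// Simulation of the pipeline Fabian describes:
// CollectionSrc(1) -round-robin-> MapFunc(2) -hash-part-> ProcessFunc.
case class Order(user: Long, product: String, amount: Int)

val source = Seq(
  Order(1L, "beer", 1), Order(2L, "diaper", 2),
  Order(3L, "diaper", 3), Order(4L, "rubber", 4))

// Round-robin distribution from the parallelism-1 source to 2 map
// instances: instance 0 gets elements 0 and 2, instance 1 gets 1 and 3.
val workers: Map[Int, Seq[Order]] =
  source.zipWithIndex.groupBy(_._2 % 2).map { case (k, v) => k -> v.map(_._1) }

// One possible arrival order at the keyed aggregator: map instance 0
// happens to forward all of its records before map instance 1 does.
val arrival = workers(0) ++ workers(1)

// Unbounded-preceding running SUM(amount) per product, in arrival order.
var sums = Map.empty[String, Int]
val results = arrival.map { o =>
  val s = sums.getOrElse(o.product, 0) + o.amount
  sums += (o.product -> s)
  (o.product, s)
}

// The "diaper" partial sums come out as 3 then 5, i.e. the surprising
// output from the original question, even though the source emitted
// amount 2 before amount 3. By contrast, keyBy applied directly to the
// parallelism-1 source has no round-robin step in between, which is
// consistent with the keyBy(2).map(...).print() observation above.
println(results)
```

Under this interleaving the simulation yields exactly the out-of-order result reported in the thread.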