Hi, just a bit of clarification. In the example above: time = 1, processElement(A) -> put A in state keyed to t=1, registerProcTimer(2) time = 1, processElement(B) -> put B in state keyed to t=1, registerProcTimer(2) // deduplicated time = 2, onTimer(2) -> access state with key t=2-1, get A, B time = 2, pocessElement(C) -> put C in state keyed to t=2, registerProcTimer(3)
the order of the two last calls is not deterministic. If you have a timer set for proc-time == 2 and an elements also arrives exactly at proc-time == 2 then the order of those onTimer() and processElement() calls is arbitrary. I think the only way of ensuring deterministic behaviour here is to put everything that happens at proc-time == 2 into a buffer, wait for proc-time == 3 and then process the buffered invocations in a deterministic order. This is not something that we plan to do, I think. Best, Aljoscha > On 31. Mar 2017, at 13:20, Radu Tudoran <radu.tudo...@huawei.com> wrote: > > Hi, > > Yes it does – thanks a lot > > Knowing that this is the order > time = 2, onTimer(2) -> access state with key t=2-1, get A, B > time = 2, pocessElement(C) -> put C in state keyed to t=2, > registerProcTimer(3) > is useful! > > Dr. Radu Tudoran > Senior Research Engineer - Big Data Expert > IT R&D Division > > <image001.png> > HUAWEI TECHNOLOGIES Duesseldorf GmbH > German Research Center > Munich Office > Riesstrasse 25, 80992 München > > E-mail: radu.tudo...@huawei.com <mailto:radu.tudo...@huawei.com> > Mobile: +49 15209084330 > Telephone: +49 891588344173 > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com > <http://www.huawei.com/> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang > This e-mail and its attachments contain confidential information from HUAWEI, > which is intended only for the person or entity whose address is listed > above. Any use of the information contained herein in any way (including, but > not limited to, total or partial disclosure, reproduction, or dissemination) > by persons other than the intended recipient(s) is prohibited. If you receive > this e-mail in error, please notify the sender by phone or email immediately > and delete it! > > From: Fabian Hueske [mailto:fhue...@gmail.com] > Sent: Friday, March 31, 2017 12:00 PM > To: Radu Tudoran > Cc: user@flink.apache.org > Subject: Re: concurrency? > > Hi Radu, > > timers are fired in order of their time stamps. > Multiple timers on the same time are deduplicated. > > if you have the following logic: > > time = 1, processElement(A) -> put A in state keyed to t=1, > registerProcTimer(2) > time = 1, processElement(B) -> put B in state keyed to t=1, > registerProcTimer(2) // deduplicated > time = 2, onTimer(2) -> access state with key t=2-1, get A, B > time = 2, pocessElement(C) -> put C in state keyed to t=2, > registerProcTimer(3) > ... > > You get all calls in the right order. > > Does that answer you questions? > > > 2017-03-31 11:36 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com > <mailto:radu.tudo...@huawei.com>>: > Hi, > > Thanks Fabian. But is there also a fixed order that is imposed in their > execution? > > I am asking this because it is not enough just to have them executed > atomically. If once you have the processElement() being called and then > onTimer(), and in the next called you have them vice versa, it would mean > that you need additional mechanism to synchronize your logic. Right? > For example if in the > process element you do state.update (newValue) > and in the ontimer you do out.collect(state.getValue()) > > than if you have ev1,ev2 and eve3 coming at consecutive times and once the > function are executed processelement and than timer and then in reverse order > your output would be: > > time1: (processElement) ev1 –arrives > state=ev1 > time2: (processElement – executed first) ev2-arrives state=ev2 > onTime(executed second): out = ev2 > time3: (processElement – executed second) ev3-arrives state=ev3 > onTime(executed first): out = ev2 > > Best regards, > > Dr. Radu Tudoran > Senior Research Engineer - Big Data Expert > IT R&D Division > > <image001.png> > HUAWEI TECHNOLOGIES Duesseldorf GmbH > German Research Center > Munich Office > Riesstrasse 25, 80992 München > > E-mail: radu.tudo...@huawei.com <mailto:radu.tudo...@huawei.com> > Mobile: +49 15209084330 <tel:+49%201520%209084330> > Telephone: +49 891588344173 <tel:+49%2089%201588344173> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com > <http://www.huawei.com/> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang > This e-mail and its attachments contain confidential information from HUAWEI, > which is intended only for the person or entity whose address is listed > above. Any use of the information contained herein in any way (including, but > not limited to, total or partial disclosure, reproduction, or dissemination) > by persons other than the intended recipient(s) is prohibited. If you receive > this e-mail in error, please notify the sender by phone or email immediately > and delete it! > > From: Fabian Hueske [mailto:fhue...@gmail.com <mailto:fhue...@gmail.com>] > Sent: Friday, March 31, 2017 11:05 AM > To: Radu Tudoran > Cc: user@flink.apache.org <mailto:user@flink.apache.org> > Subject: Re: concurrency? > > Hi Radu, > > the processElement() and onTimer() calls are synchronized by a lock, i.e., > they won't be called at the same time. > > Best, Fabian > > 2017-03-31 9:34 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com > <mailto:radu.tudo...@huawei.com>>: > Hi, > > I would like to use a processFunction to accumulate elements. Therefore in > the processElement function I will accumulate this element into a state. > However, I would like to emit the output only 1ms later. Therefore I would > register a timer to trigger one second later and read the state and emit it. > However, I am curious of what happens if in the next ms another event > arrives. In principle both the processElement function and the onTimer > function should be triggered in the same time. My question is: is there a > fix order to execute them? Because if any of them work just like normal > threads, than concurrency related issues could happen when accessing the > state.