Hi,
just a bit of clarification. In the example above:

time = 1, processElement(A) -> put A in state keyed to t=1, registerProcTimer(2)
time = 1, processElement(B) -> put B in state keyed to t=1, 
registerProcTimer(2) // deduplicated
time = 2, onTimer(2) -> access state with key t=2-1, get A, B
time = 2, pocessElement(C) -> put C in state keyed to t=2, registerProcTimer(3)

the order of the two last calls is not deterministic. If you have a timer set 
for proc-time == 2 and an elements also arrives exactly at proc-time == 2 then 
the order of those onTimer() and processElement() calls is arbitrary. I think 
the only way of ensuring deterministic behaviour here is to put everything that 
happens at proc-time == 2 into a buffer, wait for proc-time == 3 and then 
process the buffered invocations in a deterministic order. This is not 
something that we plan to do, I think.

Best,
Aljoscha
> On 31. Mar 2017, at 13:20, Radu Tudoran <radu.tudo...@huawei.com> wrote:
> 
> Hi,
>  
> Yes it does – thanks a lot
>  
> Knowing that this is the order
> time = 2, onTimer(2) -> access state with key t=2-1, get A, B
> time = 2, pocessElement(C) -> put C in state keyed to t=2, 
> registerProcTimer(3)
> is useful!
>  
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>  
> <image001.png>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>  
> E-mail: radu.tudo...@huawei.com <mailto:radu.tudo...@huawei.com>
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>  
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com 
> <http://www.huawei.com/>
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> This e-mail and its attachments contain confidential information from HUAWEI, 
> which is intended only for the person or entity whose address is listed 
> above. Any use of the information contained herein in any way (including, but 
> not limited to, total or partial disclosure, reproduction, or dissemination) 
> by persons other than the intended recipient(s) is prohibited. If you receive 
> this e-mail in error, please notify the sender by phone or email immediately 
> and delete it!
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com] 
> Sent: Friday, March 31, 2017 12:00 PM
> To: Radu Tudoran
> Cc: user@flink.apache.org
> Subject: Re: concurrency?
>  
> Hi Radu,
> 
> timers are fired in order of their time stamps. 
> Multiple timers on the same time are deduplicated. 
>  
> if you have the following logic:
> 
> time = 1, processElement(A) -> put A in state keyed to t=1, 
> registerProcTimer(2)
> time = 1, processElement(B) -> put B in state keyed to t=1, 
> registerProcTimer(2) // deduplicated
> time = 2, onTimer(2) -> access state with key t=2-1, get A, B
> time = 2, pocessElement(C) -> put C in state keyed to t=2, 
> registerProcTimer(3)
> ...
> 
> You get all calls in the right order.
> 
> Does that answer you questions?
>  
>  
> 2017-03-31 11:36 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com 
> <mailto:radu.tudo...@huawei.com>>:
> Hi,
>  
> Thanks Fabian. But is there also a fixed order that is imposed in their 
> execution?
>  
> I am asking this because it is not enough just to have them executed 
> atomically. If once you have the processElement() being called and then 
> onTimer(), and in the next called you have them vice versa, it would mean 
> that you need additional mechanism to synchronize your logic. Right?
> For example if in the
> process element you do state.update (newValue)
> and in the ontimer you do out.collect(state.getValue())
>  
> than if you have ev1,ev2 and eve3 coming at consecutive times and once the 
> function are executed processelement and than timer and then in reverse order 
> your output would be:
>  
> time1: (processElement)                                    ev1 –arrives 
> state=ev1            
> time2: (processElement – executed first)       ev2-arrives state=ev2          
>     onTime(executed second):   out = ev2
> time3: (processElement – executed second) ev3-arrives state=ev3              
> onTime(executed first):          out = ev2
>  
> Best regards,
>  
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>  
> <image001.png>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>  
> E-mail: radu.tudo...@huawei.com <mailto:radu.tudo...@huawei.com>
> Mobile: +49 15209084330 <tel:+49%201520%209084330>
> Telephone: +49 891588344173 <tel:+49%2089%201588344173>
>  
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com 
> <http://www.huawei.com/>
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> This e-mail and its attachments contain confidential information from HUAWEI, 
> which is intended only for the person or entity whose address is listed 
> above. Any use of the information contained herein in any way (including, but 
> not limited to, total or partial disclosure, reproduction, or dissemination) 
> by persons other than the intended recipient(s) is prohibited. If you receive 
> this e-mail in error, please notify the sender by phone or email immediately 
> and delete it!
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com <mailto:fhue...@gmail.com>] 
> Sent: Friday, March 31, 2017 11:05 AM
> To: Radu Tudoran
> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
> Subject: Re: concurrency?
>  
> Hi Radu,
> 
> the processElement() and onTimer() calls are synchronized by a lock, i.e., 
> they won't be called at the same time.
> 
> Best, Fabian
>  
> 2017-03-31 9:34 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com 
> <mailto:radu.tudo...@huawei.com>>:
> Hi,
>  
> I would like to use a processFunction to accumulate elements. Therefore in 
> the processElement function I will accumulate this element into a state. 
> However, I would like to emit the output only 1ms later. Therefore I would 
> register a timer to trigger one second later and read the state and emit it.
> However, I am curious of what happens if in the next ms another event 
> arrives. In principle both the processElement function and the onTimer 
> function should be triggered in the same time.  My question is: is there a 
> fix order to execute them?  Because if any of them work just like normal 
> threads, than concurrency related issues could happen when accessing the 
> state.

Reply via email to