Hi Yun, thank you for your answer. I'm not sure I got your point. My question is: for the same key K, I process two records R1 at t1 and R2 at t2. When I process R1, I set a timer to be triggered at T1 which is > t2 When I process R2, I set a timer to be triggered at T2 which is > T1, but in order to do that, I want to remove the previous timer T1 in order to "postpone" the triggering.
In other words, I would like for a single key to be active just one-timer and if a new timer is requested the old one should be deleted. Thank you, Il giorno mar 25 giu 2019 alle ore 17:31 Yun Tang <myas...@live.com> ha scritto: > Hi Andrea > > If my understanding is correct, you just want to know when the eventual > timer would be deleted. When you register your timer into > 'processingTimeTimersQueue' (where your timer stored) at [1], the > 'SystemProcessingTimeService' would then schedule a runnable TriggerTask > after the "postpone" delay at [2]. When the scheduled runnable is > triggered, it would poll from the 'processingTimeTimersQueue' [3] which > means the timer would finally be removed. Hope this could help you. > > Best > Yun Tang > > [1] > https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208 > [2] > https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121 > [3] > https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237 > > <https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237> > > ------------------------------ > *From:* Andrea Spina <andrea.sp...@radicalbit.io> > *Sent:* Tuesday, June 25, 2019 2:06 > *To:* user > *Subject:* Process Function's timers "postponing" > > Dear Community, > I am using Flink (processing-time) timers along with a Process Function. > What I would like to do is to "postpone" eventually registered timers for > the given key: I would like to do it since I might process plenty of events > in a row (think about it as a session) so that I will able to trigger the > computation "just after" this session somehow stops. > > I wondered about deleting eventual existing timers but AFAIU I need to > know the previous timer triggering time, which I guess is not possible for > me since I use processing-time timers. > > I read also [1] but I am not really able to understand if it comes handy > to me; for instance, I don't understand what "Since Flink maintains only > one timer per key and timestamp...". Does this imply that a new PT timer > will automatically overwrite an eventual previously existing one? > > Thank you for your precious help, > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing > -- > *Andrea Spina* > Head of R&D @ Radicalbit Srl > Via Giovanni Battista Pirelli 11, 20124, Milano - IT > -- *Andrea Spina* Head of R&D @ Radicalbit Srl Via Giovanni Battista Pirelli 11, 20124, Milano - IT