Hi Yun, thank you so much. That was an idea, but I wanted to avoid storing additional state for it. In the end, I went for coalescing, as the documentation suggests, so that I have just one timer per interval. What I didn't catch initially from the documentation is that *for a given key and a given timestamp, Flink retains just one timer, i.e. if I set two timers to trigger at the same time T, Flink will fire the timer only once.* I therefore accept having at least one coalesced timer per interval.
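For readers following the thread, here is a minimal sketch of the coalescing idea described above. It is plain Java, not Flink code: the class `TimerCoalescing`, its methods, and the sample values are hypothetical, and the `TreeSet` only imitates the "one timer per key and timestamp" behaviour. The rounding follows the pattern shown in the Flink timer-coalescing documentation (round the target time down to the interval).

```java
import java.util.TreeSet;

/** Plain-Java illustration of timer coalescing; a stand-in, not Flink code. */
final class TimerCoalescing {

    /** Rounds the requested firing time (now + delay) down to a multiple of intervalMs. */
    static long coalesce(long nowMs, long delayMs, long intervalMs) {
        return Math.floorDiv(nowMs + delayMs, intervalMs) * intervalMs;
    }

    /**
     * Simulates the timers one key would hold after the given event times.
     * The set mimics "one timer per key and timestamp": adding an existing
     * timestamp changes nothing.
     */
    static TreeSet<Long> register(long[] eventTimesMs, long delayMs, long intervalMs) {
        TreeSet<Long> timers = new TreeSet<>();
        for (long now : eventTimesMs) {
            // In a Flink function this value would be passed to
            // ctx.timerService().registerProcessingTimeTimer(...).
            timers.add(coalesce(now, delayMs, intervalMs));
        }
        return timers;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 1_200L, 1_900L, 2_100L};
        // Targets 6000, 6200, 6900 and 7100 collapse to two timers.
        System.out.println(register(events, 5_000L, 1_000L)); // prints [6000, 7000]
    }
}
```

Note that rounding down means a timer can fire up to one interval earlier than the exact requested time, and that a key still gets one timer for every interval in which events arrive.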
Thank you again for your support!

On Tue, 25 Jun 2019 at 19:14, Yun Tang <myas...@live.com> wrote:

> If you are using processing time, one possible way is to track the last
> registered timer in another ValueState<Long>. You could then call
> #deleteProcessingTimeTimer(time) when you register a new timer and find
> that the previous timer stored in the ValueState has a smaller timestamp (T1)
> than the current one (T2). After deleting that processing-time timer, T1 will
> not trigger any action. You could refer to [1] and its usage for similar ideas.
>
> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/CleanupState.java
>
> ------------------------------
> *From:* Andrea Spina <andrea.sp...@radicalbit.io>
> *Sent:* Tuesday, June 25, 2019 23:40
> *To:* Yun Tang
> *Cc:* user
> *Subject:* Re: Process Function's timers "postponing"
>
> Hi Yun, thank you for your answer. I'm not sure I got your point. My
> question is:
> for the same key K, I process two records, R1 at t1 and R2 at t2.
> When I process R1, I set a timer to be triggered at T1, which is > t2.
> When I process R2, I set a timer to be triggered at T2, which is > T1, but
> in order to do that, I want to remove the previous timer T1 so as to
> "postpone" the triggering.
>
> In other words, I would like a single key to have just one active timer,
> and if a new timer is requested, the old one should be deleted.
>
> Thank you,
>
> On Tue, 25 Jun 2019 at 17:31, Yun Tang <myas...@live.com> wrote:
>
> Hi Andrea
>
> If my understanding is correct, you just want to know when the eventual
> timer would be deleted. When you register your timer into
> 'processingTimeTimersQueue' (where your timer is stored) at [1], the
> 'SystemProcessingTimeService' then schedules a runnable TriggerTask
> after the "postpone" delay at [2].
> When the scheduled runnable is
> triggered, it polls from the 'processingTimeTimersQueue' [3], which
> means the timer is finally removed. Hope this helps.
>
> Best
> Yun Tang
>
> [1] https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
> [2] https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
> [3] https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237
>
> ------------------------------
> *From:* Andrea Spina <andrea.sp...@radicalbit.io>
> *Sent:* Tuesday, June 25, 2019 2:06
> *To:* user
> *Subject:* Process Function's timers "postponing"
>
> Dear Community,
> I am using Flink (processing-time) timers along with a Process Function.
> What I would like to do is to "postpone" any already registered timers for
> the given key: I would like to do this since I might process plenty of events
> in a row (think of it as a session), so that I will be able to trigger the
> computation "just after" this session somehow stops.
>
> I wondered about deleting any existing timers but, AFAIU, I need to
> know the previous timer's trigger time, which I guess is not possible for
> me since I use processing-time timers.
>
> I also read [1], but I am not really able to tell whether it is useful
> to me; for instance, I don't understand what "Since Flink maintains only
> one timer per key and timestamp..." means.
> Does this imply that a new processing-time (PT) timer
> will automatically overwrite any previously existing one?
>
> Thank you for your precious help,
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
> --
> *Andrea Spina*
> Head of R&D @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT

--
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
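The alternative suggested in the thread (remember the last registered timestamp in a ValueState and delete that timer before registering a later one) can be sketched roughly as follows. This is plain Java under stated assumptions, not Flink code: `PostponedTimer`, its fields, and its methods are hypothetical stand-ins for one key's state and timers, and the comments mark where the Flink `TimerService` calls `deleteProcessingTimeTimer` and `registerProcessingTimeTimer` would go.

```java
import java.util.TreeSet;

/** Plain-Java stand-in for one key's state and timers; not Flink code. */
final class PostponedTimer {
    // Stand-in for a ValueState<Long> holding the last registered timestamp.
    private Long lastTimer = null;
    // Stand-in for the timers Flink keeps for this key.
    private final TreeSet<Long> timers = new TreeSet<>();

    /** Called per record: drops the previous timer, then registers a later one. */
    void onElement(long nowMs, long delayMs) {
        if (lastTimer != null) {
            // In Flink: ctx.timerService().deleteProcessingTimeTimer(lastTimer)
            timers.remove(lastTimer);
        }
        long next = nowMs + delayMs;
        // In Flink: ctx.timerService().registerProcessingTimeTimer(next)
        timers.add(next);
        lastTimer = next;
    }

    /** Called when a timer fires: the session is over, so forget the timestamp. */
    void onTimer(long timestampMs) {
        timers.remove(timestampMs);
        lastTimer = null;
    }

    /** Returns a copy of the timers still pending for this key. */
    TreeSet<Long> pendingTimers() {
        return new TreeSet<>(timers);
    }

    public static void main(String[] args) {
        PostponedTimer key = new PostponedTimer();
        key.onElement(1_000L, 5_000L);
        key.onElement(1_200L, 5_000L);
        key.onElement(1_900L, 5_000L);
        System.out.println(key.pendingTimers()); // prints [6900]
    }
}
```

Compared with coalescing, this keeps exactly one pending timer per key at the cost of one extra piece of keyed state, which is the trade-off discussed in the thread.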