Hi Yun, thank you so much. That was an option, but I wanted to avoid storing
additional state for it. In the end, I went for coalescing, as the documentation
suggests, so that I have just one timer per interval. What I didn't
catch initially from the documentation is that *for a given key and a
given timestamp Flink retains just one timer, i.e. if I set two
timers to trigger at the same time T, Flink will trigger the timer once.*
I can then accept having at least one coalesced timer per interval.
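
For anyone reading this thread later, here is a minimal sketch of the coalescing idea. It is a plain-Java toy model, not Flink code: the class TimerCoalescingSketch and its methods are hypothetical names, and the sorted set only imitates the "one timer per key and timestamp" behaviour described above. In a real process function the rounded value would be passed to registerProcessingTimeTimer instead of register().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Toy model (not Flink code) of per-key timers that are deduplicated by timestamp. */
class TimerCoalescingSketch {

    // One sorted set of timestamps per key: adding the same timestamp twice keeps one entry,
    // which imitates "one timer per key and timestamp".
    private final Map<String, TreeSet<Long>> timersByKey = new HashMap<>();

    /** Rounds a target time down to the start of its interval (assumes non-negative values). */
    public static long coalesce(long targetMillis, long intervalMillis) {
        return (targetMillis / intervalMillis) * intervalMillis;
    }

    /** Registers a timer for the key; a duplicate timestamp is silently ignored by the set. */
    public void register(String key, long timestamp) {
        timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    /** Number of distinct timers currently pending for the key. */
    public int pendingTimers(String key) {
        TreeSet<Long> timers = timersByKey.get(key);
        return timers == null ? 0 : timers.size();
    }
}
```

With a 1000 ms interval and a 5000 ms timeout, records processed at 10250 and 10900 both round to a timer at 15000, so only one timer is pending for that key; a record at 11100 rounds to 16000 and adds a second one.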

Thank you again for your support!

On Tue, 25 Jun 2019 at 19:14, Yun Tang <myas...@live.com> wrote:

> If you are using processing time, one possible way is to track the last
> registered timer's timestamp in another ValueState<Long>. When you register
> a new timer and find that the previous timer stored in the ValueState has a
> smaller timestamp (T1) than the current one (T2), you could call
> #deleteProcessingTimeTimer(time) for T1. After deleting that processing-time
> timer, T1 would not trigger any action. You could refer to [1] and its usage
> for similar ideas.
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/CleanupState.java
>
> ------------------------------
> *From:* Andrea Spina <andrea.sp...@radicalbit.io>
> *Sent:* Tuesday, June 25, 2019 23:40
> *To:* Yun Tang
> *Cc:* user
> *Subject:* Re: Process Function's timers "postponing"
>
> Hi Yun, thank you for your answer. I'm not sure I got your point. My
> question is:
> for the same key K, I process two records R1 at t1 and R2 at t2.
> When I process R1, I set a timer to be triggered at T1 which is > t2
> When I process R2, I set a timer to be triggered at T2 which is > T1, but
> in order to do that, I want to remove the previous timer T1 in order to
> "postpone" the triggering.
>
> In other words, I would like just one timer to be active for a single key,
> and if a new timer is requested the old one should be deleted.
>
> Thank you,
>
> On Tue, 25 Jun 2019 at 17:31, Yun Tang <myas...@live.com> wrote:
>
> Hi Andrea
>
> If my understanding is correct, you just want to know when the eventual
> timer would be deleted. When you register your timer into
> 'processingTimeTimersQueue' (where your timer is stored) at [1], the
> 'SystemProcessingTimeService' then schedules a runnable TriggerTask
> after the "postpone" delay at [2]. When the scheduled runnable is
> triggered, it polls from the 'processingTimeTimersQueue' [3], which
> means the timer is finally removed. I hope this helps.
>
> Best
> Yun Tang
>
> [1]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
> [2]
> https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
> [3]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237
>
> ------------------------------
> *From:* Andrea Spina <andrea.sp...@radicalbit.io>
> *Sent:* Tuesday, June 25, 2019 2:06
> *To:* user
> *Subject:* Process Function's timers "postponing"
>
> Dear Community,
> I am using Flink (processing-time) timers along with a Process Function.
> What I would like to do is to "postpone" any previously registered timers
> for the given key: I might process plenty of events in a row (think of it
> as a session), and I want to be able to trigger the computation "just
> after" this session somehow stops.
>
> I wondered about deleting any existing timers, but AFAIU I need to
> know the previous timer's triggering time, which I guess is not possible for
> me since I use processing-time timers.
>
> I also read [1], but I cannot really tell whether it is useful to me; for
> instance, I don't understand what "Since Flink maintains only one timer per
> key and timestamp..." means. Does this imply that a new PT timer
> will automatically overwrite any previously existing one?
>
> Thank you for your valuable help,
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
> --
> *Andrea Spina*
> Head of R&D @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT


-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
