If you are using processing time, one possible way is to track the timestamp 
of the last registered timer in a separate ValueState<Long>. When you register 
a new timer and find that the previous timer stored in that ValueState has a 
smaller timestamp (T1) than the new timer's timestamp (T2), you can call 
#deleteProcessingTimeTimer(time) with T1. Once that processing-time timer is 
deleted, T1 will not trigger any action. You can refer to [1] and its usages 
for a similar idea.


[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/CleanupState.java
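
To make the bookkeeping above concrete, here is a minimal plain-Java sketch for a single key. It does not use Flink itself: FakeTimerService and PostponingTimer are hypothetical stand-ins, and the lastTimer field plays the role of the ValueState<Long>. Only the method names registerProcessingTimeTimer and deleteProcessingTimeTimer mirror Flink's TimerService; the fixed delay is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical stand-in for the two TimerService calls used in this pattern. */
class FakeTimerService {
    // Sorted set of pending timer timestamps for one key.
    private final TreeSet<Long> timers = new TreeSet<>();

    void registerProcessingTimeTimer(long time) { timers.add(time); }

    void deleteProcessingTimeTimer(long time) { timers.remove(time); }

    /** Advances the clock to 'now' and returns the timers that fire, in order. */
    List<Long> advanceTo(long now) {
        List<Long> fired = new ArrayList<>(timers.headSet(now, true));
        timers.removeAll(fired);
        return fired;
    }
}

/** Hypothetical per-key logic: keeps at most one pending timer. */
class PostponingTimer {
    private final FakeTimerService timerService;
    private final long delayMs;
    // Stand-in for ValueState<Long>; null means no timer is pending.
    private Long lastTimer;

    PostponingTimer(FakeTimerService timerService, long delayMs) {
        this.timerService = timerService;
        this.delayMs = delayMs;
    }

    /** Called per record (like processElement): postpones the pending timer. */
    void onRecord(long now) {
        long newTimer = now + delayMs;
        if (lastTimer != null && lastTimer < newTimer) {
            // Delete T1 so that it never fires.
            timerService.deleteProcessingTimeTimer(lastTimer);
        }
        timerService.registerProcessingTimeTimer(newTimer);
        lastTimer = newTimer;
    }

    /** Called when a timer fires (like onTimer): clears the stored timestamp. */
    void onTimer(long timestamp) {
        if (lastTimer != null && lastTimer == timestamp) {
            lastTimer = null;
        }
    }
}
```

In a real KeyedProcessFunction the same steps would presumably go through ctx.timerService() and a keyed ValueState<Long>, with the current time taken from the timer service rather than passed in as a parameter.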

________________________________
From: Andrea Spina <andrea.sp...@radicalbit.io>
Sent: Tuesday, June 25, 2019 23:40
To: Yun Tang
Cc: user
Subject: Re: Process Function's timers "postponing"

Hi Yun, thank you for your answer. I'm not sure I got your point. My question 
is:
for the same key K, I process two records, R1 at t1 and R2 at t2.
When I process R1, I set a timer to be triggered at T1, which is > t2.
When I process R2, I set a timer to be triggered at T2, which is > T1, but in 
order to do that, I want to remove the previous timer T1 so as to "postpone" 
the triggering.

In other words, for a single key I would like only one timer to be active at 
a time; if a new timer is requested, the old one should be deleted.

Thank you,

On Tue, Jun 25, 2019 at 17:31, Yun Tang <myas...@live.com> wrote:
Hi Andrea

If my understanding is correct, you just want to know when a timer is 
eventually deleted. When you register a timer, it is stored in 
'processingTimeTimersQueue' at [1], and the 'SystemProcessingTimeService' then 
schedules a runnable TriggerTask to run after the "postpone" delay at [2]. When 
the scheduled runnable is triggered, it polls the timer from 
'processingTimeTimersQueue' [3], which means the timer is finally removed. 
Hope this helps.

Best
Yun Tang

[1] 
https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
[2] 
https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
[3] 
https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237

________________________________
From: Andrea Spina <andrea.sp...@radicalbit.io>
Sent: Tuesday, June 25, 2019 2:06
To: user
Subject: Process Function's timers "postponing"

Dear Community,
I am using Flink (processing-time) timers along with a Process Function. What I 
would like to do is to "postpone" any timer already registered for the given 
key. I want this because I might process plenty of events in a row (think of it 
as a session), and I would like to trigger the computation "just after" this 
session somehow stops.

I thought about deleting any existing timers, but AFAIU I need to know the 
previous timer's triggering time, which I guess is not possible for me since I 
use processing-time timers.

I also read [1], but I cannot tell whether it is useful in my case; for 
instance, I don't understand what "Since Flink maintains only one timer per 
key and timestamp..." means. Does this imply that a new PT timer will 
automatically overwrite any previously existing one?

Thank you for your precious help,

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT

