Hello,
I guess you have already evaluated moving to event time and were not able
to? Because this really looks like a case for moving to event-time
timers. I think that would require some effort (including choosing a
good watermark strategy), but it would then solve all your problems.
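To illustrate why event-time timers avoid the premature firing, here is a plain-Java model (not Flink API code; all class and method names are hypothetical). It assumes one inactivity timer per connection at `lastEventTime + timeout` and a bounded-out-of-orderness watermark similar to Flink's (`maxSeenTimestamp - outOfOrderness - 1`). Timers fire only when the watermark passes them. The watermark is not kept across a restore in this model and only advances when new packets arrive, so downtime alone does not expire sessions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink API) of an event-time inactivity timeout per connection. */
final class EventTimeTimeoutModel {
    private final long timeoutMs;
    private final long maxOutOfOrdernessMs;
    // Connection key -> pending timer timestamp; this is what a checkpoint would keep.
    private final Map<String, Long> timers = new HashMap<>();
    // Watermark bookkeeping; deliberately NOT carried over by restoreCopy().
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    EventTimeTimeoutModel(long timeoutMs, long maxOutOfOrdernessMs) {
        this.timeoutMs = timeoutMs;
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Simulates a restore: pending timers are kept, the watermark starts from scratch. */
    EventTimeTimeoutModel restoreCopy() {
        EventTimeTimeoutModel restored = new EventTimeTimeoutModel(timeoutMs, maxOutOfOrdernessMs);
        restored.timers.putAll(timers);
        return restored;
    }

    int pendingTimerCount() {
        return timers.size();
    }

    /** Handles one packet and returns the keys whose sessions timed out as a result. */
    List<String> onPacket(String key, long eventTimestampMs) {
        // Replacing the entry stands in for "delete the old timer, register a new one".
        timers.put(key, eventTimestampMs + timeoutMs);
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMs);
        // Bounded-out-of-orderness style watermark, driven only by packet timestamps.
        watermark = maxSeenTimestamp - maxOutOfOrdernessMs - 1;
        // Fire (remove and report) every timer the watermark has passed.
        List<String> expired = new ArrayList<>();
        timers.entrySet().removeIf(entry -> {
            boolean due = entry.getValue() <= watermark;
            if (due) {
                expired.add(entry.getKey());
            }
            return due;
        });
        return expired;
    }
}
```

In Flink itself the counterpart would be `registerEventTimeTimer` in a keyed process function plus a `WatermarkStrategy`; note that idle sources can stall the watermark, which is part of why choosing the strategy takes some care.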
Frank
On 08.02.22 08:42, Alex Drobinsky wrote:
Sure :) The problem can be defined as follows:
Imagine you have a stream of data, for example, network traffic.
This network traffic is keyed by source address / source port /
destination address / destination port / protocol type.
Every connection can be "completed" in two ways:
1) we encounter a packet that indicates the end of the connection according to
the protocol
2) we have not received any packet for that connection during the last 60
seconds
In the second case, Flink calls onTimer and the session is
closed.
However, if a crash happens and the job is restored from a checkpoint, onTimer is
called immediately and the session is closed prematurely.
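The premature firing can be reproduced with a tiny plain-Java model of a checkpointed processing-time timer queue (not Flink API; names are hypothetical). It assumes, as described above, that timer timestamps are restored unchanged and that any restored timer whose timestamp is already behind the wall clock fires as soon as time is advanced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java model (not Flink API) of processing-time timers that survive a checkpoint/restore. */
final class ProcessingTimeTimerModel {
    // Pending timer timestamps, earliest first; this is the checkpointed part.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void register(long timestampMs) {
        timers.add(timestampMs);
    }

    /** Simulates a restore: queued timestamps come back exactly as they were. */
    ProcessingTimeTimerModel restoreCopy() {
        ProcessingTimeTimerModel restored = new ProcessingTimeTimerModel();
        restored.timers.addAll(timers);
        return restored;
    }

    /** Removes and returns every timer whose timestamp is <= the given wall-clock time. */
    List<Long> advanceTo(long wallClockMs) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= wallClockMs) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

For example, a packet at t=0 arms a timer for t=60000; the job crashes at t=10000 and comes back at t=300000, so the restored timer fires immediately even though the connection was only observed idle for 10 seconds.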
Now, I would like to prevent this from happening, and I have two
solutions. The first is the workaround you have already seen in a
previous email, i.e. the first time onTimer is triggered, it ignores the
call and resets the timer.
The second solution is rather hypothetical, i.e. somehow forcing the timer
to be volatile (not checkpointed) or resetting the timer after a restore. So the question is:
is this second solution feasible?
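One hypothetical way to get the "reset after restore" behaviour without a per-state flag: measure inactivity from the later of the last packet and the moment the task (re)started. In a real keyed process function, that start time could be captured in `open()` into a plain instance field, which is not part of keyed state and so is re-initialised on every start or restore; since `open()` has no timer service, this assumes the default processing time matches `System.currentTimeMillis()`. The sketch below models only the decision logic; `RestartGracePolicy`, `lastPacketAtMs` and `startedAtMs` are illustrative names, and `lastPacketAtMs` would have to be kept in keyed state.

```java
import java.util.OptionalLong;

/** Hypothetical onTimer decision logic with a grace period after each (re)start. */
final class RestartGracePolicy {
    private final long timeoutMs;
    // Not checkpointed: in Flink this would be a plain field assigned in open().
    private final long startedAtMs;

    RestartGracePolicy(long timeoutMs, long startedAtMs) {
        this.timeoutMs = timeoutMs;
        this.startedAtMs = startedAtMs;
    }

    /**
     * Returns the timestamp to re-register the timer at, or empty if the
     * session really has been inactive for a full timeout and should be closed.
     */
    OptionalLong onTimer(long lastPacketAtMs, long nowMs) {
        // Time spent while the job was down does not count as inactivity.
        long activeSince = Math.max(lastPacketAtMs, startedAtMs);
        long deadline = activeSince + timeoutMs;
        return nowMs >= deadline ? OptionalLong.empty() : OptionalLong.of(deadline);
    }
}
```

With a restart at t=300000, a restored timer for a session last seen at t=0 is pushed to t=360000 instead of closing the session; if no packet arrives by then, the session is closed as usual.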
On Tue, 8 Feb 2022 at 04:19, Yun Tang <myas...@live.com> wrote:
Hi Alex,
I think it would be better to first understand what problem you
actually ran into when restoring the timers.
Flink currently does not support removing state (including timer
state).
Best
Yun Tang
------------------------------------------------------------------------
*From:* Alex Drobinsky <alex.drobin...@gmail.com>
*Sent:* Monday, February 7, 2022 21:09
*To:* Caizhi Weng <tsreape...@gmail.com>
*Cc:* User-Flink <user@flink.apache.org>
*Subject:* Re: How to prevent check pointing of timers ?
By timer I mean a regular timer from keyed state, which is handled via the
onTimer function, for example:
import org.apache.flink.streaming.api.TimerService;

public class StateWithTimer {
    public long timerValue = 0;
    public volatile boolean shouldResetTimer = true;

    public boolean resetIfMust(long timeoutInMilliseconds, TimerService timerService) {
        if (shouldResetTimer) {
            setupTimer(timeoutInMilliseconds, timerService);
            shouldResetTimer = false;
            return true;
        }
        return false;
    }

    public void setupTimer(long timeoutInMilliseconds, TimerService timerService) {
        // Cancel previous timer
        timerService.deleteProcessingTimeTimer(timerValue);
        // Register new timer
        // Should it be configurable?
        timerValue = (timerService.currentProcessingTime() + timeoutInMilliseconds) * 1000 / 1000;
        timerService.registerProcessingTimeTimer(timerValue);
    }
}
The state classes that use timers extend StateWithTimer above. The
resetIfMust function is the current workaround: it resets the timer the
first time onTimer fires after a start or a restart from a checkpoint.
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
    MultiStorePacketState so = state.value();
    if (so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout, ctx.timerService())) {
        return;
    }
    closeAndReportFile(collector, so);
    ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
    state.update(so);
}
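A side note on the `* 1000 / 1000` in setupTimer: multiplying and then dividing by 1000 returns the original value (unless the multiplication overflows), so it has no effect. If the intent was to round timers to whole seconds so that fewer distinct timers are registered (the Flink documentation calls this timer coalescing, relying on Flink keeping at most one timer per key and timestamp), the division must come first. The helper names below are hypothetical.

```java
/** Hypothetical helpers contrasting timer coalescing with the expression used in setupTimer. */
final class TimerCoalescing {
    /** Rounds a timer timestamp down to the start of its second. */
    static long coalesceToSecond(long timestampMs) {
        return (timestampMs / 1000) * 1000;
    }

    /** The expression from setupTimer: returns its input unchanged unless timestampMs * 1000 overflows. */
    static long multiplyThenDivide(long timestampMs) {
        return timestampMs * 1000 / 1000;
    }
}
```

Rounding down means a timeout can fire up to 999 ms early; rounding up instead would delay it by up to the same amount.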
On Mon, 7 Feb 2022 at 05:06, Caizhi Weng <tsreape...@gmail.com> wrote:
Hi!
Could you elaborate on your code or share it if possible?
Which timer are you talking about? Are you using the DataStream
API or the SQL API? Do you mean the timer registered per
record for a window aggregation? Does mini-batch aggregation
[1] solve your problem?
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation
On Thu, 3 Feb 2022 at 20:41, Alex Drobinsky <alex.drobin...@gmail.com>
wrote:
Dear Flink users,
In our project, restoring the timers' state creates
numerous issues, so I would like to know
whether it is possible to avoid saving/restoring timers altogether.
If it isn't possible, how could I delete all registered
timers in the open function?
Best regards,
Alexander