Hi Timo,
I got it; the issue was a (silly) mistake on my part. I unnecessarily
put all of the processElement() logic inside the if condition. The if()
condition is only there because I want to emit a disconnected STOPPED
message only once.
So the correct code is:
    @Override
    public void processElement(IUHeartbeat heartbeat, Context ctx,
                               Collector<IUSessionMessage> out) throws Exception {
        Boolean isDisconnected = isDisconnectedStateStore.value();
        // LOGGER.info("Watermark: " + ctx.timerService().currentWatermark()
        //         + " Processing timestamp : " + heartbeat.getTimestamp()
        //         + ", isDisconnected : " + isDisconnected
        //         + " last registered timer :" + registeredTimerStateStore.value());

        // Delete previous timer.
        if (registeredTimerStateStore.value() != null)
            ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());

        // Register a timer that will fire in the future if no further events
        // are received.
        long timerFiringTimestamp = heartbeat.getTimestamp() + DISCONNECTED_TIMEOUT;
        ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
        registeredTimerStateStore.update(timerFiringTimestamp);

        // If this is the first message for this monitor or is the first message
        // after a disconnection.
        if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
            // Emit a message indicating END of the disconnected state.
            IUSessionMessage message = new IUSessionMessage(
                    new IUMonitorFeatureKey(
                            heartbeat.getMonitorKey().getMonitorName(), "dummy", "dummy"),
                    new IUSessionInfo(heartbeat.getTimestamp(),
                            IUStatus.ENDED, IUEventType.NO_VALUE));
            out.collect(message);
            LOGGER.info(message.getSessionInfo().toString());
        }

        // Update the state store.
        isDisconnectedStateStore.update(Boolean.FALSE);
    }
This produces the expected output.
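To make the difference between the two versions easier to see, here is a small toy model in plain Java. It is only a sketch and does not use any Flink classes: `DisconnectSketch`, `Detector`, `onHeartbeat`, `onWatermark` and `run` are hypothetical names, a single `Long` field stands in for the event-time timer, and it assumes the watermark has already reached `ts - 1` by the time the heartbeat with timestamp `ts` is processed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: plain Java, no Flink classes. All names are hypothetical.
class DisconnectSketch {
    static final long TIMEOUT = 30;

    static class Detector {
        private final boolean resetTimerOnEveryHeartbeat;
        private Boolean isDisconnected; // null until the first heartbeat
        private Long timer;             // timestamp of the pending timer, or null
        final List<String> out = new ArrayList<>();

        Detector(boolean resetTimerOnEveryHeartbeat) {
            this.resetTimerOnEveryHeartbeat = resetTimerOnEveryHeartbeat;
        }

        void onHeartbeat(long ts) {
            boolean firstOrReconnected = isDisconnected == null || isDisconnected;
            if (resetTimerOnEveryHeartbeat || firstOrReconnected) {
                timer = ts + TIMEOUT; // overwriting stands in for delete + register
            }
            if (firstOrReconnected) {
                out.add("ENDED@" + ts); // end of the disconnected state
            }
            isDisconnected = false;
        }

        void onWatermark(long watermark) {
            if (timer == null || timer > watermark) {
                return; // no timer is due yet
            }
            long firedAt = timer;
            timer = null;
            if (Boolean.FALSE.equals(isDisconnected)) {
                isDisconnected = true;
                out.add("STARTED@" + firedAt); // start of the disconnected state
            }
        }
    }

    // Assumption: the watermark has reached ts - 1 before the heartbeat at ts arrives.
    static List<String> run(boolean resetTimerOnEveryHeartbeat, long... heartbeats) {
        Detector d = new Detector(resetTimerOnEveryHeartbeat);
        for (long ts : heartbeats) {
            d.onWatermark(ts - 1);
            d.onHeartbeat(ts);
        }
        return d.out;
    }

    public static void main(String[] args) {
        // Timer only reset inside the if: fires although heartbeats keep arriving.
        System.out.println(run(false, 0, 10, 20, 30, 40)); // [ENDED@0, STARTED@30, ENDED@40]
        // Timer reset on every heartbeat: no spurious disconnect.
        System.out.println(run(true, 0, 10, 20, 30, 40));  // [ENDED@0]
        // A real gap longer than TIMEOUT is still detected.
        System.out.println(run(true, 0, 10, 20, 100));     // [ENDED@0, STARTED@50, ENDED@100]
    }
}
```

With the timer reset only inside the if, the first timer is never pushed forward, so it fires at 30 even though heartbeats arrive every 10 time units. Resetting it on every heartbeat removes that spurious STARTED message while a genuine gap still produces one.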
Also, I will assume that this is the best way to solve my problem - I
can't use Flink's session windows. Let me know if anyone has any other
ideas though!
Thank you for your time and quick response!
On Tue, Aug 11, 2020 at 1:45 PM Timo Walther wrote:
Hi Manas,
at first glance your code looks correct to me. I would investigate
whether your keys and watermarks are correct. Especially the watermark
frequency could be an issue. If watermarks are generated at the same time
as the heartbeats themselves, it might be the case that the timers fire
first, before the process() function that resets the timer is called.
Maybe you can give us more information on how watermarks are generated?
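One way to picture this ordering concern is the toy sketch below. It is plain Java and does not reproduce Flink's runtime: `TimerOrderingSketch` and `firedTimers` are hypothetical names, events are encoded as strings such as "H120" (heartbeat at 120) and "W130" (watermark at 130), and the timeout of 30 is an arbitrary assumption. The only rule modelled is that a pending timer fires once a watermark reaches its timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: plain Java, no Flink classes. All names are hypothetical.
class TimerOrderingSketch {
    static final long TIMEOUT = 30;

    // Processes events in the given order and returns the timestamps of fired timers.
    static List<Long> firedTimers(String... events) {
        Long timer = null;
        List<Long> fired = new ArrayList<>();
        for (String e : events) {
            long ts = Long.parseLong(e.substring(1));
            if (e.charAt(0) == 'H') {
                timer = ts + TIMEOUT; // heartbeat: replace the pending timer
            } else if (timer != null && timer <= ts) {
                fired.add(timer);     // watermark reached the pending timer
                timer = null;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Heartbeat 120 is processed before the watermark reaches 130: nothing fires.
        System.out.println(firedTimers("H100", "H120", "W130")); // []
        // The watermark reaches 130 first: the timer fires before it can be reset.
        System.out.println(firedTimers("H100", "W130", "H120")); // [130]
    }
}
```

In the second ordering the heartbeat at 120 lies within the timeout, yet the timer registered for 130 has already fired by the time that heartbeat is handled.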
Regards,
Timo
On 11.08.20 08:33, Manas Kale wrote:
> Hi,
> I have a bunch of devices that keep sending heartbeat messages. I want
> to make an operator that emits messages when a device disconnects and
> when a device stops being disconnected.
> A device is considered disconnected if we don't receive any heartbeat
> for more than some TIMEOUT duration.
> This seemed like a good candidate for session windows, but I am not sure
> how I can express the inverse logic (i.e. detecting periods of
> inactivity instead of activity) using Flink's operators.
> I want to use event time for all processing and ideally want to achieve
> this behaviour using a single operator.
>
> So I am trying to implement a custom ProcessFunction that, on every
> heartbeat:
>
> * Deletes any previous event time timer
> * Registers a new timer to fire at heartbeat.timestamp + TIMEOUT
>
> The basic idea is that every new heartbeat will keep pushing the timer
> forward. Only when heartbeats stop arriving does the timer fire,
> indicating the start of a disconnected state.
> Code:
>
> public class IUDisconnectedStateDetector extends
>         KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> {
>
>     // Tracks if this monitor is disconnected or not.
>     private ValueState<Boolean> isDisconnectedStateStore;
>     // Tracks which timer was registered.
>     private ValueState<Long> registeredTimerStateStore;
>
>     private final Logger LOGGER =
>             LoggerFactory.getLogger(IUDisconnectedStateDetector.class);
>
>     // Called by the Flink runtime before starting this operator. We
>     // initialize the state stores here.
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         isDisconnectedStateStore = getRuntimeContext().getState(
>                 new ValueStateDescriptor<Boolean>(
>                         DISCONNECTED_STATE_STORE_NAME, Boolean.class));
>         registeredTimerStateStore = getRuntimeContext().getState(
>                 new ValueStateDescriptor<Long>(
>                         REGISTERED_TIMER_STATE_STORE_NAME, Long.class));
>     }
>
>     @Override
>     public void processElement(IUHeartbeat heartbeat, Context ctx,
>                                Collector<IUSessionMessage> out) throws Exception {
>         Boolean isDisconnected = isDisconnectedStateStore.value();
>         LOGGER.info("Watermark: " + heartbeat + ", isDisconnected : " + isDisconnected
>                 + " last registered timer :" + registeredTimerStateStore.value());
>
>         // If this is the first message for this monitor or is the first message
>         // after a disconnection.
>         if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
>             // Delete previous timer.
>             if (registeredTimerStateStore.value() != null)
>                 ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());
>
>             // Register a timer that will fire in the future if no further events
>             // are received.
>             long timerFiringTimestamp = heartbeat.getTimestamp() + DISCONNECTED_TIMEOUT;
>             ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
>             registeredTimerStateStore.update(timerFiringTimestamp);
>
>             // Emit a message indicating END of the disconnected state.
>             IUSessionMessage message = new IUSessionMessage(
>                     new IUMonitorFeatureKey(
>                             heartbeat.getMonitorKey().getMonitorName(), "dummy", "dummy"),
>                     new IUSessionInfo(heartbeat.getTimestamp(),
>                             IUStatus.ENDED, IUEventType.NO_VALUE));
>             out.collect(message);
>             LOGGER.info(message.getSessionInfo().toString());
>             // Update the state store.
>             isDisconnectedStateStore.update(Boolean.FALSE);
>         }
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx,
>                         Collector<IUSessionMessage> out) throws Exception {
>         if (isDisconnectedStateStore.value() == Boolean.FALSE) {
>             // If this timer fires that means no message was received from the
>             // monitor for some timeout duration.
>             // Update the state store.
>             isDisconnectedStateStore.update(Boolean.TRUE);
>
>             // Emit a message indicating START of the disconnected state. Note that
>             // since this is applicable for a monitor,
>             IUSessionMessage message = new IUSessionMessage(
>                     new IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(),
>                             "dummyFeatureName", "dummyDeviceId"),
>                     new IUSessionInfo(timestamp,
>                             IUStatus.STARTED, IUEventType.NO_VALUE));
>             out.collect(message);
>
>             LOGGER.info(message.getSessionInfo().toString());
>         }
>     }
> }
>
> *However, the above code does not behave as expected - the timer fires
> even when (a) it has received heartbeats within the timeout and (b) I
> have the code to delete it*. So, my questions:
>
> * Am I deleting the timer incorrectly? I use a state store to keep
>   track of the registered timer's timestamp and use that value when
>   deleting.
> * Am I overcomplicating things? Can this be achieved using Flink's
>   built-in session windowing operators?
>
> Thanks!