Hi,
I have a bunch of devices that keep sending heartbeat messages. I want to
make an operator that emits messages when a device disconnects and when a
device stops being disconnected.
A device is considered disconnected if we don't receive any heartbeat for
more than some TIMEOUT duration.
This seemed like a good candidate for session windows, but I am not sure
how I can express the inverse logic (i.e. detecting periods of inactivity
instead of activity) using Flink's operators.
I want to use event time for all processing and ideally want to achieve
this behaviour using a single operator.

So I am trying to implement a custom KeyedProcessFunction that, on every
heartbeat:

   - Deletes any previous event time timer
   - Registers a new timer to fire at heartbeat.timestamp + TIMEOUT

The basic idea is that every new heartbeat will keep pushing the timer
forward. Only when heartbeats stop arriving does the timer fire, indicating
the start of a disconnected state.
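To make the intent concrete, here is a small Flink-free model of the behaviour I am after for a single device. It is only a sketch: HeartbeatTimeoutModel, onHeartbeat and advanceWatermark are hypothetical names I made up for illustration, the watermark is advanced by hand, and I am assuming an event-time timer fires once the watermark reaches its timestamp. The model re-arms the timer on every heartbeat, as described in the two bullet points above.

```java
import java.util.ArrayList;
import java.util.List;

/** Dependency-free model of the intended timer logic for one device key. */
public class HeartbeatTimeoutModel {
    private final long timeout;
    private Long registeredTimer;    // plays the role of the registered event-time timer
    private Boolean isDisconnected;  // null until the first heartbeat is seen
    private final List<String> emitted = new ArrayList<>();

    public HeartbeatTimeoutModel(long timeout) {
        this.timeout = timeout;
    }

    /** On every heartbeat: drop the pending timer and register a new one. */
    public void onHeartbeat(long timestamp) {
        registeredTimer = timestamp + timeout;
        // First heartbeat ever, or first heartbeat after a disconnection.
        if (isDisconnected == null || isDisconnected) {
            emitted.add("ENDED@" + timestamp);
            isDisconnected = false;
        }
    }

    /** Simulated watermark: fires the pending timer once its timestamp is reached. */
    public void advanceWatermark(long watermark) {
        if (registeredTimer != null && registeredTimer <= watermark) {
            long firedAt = registeredTimer;
            registeredTimer = null;
            if (Boolean.FALSE.equals(isDisconnected)) {
                isDisconnected = true;
                emitted.add("STARTED@" + firedAt);
            }
        }
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        HeartbeatTimeoutModel model = new HeartbeatTimeoutModel(10);
        model.onHeartbeat(0);
        model.advanceWatermark(0);
        model.onHeartbeat(5);
        model.advanceWatermark(5);
        model.onHeartbeat(12);
        model.advanceWatermark(12);
        model.advanceWatermark(30);  // no heartbeat since 12, so the timer at 22 fires
        model.onHeartbeat(40);       // device comes back
        System.out.println(model.emitted());  // prints [ENDED@0, STARTED@22, ENDED@40]
    }
}
```

With a timeout of 10, the heartbeats at 0, 5 and 12 each push the timer forward, so nothing fires until the watermark passes 22.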
My current Flink implementation:

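import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IUDisconnectedStateDetector
        extends KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> {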

    // Tracks if this monitor is disconnected or not.
    private ValueState<Boolean> isDisconnectedStateStore;
    // Tracks which timer was registered.
    private ValueState<Long> registeredTimerStateStore;

    private static final Logger LOGGER =
            LoggerFactory.getLogger(IUDisconnectedStateDetector.class);

    // Called by the Flink runtime before starting this operator. We
    // initialize the state stores here.
    @Override
    public void open(Configuration parameters) throws Exception {
        isDisconnectedStateStore = getRuntimeContext().getState(
                new ValueStateDescriptor<Boolean>(DISCONNECTED_STATE_STORE_NAME, Boolean.class));
        registeredTimerStateStore = getRuntimeContext().getState(
                new ValueStateDescriptor<Long>(REGISTERED_TIMER_STATE_STORE_NAME, Long.class));
    }

    @Override
    public void processElement(IUHeartbeat heartbeat, Context ctx,
            Collector<IUSessionMessage> out) throws Exception {
        Boolean isDisconnected = isDisconnectedStateStore.value();
        LOGGER.info("Heartbeat: " + heartbeat + ", isDisconnected : " + isDisconnected
                + " last registered timer :" + registeredTimerStateStore.value());


        // If this is the first message for this monitor or the first
        // message after a disconnection.
        if (isDisconnected == null || isDisconnected) {
            // Delete previous timer.
            if (registeredTimerStateStore.value() != null) {
                ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());
            }

            // Register a timer that will fire in the future if no
            // further events are received.
            long timerFiringTimestamp = heartbeat.getTimestamp() + DISCONNECTED_TIMEOUT;
            ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
            registeredTimerStateStore.update(timerFiringTimestamp);

            // Emit a message indicating END of the disconnected state.
            IUSessionMessage message = new IUSessionMessage(
                    new IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),
                            "dummy", "dummy"),
                    new IUSessionInfo(heartbeat.getTimestamp(),
                            IUStatus.ENDED, IUEventType.NO_VALUE));
            out.collect(message);
            LOGGER.info(message.getSessionInfo().toString());
            // Update the state store.
            isDisconnectedStateStore.update(Boolean.FALSE);
        }
    }


    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<IUSessionMessage> out) throws Exception {
        if (Boolean.FALSE.equals(isDisconnectedStateStore.value())) {
            // If this timer fires that means no message was received
            // from the monitor for some timeout duration.
            // Update the state store.
            isDisconnectedStateStore.update(Boolean.TRUE);

            // Emit a message indicating START of the disconnected
            // state. Note that since this is applicable for a monitor,
            IUSessionMessage message = new IUSessionMessage(
                    new IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(),
                            "dummyFeatureName", "dummyDeviceId"),
                    new IUSessionInfo(timestamp, IUStatus.STARTED,
                            IUEventType.NO_VALUE));
            out.collect(message);

            LOGGER.info(message.getSessionInfo().toString());
        }
    }
}

*However, the above code does not behave as expected - the timer fires even
when (a) it has received heartbeats within the timeout and (b) I have the
code to delete it*. So, my questions:

   - Am I deleting the timer incorrectly? I use a state store to keep track
   of the registered timer's timestamp and use that value when deleting.
   - Am I overcomplicating things? Can this be achieved using Flink's
   built-in session windowing operators?

Thanks!
