Sometimes it's not easy to spot the obvious ;-)

Great that it works now. Let us know if you have further questions.

Regards,
Timo

On 11.08.20 10:51, Manas Kale wrote:
Hi Timo,
I got it: the issue was a (silly) mistake on my part. I had unnecessarily put all of the processElement() logic inside the if condition. The if condition is there only because I want to emit the message that the disconnected state has stopped just once.
So the correct code is:

    @Override
    public void processElement(IUHeartbeat heartbeat, Context ctx,
            Collector<IUSessionMessage> out) throws Exception {
        Boolean isDisconnected = isDisconnectedStateStore.value();
        // LOGGER.info("Watermark: " + ctx.timerService().currentWatermark()
        //         + " Processing timestamp : " + heartbeat.getTimestamp()
        //         + ", isDisconnected : " + isDisconnected
        //         + " last registered timer :" + registeredTimerStateStore.value());

        // Delete previous timer.
        if (registeredTimerStateStore.value() != null)
            ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());

        // Register a timer that will fire in the future if no further events are received.
        long timerFiringTimestamp = heartbeat.getTimestamp() + DISCONNECTED_TIMEOUT;
        ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
        registeredTimerStateStore.update(timerFiringTimestamp);

        // If this is the first message for this monitor or is the first message after a disconnection.
        if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
            // Emit a message indicating END of the disconnected state.
            IUSessionMessage message = new IUSessionMessage(
                    new IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(), "dummy", "dummy"),
                    new IUSessionInfo(heartbeat.getTimestamp(), IUStatus.ENDED, IUEventType.NO_VALUE));
            out.collect(message);
            LOGGER.info(message.getSessionInfo().toString());
        }

        // Update the state store.
        isDisconnectedStateStore.update(Boolean.FALSE);
    }


This produces the expected output.
Also, I will assume that this is the best way to solve my problem - I can't use Flink's session windows. Let me know if anyone has any other ideas though!
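
For readers who want to check the corrected flow without a Flink cluster, here is a minimal plain-Java sketch of the same logic for a single key. It is not Flink code: the class name HeartbeatTimeoutSketch, the timeout value of 10, and the hand-driven advanceWatermark() method are all assumptions made for illustration. It only models the idea that every heartbeat moves the single pending timer forward, while the ENDED message is emitted on the first heartbeat or the first one after a disconnection.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of the corrected logic for one key. Not Flink code:
 * the "timer" is a nullable timestamp and the watermark is advanced by hand.
 */
final class HeartbeatTimeoutSketch {
    static final long DISCONNECTED_TIMEOUT = 10; // hypothetical value

    private Boolean isDisconnected;   // null until the first heartbeat
    private Long registeredTimer;     // at most one pending timer
    private final List<String> emitted = new ArrayList<>();

    /** Mirrors processElement(): always move the timer, emit ENDED only on a state change. */
    void onHeartbeat(long timestamp) {
        // Overwriting the field stands in for "delete old timer, register new one".
        registeredTimer = timestamp + DISCONNECTED_TIMEOUT;
        if (isDisconnected == null || isDisconnected) {
            emitted.add("ENDED@" + timestamp);
        }
        isDisconnected = Boolean.FALSE;
    }

    /** Mirrors onTimer(): the pending timer fires once the watermark reaches it. */
    void advanceWatermark(long watermark) {
        if (registeredTimer != null && watermark >= registeredTimer) {
            long firedAt = registeredTimer;
            registeredTimer = null;
            if (Boolean.FALSE.equals(isDisconnected)) {
                isDisconnected = Boolean.TRUE;
                emitted.add("STARTED@" + firedAt);
            }
        }
    }

    /** Three timely heartbeats, a gap longer than the timeout, then a reconnect. */
    static List<String> run() {
        HeartbeatTimeoutSketch s = new HeartbeatTimeoutSketch();
        s.onHeartbeat(0);  s.advanceWatermark(0);
        s.onHeartbeat(5);  s.advanceWatermark(5);
        s.onHeartbeat(12); s.advanceWatermark(12);
        s.advanceWatermark(30);              // no heartbeat since t=12: timer at 22 fires
        s.onHeartbeat(40); s.advanceWatermark(40);
        return s.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [ENDED@0, STARTED@22, ENDED@40]
    }
}
```

With the original (buggy) version, the timer would only have been moved inside the if branch, so the heartbeats at 5 and 12 would not have pushed it forward and the timer set by the first heartbeat would have fired at 10.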

Thank you for your time and quick response!


On Tue, Aug 11, 2020 at 1:45 PM Timo Walther wrote:

    Hi Manas,

    At first glance your code looks correct to me. I would check whether
    your keys and watermarks are correct. The watermark frequency in
    particular could be an issue: if watermarks are generated at the same
    time as the heartbeats themselves, the timers might fire before
    processElement() is called to reset them.

    Could you give us more information on how the watermarks are generated?
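
To make the watermark question concrete, here is a small plain-Java sketch of the relationship between a watermark and an event-time timer. It assumes a bounded-out-of-orderness style generator, where the watermark is the highest timestamp seen minus the allowed lateness minus 1, and it assumes the usual rule that an event-time timer fires once the watermark has reached the timer's timestamp. The class name WatermarkTimerSketch and all values are hypothetical, and nothing here calls the Flink API.

```java
/** Plain-Java sketch: when does an event-time timer fire relative to the watermark? */
final class WatermarkTimerSketch {

    /** Watermark after seeing maxTimestamp, tolerating events up to maxOutOfOrderness late. */
    static long watermarkFor(long maxTimestamp, long maxOutOfOrderness) {
        return maxTimestamp - maxOutOfOrderness - 1;
    }

    /** An event-time timer fires once the watermark has reached its timestamp. */
    static boolean timerFires(long timerTimestamp, long watermark) {
        return watermark >= timerTimestamp;
    }

    public static void main(String[] args) {
        long timeout = 10;
        long timer = 100 + timeout; // registered by a heartbeat at t=100

        // The highest timestamp seen so far is 105: the watermark (104) is below the timer.
        System.out.println(timerFires(timer, watermarkFor(105, 0))); // prints false

        // The watermark has been driven to 199 before a later heartbeat could reset the timer.
        System.out.println(timerFires(timer, watermarkFor(200, 0))); // prints true
    }
}
```

Because the watermark belongs to the operator instance rather than to a single key, the second case can happen when other input pushes the watermark ahead; that is one reason to check how the watermarks are produced.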

    Regards,
    Timo

    On 11.08.20 08:33, Manas Kale wrote:
     > Hi,
     > I have a bunch of devices that keep sending heartbeat messages. I want
     > to make an operator that emits messages when a device disconnects and
     > when a device stops being disconnected.
     > A device is considered disconnected if we don't receive any heartbeat
     > for more than some TIMEOUT duration.
     > This seemed like a good candidate for session windows, but I am not sure
     > how I can express the inverse logic (i.e. detecting periods of
     > inactivity instead of activity) using Flink's operators.
     > I want to use event time for all processing and ideally want to achieve
     > this behaviour using a single operator.
     >
     > So I am trying to implement a custom process function that, on every
     > heartbeat:
     >
     >   * Deletes any previous event time timer
     >   * Registers a new timer to fire at heartbeat.timestamp + TIMEOUT
     >
     > The basic idea is that every new heartbeat will keep pushing the timer
     > forward. Only when heartbeats stop arriving does the timer fire,
     > indicating the start of a disconnected state.
     > Code:
     >
     > public class IUDisconnectedStateDetector extends
     >         KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> {
     >
     >     // Tracks if this monitor is disconnected or not.
     >     private ValueState<Boolean> isDisconnectedStateStore;
     >     // Tracks which timer was registered.
     >     private ValueState<Long> registeredTimerStateStore;
     >
     >     private final Logger LOGGER =
     >             LoggerFactory.getLogger(IUDisconnectedStateDetector.class);
     >
     >     // Called by the Flink runtime before starting this operator. We
     >     // initialize the state stores here.
     >     @Override
     >     public void open(Configuration parameters) throws Exception {
     >         isDisconnectedStateStore = getRuntimeContext().getState(
     >                 new ValueStateDescriptor<Boolean>(
     >                         DISCONNECTED_STATE_STORE_NAME, Boolean.class));
     >         registeredTimerStateStore = getRuntimeContext().getState(
     >                 new ValueStateDescriptor<Long>(
     >                         REGISTERED_TIMER_STATE_STORE_NAME, Long.class));
     >     }
     >
     >     @Override
     >     public void processElement(IUHeartbeat heartbeat, Context ctx,
     >             Collector<IUSessionMessage> out) throws Exception {
     >         Boolean isDisconnected = isDisconnectedStateStore.value();
     >         LOGGER.info("Watermark: " + heartbeat + ", isDisconnected : " + isDisconnected
     >                 + " last registered timer :" + registeredTimerStateStore.value());
     >
     >         // If this is the first message for this monitor or is the first
     >         // message after a disconnection.
     >         if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
     >             // Delete previous timer.
     >             if (registeredTimerStateStore.value() != null)
     >                 ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());
     >
     >             // Register a timer that will fire in the future if no further
     >             // events are received.
     >             long timerFiringTimestamp = heartbeat.getTimestamp() + DISCONNECTED_TIMEOUT;
     >             ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
     >             registeredTimerStateStore.update(timerFiringTimestamp);
     >
     >             // Emit a message indicating END of the disconnected state.
     >             IUSessionMessage message = new IUSessionMessage(
     >                     new IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(), "dummy", "dummy"),
     >                     new IUSessionInfo(heartbeat.getTimestamp(), IUStatus.ENDED, IUEventType.NO_VALUE));
     >             out.collect(message);
     >             LOGGER.info(message.getSessionInfo().toString());
     >             // Update the state store.
     >             isDisconnectedStateStore.update(Boolean.FALSE);
     >         }
     >     }
     >
     >
     >     @Override
     >     public void onTimer(long timestamp, OnTimerContext ctx,
     >             Collector<IUSessionMessage> out) throws Exception {
     >         if (isDisconnectedStateStore.value() == Boolean.FALSE) {
     >             // If this timer fires that means no message was received from
     >             // the monitor for some timeout duration.
     >             // Update the state store.
     >             isDisconnectedStateStore.update(Boolean.TRUE);
     >
     >             // Emit a message indicating START of the disconnected state.
     >             // Note that since this is applicable for a monitor,
     >             IUSessionMessage message = new IUSessionMessage(
     >                     new IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(), "dummyFeatureName", "dummyDeviceId"),
     >                     new IUSessionInfo(timestamp, IUStatus.STARTED, IUEventType.NO_VALUE));
     >             out.collect(message);
     >
     >             LOGGER.info(message.getSessionInfo().toString());
     >         }
     >     }
     > }
     >
     > *However, the above code does not behave as expected - the timer fires
     > even when (a) it has received heartbeats within the timeout and (b) I
     > have the code to delete it*. So, my questions:
     >
     >   * Am I deleting the timer incorrectly? I use a state store to keep
     >     track of the registered timer's timestamp and use that value when
     >     deleting.
     >   * Am I overcomplicating things? Can this be achieved using Flink's
     >     built-in session windowing operators?
     >
     > Thanks!

