Hello,
I have a pipeline that generates heartbeats using looping timers in a
stateful DoFn. Below is pseudocode for the processElement and onTimer
methods:

StateSpec<ValueState<Input>> lastSeenMsg = StateSpecs.value(...);
TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);


processElement(input) {
  // read the event time embedded in the message
  Instant currentEventTime = input.getEventTimeEpoc();
  if (input.state == ONLINE) {
    lastSeenMsg.write(input);
    // calculate the start of the looping timer,
    // which is the next 5 min boundary
    long currentEventTimeEpocSeconds = currentEventTime.getMillis() / 1000;
    long offset = currentEventTimeEpocSeconds % 300;
    long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;
    loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
  } else {
    // stop heartbeats when the entity goes offline
    loopingTimer.clear();
  }
}


onTimer() {
  // emit the lastSeenMsg
  output(lastSeenMsg.read());
  // re-arm the timer for the next 5 min boundary
  loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
}
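
To make the boundary arithmetic above easy to check outside the pipeline, here is a minimal standalone sketch of the same calculation. It uses plain java.time rather than the Joda types Beam uses, and the class name, method name, and the sample date are made up for illustration only.

```java
import java.time.Instant;

final class HeartbeatBoundary {
    // 5 minutes, same constant as in the pseudocode above
    static final long PERIOD_SECONDS = 300;

    // Returns the first 5-minute boundary strictly after the given event time.
    // An event that lands exactly on a boundary is pushed to the next one,
    // which mirrors "epochSeconds - offset + 300" in the pseudocode.
    static Instant nextBoundary(Instant eventTime) {
        long epochSeconds = eventTime.getEpochSecond();
        long offset = Math.floorMod(epochSeconds, PERIOD_SECONDS);
        return Instant.ofEpochSecond(epochSeconds - offset + PERIOD_SECONDS);
    }

    public static void main(String[] args) {
        // Hypothetical sample timestamps, not taken from the real pipeline.
        System.out.println(nextBoundary(Instant.parse("2020-01-01T10:02:00Z")));
        System.out.println(nextBoundary(Instant.parse("2020-01-01T10:05:00Z")));
    }
}
```

With these sample inputs, an event at 10:02 maps to 10:05, and an event at exactly 10:05 maps to 10:10.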


The above pipeline works well under low load, but in one of my
heavy-traffic deployments it does not seem able to keep up with the
load. The input messages from Pub/Sub are state change events for an entity:
Entity Online or Entity Offline. Once an entity comes Online we start
generating a heartbeat every 5 min for as long as we do not encounter an
Offline message for that entity. The number of online entities can be fairly
large; more than 10 million entities can be Online at a given time.

I am seeing this particular DoFn start lagging behind as soon as it is
started. The timers fire very late; the lag went up to 48 hours
before I restarted the pipeline. Is there something wrong in what I am
doing?
Note: I am reading the event time embedded in the message. The intent is to
fire a bunch of timers in quick succession if needed and fill the DB
with heartbeats up to the current time.
Say a message arrives with state = Online and time = 10:02 AM, and the current
watermark is at 10:13 AM. I set the loopingTimer to start at 10:05, which I
expect to fire immediately since the watermark is already past that
time (or is this understanding wrong?). Similarly, the subsequent call to
onTimer will set the next timer to fire at 10:10, which I also expect
to fire immediately. After this point, this DoFn should start emitting at
the same time as all other instances of this DoFn. Is there a mistake in this
implementation?
Another thing I am noticing is that this pipeline is running on a single
Dataflow worker and is not scaling up automatically. For such a large key
space (10 million DoFns and their timers) I expected the pipeline to use a
lot of CPU near the 5-minute boundaries and scale up, but that is also not
happening.
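
To spell out the catch-up behaviour I am expecting in the 10:02 / 10:13 example, here is a small plain-Java model. It does not use Beam. It only encodes my assumption that an event-time timer becomes eligible to fire once the watermark has reached its timestamp. The class name, method name, and sample date are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class CatchUpSketch {
    static final Duration PERIOD = Duration.ofMinutes(5);

    // Lists the looping-timer timestamps that are at or behind the watermark,
    // i.e. the firings I would expect to happen back to back, assuming each
    // onTimer call re-arms the timer exactly one period later.
    static List<Instant> overdueFirings(Instant firstTimer, Instant watermark) {
        List<Instant> firings = new ArrayList<>();
        for (Instant t = firstTimer; !t.isAfter(watermark); t = t.plus(PERIOD)) {
            firings.add(t);
        }
        return firings;
    }

    public static void main(String[] args) {
        // Message at 10:02 gives a first timer at 10:05; watermark is at 10:13.
        Instant firstTimer = Instant.parse("2020-01-01T10:05:00Z");
        Instant watermark = Instant.parse("2020-01-01T10:13:00Z");
        System.out.println(overdueFirings(firstTimer, watermark));
    }
}
```

Under that assumption the 10:05 and 10:10 timers are both overdue, and the 10:15 timer is the first one that waits for the watermark.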
