Hello, I have a pipeline that generates heartbeats using looping timers in a stateful DoFn. Following is pseudocode for the processElement and onTimer methods:
```java
StateSpec<ValueState<Input>> lastSeenMsg = StateSpecs.value(...);
TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

processElement(input) {
  // read event time from the message
  Instant currentEventTime = input.getEventTimeEpoc();
  if (input.state == ONLINE) {
    lastSeenMsg.write(input);
    // calculate start of looping timer,
    // which will be the next 5 min boundary
    long currentEventTimeEpocSeconds = currentEventTime.getMillis() / 1000;
    long offset = currentEventTimeEpocSeconds % 300;
    long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;
    loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
  } else {
    // stop heartbeats when the entity goes offline
    loopingTimer.clear();
  }
}

onTimer() {
  // emit the lastSeenMsg
  output(lastSeenMsg.read());
  // re-arm the looping timer for the next 5 min boundary
  loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
}
```

The above pipeline works well under low load, but on one of my heavy-traffic deployments the pipeline does not seem able to keep up with the load.

Input messages from Pub/Sub are state-change events for an entity: Entity Online or Entity Offline messages. Once an entity comes Online, we start generating a heartbeat every 5 minutes for as long as we do not encounter an Offline message for that entity. The number of online entities can be fairly large: more than 10 million entities can be Online at a given time.

I am seeing this particular DoFn start lagging as soon as the pipeline starts. The timers fire quite late, and the lag grew to 48 hours before I restarted the pipeline. Is there something wrong in what I am doing?

Note: I am reading the event time embedded in the message. The intent is to fire a number of timers in quick succession if needed and fill up the DB with heartbeats up to the current time. For example, say a message arrives with state = Online and time = 10:02 AM, and the current watermark is at 10:13 AM. I set loopingTimer to start at 10:05, which I expect to fire immediately since the watermark is already past that time?
(Or is my understanding wrong?) Similarly, the subsequent call to onTimer sets the next timer to fire at 10:10, and I also expect that one to fire immediately. After that point, this DoFn should emit heartbeats at the same time as all the other instances of this DoFn. Is there a mistake in this implementation?

Another thing I am noticing is that this pipeline runs on a single Dataflow worker and does not scale up automatically. For such a large key space (10 million keys, each with its own state and timer), I expected the pipeline to use a lot of CPU near the 5-minute boundaries and scale up, but that is not happening either.
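The catch-up expectation from the 10:02 / 10:13 example, and the load implied by 10 million online entities, can be modelled with a small stand-alone sketch. It uses plain `java.time` rather than the Beam/Joda API, and the class and method names (`HeartbeatTimerMath`, `catchUpFirings`, and so on) are hypothetical. It assumes a timer is eligible to fire once the watermark has reached its timestamp, and it only computes which timers are due, not when the runner actually delivers them.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class HeartbeatTimerMath {
    static final long PERIOD_SECONDS = 300; // 5-minute heartbeat interval

    // Same arithmetic as processElement: round the event time down to a
    // 5-minute boundary, then add one period.
    static long nextBoundarySeconds(long eventTimeSeconds) {
        long offset = eventTimeSeconds % PERIOD_SECONDS;
        return eventTimeSeconds - offset + PERIOD_SECONDS;
    }

    // Timestamps the looping timer would be due at, given the current watermark.
    // Assumption (hypothetical model): a timer is due once the watermark has
    // reached its timestamp; each firing re-arms the timer one period later,
    // as onTimer does.
    static List<Instant> catchUpFirings(Instant eventTime, Instant watermark) {
        List<Instant> firings = new ArrayList<>();
        Instant next = Instant.ofEpochSecond(nextBoundarySeconds(eventTime.getEpochSecond()));
        while (!next.isAfter(watermark)) {
            firings.add(next);
            next = next.plusSeconds(PERIOD_SECONDS); // re-arm, as in onTimer
        }
        return firings;
    }

    // Average heartbeat rate if firings were spread evenly over each period.
    static double averageFiringsPerSecond(long onlineEntities) {
        return (double) onlineEntities / PERIOD_SECONDS;
    }

    public static void main(String[] args) {
        // Times from the question; the date is arbitrary (UTC).
        Instant event = Instant.parse("2021-01-01T10:02:00Z");
        Instant watermark = Instant.parse("2021-01-01T10:13:00Z");
        System.out.println(catchUpFirings(event, watermark));
        // prints [2021-01-01T10:05:00Z, 2021-01-01T10:10:00Z]

        System.out.println(averageFiringsPerSecond(10_000_000L));
    }
}
```

The second figure is only an average (roughly 33,333 heartbeats per second). Because every entity's timer is aligned to the same 5-minute boundaries, in this model the firings become due in bursts at each boundary rather than being spread evenly across the period.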