Maybe the previous post was too verbose, so I will try to summarize my question: if one instance of a DoFn sets a timer for a time that is behind the pipeline's watermark, can this cause the pipeline to stall for other keys as well? By "stall" I mean that other keys' timers start lagging behind.

Say there are 1 million DoFns running in a steady state (behaving as expected), with timers firing at 5-minute boundaries. One bad key arrives and sets its timer to a time that is 1 hour older than the current watermark. What happens here? My understanding is that the looping timer will fire back to back, in quick succession, 12 times for this bad key, and after that this key joins the group of 1 million keys that were firing regularly at 5-minute boundaries.

PS: The DoFn above uses the default global window and the default trigger.
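
To make the expected catch-up count concrete, here is a minimal plain-Java model of the scenario above. It is only a sketch and does not use Beam or reflect how any particular runner schedules timers. It assumes a timer becomes eligible once its timestamp is not after the watermark, and that the watermark stays fixed while the bad key catches up. The class name, method name, and sample timestamps are hypothetical, and it uses java.time rather than the Joda types Beam uses.

```java
import java.time.Duration;
import java.time.Instant;

public class CatchUpModel {
    // Counts how many times a looping timer would fire back to back before
    // its next timestamp moves past the watermark.
    // Assumptions: a timer is eligible once its timestamp is not after the
    // watermark, and the watermark does not advance during the catch-up.
    static int countCatchUpFirings(Instant firstTimer, Instant watermark, Duration period) {
        int firings = 0;
        Instant next = firstTimer;
        while (!next.isAfter(watermark)) {
            firings++;
            // Mirrors the onTimer logic: re-arm at the fired timestamp plus one period.
            next = next.plus(period);
        }
        return firings;
    }

    public static void main(String[] args) {
        // Hypothetical values: watermark at 11:03, first timer roughly 1 hour behind it.
        Instant watermark = Instant.parse("2022-07-07T11:03:00Z");
        Instant firstTimer = Instant.parse("2022-07-07T10:05:00Z");
        int firings = countCatchUpFirings(firstTimer, watermark, Duration.ofMinutes(5));
        System.out.println("back-to-back firings: " + firings);
    }
}
```

Under these assumptions the bad key fires for 10:05 through 11:00 and then waits for the watermark to reach 11:05, like every other key. Whether those firings delay timers for other keys on a real runner is exactly the open question.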
On Thu, Jul 7, 2022 at 11:09 PM gaurav mishra <gauravmishra.it...@gmail.com> wrote:

> Hello,
> I have a pipeline which generates heartbeats using looping timers in a
> stateful DoFn. Following is pseudocode for the processElement and onTimer
> methods:
>
> StateSpec<ValueState<Input>> lastSeenMsg = StateSpecs.value(...);
> TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
> processElement(input) {
>   // read the event time from the message
>   Instant currentEventTime = input.getEventTimeEpoc();
>   if (input.state == ONLINE) {
>     lastSeenMsg.write(input);
>     // calculate the start of the looping timer,
>     // which will be the next 5 min boundary
>     long currentEventTimeEpocSeconds = currentEventTime.getMillis() / 1000;
>     long offset = currentEventTimeEpocSeconds % 300;
>     long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;
>     loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
>   } else {
>     // stop heartbeats when the entity goes offline
>     loopingTimer.clear();
>   }
> }
>
> onTimer() {
>   // emit the lastSeenMsg
>   output(lastSeenMsg.read());
>   // re-arm the timer for the next 5 min boundary
>   loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
> }
>
> The above pipeline works well in low-load scenarios, but on one of my
> heavy-traffic deployments the pipeline does not seem able to keep up with
> the load. Input messages from Pub/Sub are state change events for an
> entity: Entity Online or Entity Offline messages. Once an entity comes
> Online, we start generating a heartbeat every 5 min for as long as we do
> not encounter an Offline message for that entity. The number of online
> entities can be fairly large; more than 10 million entities can be Online
> at a given time.
>
> I am seeing this particular DoFn start lagging behind as soon as it is
> started. The timers are firing pretty late. The lag went up to 48 hours
> before I restarted the pipeline. Is there something wrong in what I am
> doing?
>
> Note: I am reading the event time embedded in the message. The intent is
> to fire a bunch of timers in quick succession if needed and fill up the DB
> with heartbeats up to the current time.
> So say a message comes with state = Online and time = 10:02 AM, and the
> current watermark is at 10:13 AM. I set the loopingTimer to start at
> 10:05, which I expect to fire immediately since the watermark is already
> ahead of this time (or is this the wrong understanding?). Similarly, the
> subsequent call to the onTimer method will set the next timer to fire at
> 10:10, and I expect that one to fire immediately as well. After this
> point this DoFn should start emitting at the same time as all other
> instances of this DoFn. Is there a mistake in this implementation?
>
> Another thing I am noticing is that this pipeline is running on a single
> Dataflow worker and not scaling up automatically. For such a large key
> space (10 million DoFns and their timers) I expected the pipeline to use a
> lot of CPU near the 5 minute boundaries and scale up, but that is also not
> happening.
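
The boundary rounding in the quoted processElement pseudocode can be checked in isolation. The following is a minimal sketch that uses java.time instead of the Joda types Beam uses. The class and method names are hypothetical, and the date is an arbitrary sample chosen to match the 10:02 AM example.

```java
import java.time.Instant;

public class NextBoundary {
    static final long PERIOD_SECONDS = 300; // 5 minutes, as in the quoted pseudocode

    // Rounds an event time up to the next 5-minute boundary, using the same
    // arithmetic as the quoted pseudocode. An event that already sits exactly
    // on a boundary is moved to the following boundary.
    static Instant nextBoundary(Instant eventTime) {
        long seconds = eventTime.getEpochSecond();
        long offset = Math.floorMod(seconds, PERIOD_SECONDS);
        return Instant.ofEpochSecond(seconds - offset + PERIOD_SECONDS);
    }

    public static void main(String[] args) {
        // Hypothetical sample: a message with event time 10:02.
        Instant eventTime = Instant.parse("2022-07-07T10:02:00Z");
        System.out.println("first timer: " + nextBoundary(eventTime));
    }
}
```

With a watermark at 10:13, the resulting 10:05 timer and its 10:10 successor are both already behind the watermark, while the 10:15 timer is the first one that has to wait for it.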