Maybe my previous post was too verbose, so I will try to summarize my
question.
If one instance of a DoFn sets a timer for a time that is behind the
pipeline's watermark, can this cause the pipeline to stall for other keys
as well?
By "stall" I mean that other keys' timers start lagging behind.
Say there are 1 million keyed DoFn instances running in a steady state
(behaving as expected), with timers firing on 5-minute boundaries.
Then one bad key arrives and sets its timer to a time 1 hour older than
the current watermark. What happens here? My understanding is this: the
looping timer fires back to back in quick succession 12 times for this bad
key, and after that this key joins the group of 1 million keys that fire
regularly on 5-minute boundaries.
PS: The DoFn above uses the default global window and the default trigger.
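A minimal sketch of the catch-up arithmetic in the question, in plain Java with java.time rather than Beam's Joda types. It assumes an event-time timer becomes eligible once the watermark is strictly past its timestamp, and that each onTimer call re-arms the timer one period later; the class and method names are hypothetical. Under those assumptions, a timer set 1 hour behind the watermark with a 5-minute period has 12 iterations already due. It only counts the due firings; it does not model how a runner schedules them relative to other keys.

```java
import java.time.Duration;
import java.time.Instant;

public class CatchUpFirings {

    /**
     * Counts how many iterations of a looping timer are already due when the
     * timer is first set at firstFire and the watermark is at watermark.
     * Assumption: a timer fires once the watermark is strictly past its
     * timestamp, and each firing re-arms the timer at timestamp + period.
     */
    static long dueFirings(Instant firstFire, Instant watermark, Duration period) {
        long count = 0;
        Instant t = firstFire;
        // Keep stepping one period forward while the timer is still behind the watermark.
        while (t.isBefore(watermark)) {
            count++;
            t = t.plus(period);
        }
        return count;
    }

    public static void main(String[] args) {
        Instant watermark = Instant.parse("2022-07-07T12:00:00Z");
        Instant badTimer = watermark.minus(Duration.ofHours(1));
        System.out.println(dueFirings(badTimer, watermark, Duration.ofMinutes(5)));
    }
}
```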


On Thu, Jul 7, 2022 at 11:09 PM gaurav mishra <gauravmishra.it...@gmail.com>
wrote:

> Hello,
> I have a pipeline that generates heartbeats using looping timers in a
> stateful DoFn. The following is pseudocode for the processElement and
> onTimer methods:
>
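> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
>
> // Input, its state field, ONLINE and getEventTimeEpoc() are the poster's own types.
> // Keyed here by a String entity ID (an assumption); state and timers need keyed input.
> class HeartbeatFn extends DoFn<KV<String, Input>, Input> {
>
>   @StateId("lastSeenMsg")
>   private final StateSpec<ValueState<Input>> lastSeenMsgSpec =
>       StateSpecs.value(); // original arguments elided; coder inferred here
>
>   @TimerId("loopingTimer")
>   private final TimerSpec loopingTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void processElement(
>       @Element KV<String, Input> element,
>       @StateId("lastSeenMsg") ValueState<Input> lastSeenMsg,
>       @TimerId("loopingTimer") Timer loopingTimer) {
>     Input input = element.getValue();
>     // read event time from the message
>     Instant currentEventTime = input.getEventTimeEpoc();
>     if (input.state == ONLINE) {
>       lastSeenMsg.write(input);
>       // start the looping timer at the next 5-minute boundary
>       long currentEventTimeEpocSeconds = currentEventTime.getMillis() / 1000;
>       long offset = currentEventTimeEpocSeconds % 300;
>       long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;
>       loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
>     } else {
>       // stop heartbeats when the entity goes offline
>       loopingTimer.clear();
>     }
>   }
>
>   @OnTimer("loopingTimer")
>   public void onTimer(
>       OnTimerContext timerContext,
>       @StateId("lastSeenMsg") ValueState<Input> lastSeenMsg,
>       @TimerId("loopingTimer") Timer loopingTimer,
>       OutputReceiver<Input> out) {
>     // emit the last seen message as a heartbeat
>     out.output(lastSeenMsg.read());
>     // re-arm the timer 5 minutes after the timestamp that just fired
>     loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
>   }
> }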
>
>
> The above pipeline works well in low-load scenarios, but in one of my
> heavy-traffic deployments the pipeline does not seem able to keep up with
> the load. The input messages from Pub/Sub are state-change events for an
> entity: Entity Online or Entity Offline messages. Once an entity comes
> online, we generate a heartbeat every 5 minutes until we receive an
> Offline message for that entity. The number of online entities can be
> fairly large; more than 10 million entities can be online at a given time.
>
> I am seeing this particular DoFn start lagging behind as soon as the
> pipeline starts. The timers fire quite late; the lag grew to 48 hours
> before I restarted the pipeline. Is there something wrong with what I am
> doing?
> Note: I am reading the event time embedded in the message. The intent is
> to fire a burst of timers in quick succession when needed and fill the DB
> with heartbeats up to the current time.
> For example, say a message arrives with state = Online and time = 10:02 AM,
> and the current watermark is at 10:13 AM. I set loopingTimer to start at
> 10:05, which I expect to fire immediately since the watermark is already
> past that time (or is this a wrong understanding?). Similarly, the
> subsequent call to onTimer sets the next timer for 10:10, which I also
> expect to fire immediately. From that point on, this DoFn should emit at
> the same times as all other instances of this DoFn. Is there a mistake in
> this implementation?
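A small plain-Java sketch of the 10:02 / 10:13 example, using java.time instead of Beam's Joda types. nextBoundary repeats the processElement arithmetic; immediateFirings assumes a timer is eligible once the watermark is strictly past its timestamp and that onTimer re-arms it 300 seconds later. Both names are hypothetical. Under those assumptions it yields 10:05 and 10:10 as already due, with 10:15 left pending. Note that an event exactly on a boundary, such as 10:05:00, schedules its first heartbeat at 10:10, not 10:05.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class HeartbeatCatchUp {

    static final long PERIOD_SECONDS = 300; // 5-minute heartbeat interval

    /** Same arithmetic as processElement: the next 5-minute boundary strictly after eventTime. */
    static Instant nextBoundary(Instant eventTime) {
        long seconds = eventTime.getEpochSecond();
        long offset = seconds % PERIOD_SECONDS;
        return Instant.ofEpochSecond(seconds - offset + PERIOD_SECONDS);
    }

    /**
     * Lists looping-timer timestamps that are already behind the watermark.
     * Assumption: a timer is eligible once the watermark is strictly past its
     * timestamp, and onTimer re-arms it PERIOD_SECONDS later.
     */
    static List<Instant> immediateFirings(Instant eventTime, Instant watermark) {
        List<Instant> due = new ArrayList<>();
        for (Instant t = nextBoundary(eventTime); t.isBefore(watermark);
                t = t.plusSeconds(PERIOD_SECONDS)) {
            due.add(t);
        }
        return due;
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2022-07-07T10:02:00Z");
        Instant watermark = Instant.parse("2022-07-07T10:13:00Z");
        System.out.println(immediateFirings(event, watermark));
        System.out.println(nextBoundary(Instant.parse("2022-07-07T10:05:00Z")));
    }
}
```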
> Another thing I am noticing is that this pipeline runs on a single
> Dataflow worker and does not scale up automatically. For such a large key
> space (10 million keys and their timers), I expected the pipeline to use a
> lot of CPU near the 5-minute boundaries and scale up, but that is not
> happening either.
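Rough arithmetic for the load described above, as a small plain-Java sketch (class and method names are hypothetical). It only restates the post's own numbers: 10 million online entities, one heartbeat each every 5 minutes. Spread evenly that would be about 33,333 timer firings per second, but because every timer in the code above is aligned to the same epoch-based 5-minute boundaries, all of them become due at the same instant. Per day that is 2.88 billion heartbeats.

```java
public class TimerLoad {

    /** Average firings per second if the heartbeats were spread evenly over the period. */
    static long averageFiringsPerSecond(long onlineEntities, long periodSeconds) {
        return onlineEntities / periodSeconds;
    }

    /** Total heartbeats emitted per day: one per entity per period. */
    static long heartbeatsPerDay(long onlineEntities, long periodSeconds) {
        long periodsPerDay = 86_400L / periodSeconds;
        return onlineEntities * periodsPerDay;
    }

    public static void main(String[] args) {
        long onlineEntities = 10_000_000L; // "more than 10 million entities" from the post
        long periodSeconds = 300;          // one heartbeat every 5 minutes
        // With aligned boundaries, all onlineEntities timers are due at each boundary at once.
        System.out.println("even spread per second: " + averageFiringsPerSecond(onlineEntities, periodSeconds));
        System.out.println("due at each boundary:   " + onlineEntities);
        System.out.println("heartbeats per day:     " + heartbeatsPerDay(onlineEntities, periodSeconds));
    }
}
```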
>
