On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org> wrote:
>
> While I'm currently on the other side of the fence, I would not be against 
> changing/requiring the semantics of ProcessingTime constructs to be "must 
> wait and execute" as such a solution, and enables the Proposed "batch" 
> process continuation throttling mechanism to work as hypothesized for both 
> "batch" and "streaming" execution.
>
> There's a lot to like, as it leans Beam further into the unification of Batch 
> and Stream, with one fewer exception (eg. unifies timer experience further). 
> It doesn't require a new primitive. It probably matches more with user 
> expectations anyway.
>
> It does cause looping timer execution with processing time to be a problem 
> for Drains however.
I think we have a problem with looping timers plus drain (a mostly
streaming idea anyway) regardless.

> I'd argue though that in the case of a drain, we could updated the semantics 
> as "move watermark to infinity"  "existing timers are executed, but new 
> timers are ignored",

I don't like the idea of dropping timers for drain. I think correct
handling here requires user visibility into whether a pipeline is
draining or not.

> and ensure/and update the requirements around OnWindowExpiration callbacks to 
> be a bit more insistent on being implemented for correct execution, which is 
> currently the only "hard" signal to the SDK side that the window's work is 
> guaranteed to be over, and remaining state needs to be addressed by the 
> transform or be garbage collected. This remains critical for developing a 
> good pattern for ProcessingTime timers within a Global Window too.

+1

>
> On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> > Thanks for bringing this up.
> >
> > My position is that both batch and streaming should wait for
> > processing time timers, according to local time (with the exception of
> > tests that can accelerate this via faked clocks).
> >
> > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
> > isomorphic, and can be implemented in terms of each other (at least in
> > one direction, and likely the other). Both are an indication that I
> > can't act on something yet due to external constraints (e.g. not all
> > the data has been published, or I lack sufficient capacity/quota to
> > push things downstream) but I expect to be able to (or at least would
> > like to check again) at some time in the processing-time future. I
> > can't think of a batch or streaming scenario where it would be correct
> > to not wait at least that long (even in batch inputs, e.g. suppose I'm
> > tailing logs and was eagerly started before they were fully written,
> > or waiting for some kind of (non-data-dependent) quiessence or other
> > operation to finish).
> >
> >
> > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >
> > > For me it always helps to seek analogy in our physical reality. Stream
> > > processing actually has quite a good analogy for both event-time and
> > > processing-time - the simplest model for this being relativity theory.
> > > Event-time is the time at which events occur _at distant locations_. Due
> > > to finite and invariant speed of light (which is actually really
> > > involved in the explanation why any stream processing is inevitably
> > > unordered) these events are observed (processed) at different times
> > > (processing time, different for different observers). It is perfectly
> > > possible for an observer to observe events at a rate that is higher than
> > > one second per second. This also happens in reality for observers that
> > > travel at relativistic speeds (which might be an analogy for fast -
> > > batch - (re)processing). Besides the invariant speed, there is also
> > > another invariant - local clock (wall time) always ticks exactly at the
> > > rate of one second per second, no matter what. It is not possible to
> > > "move faster or slower" through (local) time.
> > >
> > > In my understanding the reason why we do not put any guarantees or
> > > bounds on the delay of firing processing time timers is purely technical
> > > - the processing is (per key) single-threaded, thus any timer has to
> > > wait before any element processing finishes. This is only consequence of
> > > a technical solution, not something fundamental.
> > >
> > > Having said that, my point is that according to the above analogy, it
> > > should be perfectly fine to fire processing time timers in batch based
> > > on (local wall) time only. There should be no way of manipulating this
> > > local time (excluding tests). Watermarks should be affected the same way
> > > as any buffering in a state that would happen in a stateful DoFn (i.e.
> > > set timer holds output watermark). We should probably pay attention to
> > > looping timers, but it seems possible to define a valid stopping
> > > condition (input watermark at infinity).
> > >
> > >   Jan
> > >
> > > On 2/22/24 19:50, Kenneth Knowles wrote:
> > > > Forking this thread.
> > > >
> > > > The state of processing time timers in this mode of processing is not
> > > > satisfactory and is discussed a lot but we should make everything
> > > > explicit.
> > > >
> > > > Currently, a state and timer DoFn has a number of logical watermarks:
> > > > (apologies for fixed width not coming through in email lists). Treat
> > > > timers as a back edge.
> > > >
> > > > input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
> > > >             ^                      |
> > > > |--(B)-----------------|
> > > >                            timers
> > > >
> > > > (A) Input Element watermark: this is the watermark that promises there
> > > > is no incoming element with a timestamp earlier than it. Each input
> > > > element's timestamp holds this watermark. Note that *event time timers
> > > > firing is according to this watermark*. But a runner commits changes
> > > > to this watermark *whenever it wants*, in a way that can be
> > > > consistent. So the runner can absolute process *all* the elements
> > > > before advancing the watermark (A), and only afterwards start firing
> > > > timers.
> > > >
> > > > (B) Timer watermark: this is a watermark that promises no timer is set
> > > > with an output timestamp earlier than it. Each timer that has an
> > > > output timestamp holds this watermark. Note that timers can set new
> > > > timers, indefinitely, so this may never reach infinity even in a drain
> > > > scenario.
> > > >
> > > > (C) (derived) total input watermark: this is a watermark that is the
> > > > minimum of the two above, and ensures that all state for the DoFn for
> > > > expired windows can be GCd after calling @OnWindowExpiration.
> > > >
> > > > (D) output watermark: this is a promise that the DoFn will not output
> > > > earlier than the watermark. It is held by the total input watermark.
> > > >
> > > > So a any timer, processing or not, holds the total input watermark
> > > > which prevents window GC, hence the timer must be fired. You can set
> > > > timers without a timestamp and they will not hold (B) hence not hold
> > > > the total input / GC watermark (C). Then if a timer fires for an
> > > > expired window, it is ignored. But in general a timer that sets an
> > > > output timestamp is saying that it may produce output, so it *must* be
> > > > fired, even in batch, for data integrity. There was a time before
> > > > timers had output timestamps that we said that you *always* have to
> > > > have an @OnWindowExpiration callback for data integrity, and
> > > > processing time timers could not hold the watermark. That is changed 
> > > > now.
> > > >
> > > > One main purpose of processing time timers in streaming is to be a
> > > > "timeout" for data buffered in state, to eventually flush. In this
> > > > case the output timestamp should be the minimum of the elements in
> > > > state (or equivalent). In batch, of course, this kind of timer is not
> > > > relevant and we should definitely not wait for it, because the goal is
> > > > to just get through all the data. We can justify this by saying that
> > > > the worker really has no business having any idea what time it really
> > > > is, and the runner can just run the clock at whatever speed it wants.
> > > >
> > > > Another purpose, brought up on the Throttle thread, is to wait or
> > > > backoff. In this case it would be desired for the timer to actually
> > > > cause batch processing to pause and wait. This kind of behavior has
> > > > not been explored much. Notably the runner can absolutely process all
> > > > elements first, then start to fire any enqueued processing time
> > > > timers. In the same way that state in batch can just be in memory,
> > > > this *could* just be a call to sleep(). It all seems a bit sketchy so
> > > > I'd love clearer opinions.
> > > >
> > > > These two are both operational effects - as you would expect for
> > > > processing time timers - and they seem to be in conflict. Maybe they
> > > > just need different features?
> > > >
> > > > I'd love to hear some more uses of processing time timers from the
> > > > community.
> > > >
> > > > Kenn
> >

Reply via email to