On Tue, Feb 27, 2024 at 10:22 AM Robert Bradshaw via dev <
dev@beam.apache.org> wrote:
> On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles <k...@apache.org> wrote:
> >
> > Pulling out focus points:
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> > > I can't act on something yet [...] but I expect to be able to [...] at
> some time in the processing-time future.
> >
> > I like this as a clear and internally-consistent feature description. It
> describes ProcessContinuation and those timers which serve the same purpose
> as ProcessContinuation.
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> > > I can't think of a batch or streaming scenario where it would be
> correct to not wait at least that long
> >
> > The main reason we created timers: to take action in the absence of
> > data. The archetypal use case for processing time timers was/is "flush
> > data from state if it has been sitting there too long". For this use
> > case, the right behavior for batch is to skip the timer. It is actually
> > incorrect to wait.
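
For concreteness, this is the kind of DoFn I have in mind (a rough, untested
Java sketch; the class, state, and timer names are made up for illustration):
buffer incoming elements in state and arm a processing-time timer that flushes
whatever is still sitting there if no new data arrives.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    // Untested sketch; names are illustrative, not an existing transform.
    class BufferAndFlushFn extends DoFn<KV<String, String>, List<String>> {

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

      @TimerId("staleness")
      private final TimerSpec stalenessSpec =
          TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(
          @Element KV<String, String> element,
          @StateId("buffer") BagState<String> buffer,
          @TimerId("staleness") Timer staleness) {
        buffer.add(element.getValue());
        // (Re)arm the timer: flush if nothing new arrives for 10 minutes.
        staleness.offset(Duration.standardMinutes(10)).setRelative();
      }

      @OnTimer("staleness")
      public void onStaleness(
          @StateId("buffer") BagState<String> buffer,
          OutputReceiver<List<String>> out) {
        // "Take action in the absence of data": emit whatever is buffered.
        List<String> contents = new ArrayList<>();
        buffer.read().forEach(contents::add);
        if (!contents.isEmpty()) {
          out.output(contents);
        }
        buffer.clear();
      }
    }

In streaming that timer is what eventually gets stale data out; in batch the
natural behavior is to skip it and rely on @OnWindowExpiration (or an
event-time timer) to emit anything left over.
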
>
> Good point calling out the distinction between "I need to wait in case
> there's more data." and "I need to wait for something external." We
> can't currently distinguish between the two, but a batch runner can
> say something definitive about the first. Feels like we need a new
> primitive (or at least new signaling information on our existing
> primitive).
>
BTW, the first is also relevant to drain. One reason drain often takes a
long time today is that it has to wait for processing-time timers to fire
(it has to wait because those timers hold the watermark), but usually those
timers are no-ops.


> > On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org>
> wrote:
> > > It doesn't require a new primitive.
> >
> > IMO what's being proposed *is* a new primitive. I think it is a good
> primitive. It is the underlying primitive to ProcessContinuation. It would
> be user-friendly as a kind of timer. But if we made this the behavior of
> processing time timers retroactively, it would break everyone using them to
> flush data who is also reprocessing data.
> >
> > There are two very different use cases ("I need to wait, and block data"
> > vs. "I want to act without data, aka NOT wait for data") and I think we
> > should serve both of them, but it doesn't have to be with the same
> > low-level feature.
> >
> > Kenn
> >
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> >>
> >> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org>
> wrote:
> >> >
> >> > While I'm currently on the other side of the fence, I would not be
> >> > against changing/requiring the semantics of ProcessingTime constructs
> >> > to be "must wait and execute". Such a solution would enable the
> >> > proposed "batch" process continuation throttling mechanism to work as
> >> > hypothesized for both "batch" and "streaming" execution.
> >> >
> >> > There's a lot to like: it leans Beam further into the unification of
> >> > batch and streaming, with one fewer exception (e.g. it unifies the
> >> > timer experience further). It doesn't require a new primitive. It
> >> > probably matches user expectations better anyway.
> >> >
> >> > It does, however, make looping processing-time timer execution a
> >> > problem for drains.
> >>
> >> I think we have a problem with looping timers plus drain (a mostly
> >> streaming idea anyway) regardless.
> >>
> >> > I'd argue, though, that in the case of a drain we could update the
> >> > semantics to "move the watermark to infinity" and "existing timers are
> >> > executed, but new timers are ignored",
> >>
> >> I don't like the idea of dropping timers for drain. I think correct
> >> handling here requires user visibility into whether a pipeline is
> >> draining or not.
> >>
> >> > and ensure/update the requirements around OnWindowExpiration callbacks
> >> > to be a bit more insistent on their being implemented for correct
> >> > execution. That callback is currently the only "hard" signal to the
> >> > SDK side that the window's work is guaranteed to be over and that
> >> > remaining state needs to be addressed by the transform or be garbage
> >> > collected. This remains critical for developing a good pattern for
> >> > ProcessingTime timers within the Global Window too.
> >>
> >> +1
> >>
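
For readers following along, the OnWindowExpiration callback discussed above
looks roughly like this (untested sketch, illustrative names); it is the
"hard" end-of-window signal where any remaining buffered state can be emitted
before it is garbage collected, independent of processing-time timers.

    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Untested sketch; names are made up for illustration.
    class BufferUntilExpirationFn extends DoFn<KV<String, String>, String> {

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

      @ProcessElement
      public void process(
          @Element KV<String, String> element,
          @StateId("buffer") BagState<String> buffer) {
        buffer.add(element.getValue());
      }

      @OnWindowExpiration
      public void onExpiration(
          @StateId("buffer") BagState<String> buffer,
          OutputReceiver<String> out) {
        // The window's work is guaranteed to be over: emit anything still
        // buffered before the state is garbage collected.
        for (String value : buffer.read()) {
          out.output(value);
        }
      }
    }
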
> >> >
> >> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> >> > > Thanks for bringing this up.
> >> > >
> >> > > My position is that both batch and streaming should wait for
> >> > > processing time timers, according to local time (with the exception
> >> > > of tests that can accelerate this via faked clocks).
> >> > >
> >> > > Both ProcessContinuation delays and ProcessingTimeTimers are IMHO
> >> > > isomorphic, and can be implemented in terms of each other (at least
> >> > > in one direction, and likely the other). Both are an indication that
> >> > > I can't act on something yet due to external constraints (e.g. not
> >> > > all the data has been published, or I lack sufficient capacity/quota
> >> > > to push things downstream) but I expect to be able to (or at least
> >> > > would like to check again) at some time in the processing-time
> >> > > future. I can't think of a batch or streaming scenario where it
> >> > > would be correct to not wait at least that long (even on batch
> >> > > inputs; e.g. suppose I'm tailing logs and was eagerly started before
> >> > > they were fully written, or waiting for some kind of
> >> > > (non-data-dependent) quiescence or other operation to finish).
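
For concreteness, the ProcessContinuation version of "check again later in
processing time" is roughly the following (untested sketch of a splittable
DoFn tailing a hypothetical log source; readAvailableLines and the handling
of the unbounded range are simplified placeholders, and watermark estimation
is omitted).

    import java.util.Collections;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.joda.time.Duration;

    // Untested sketch; the tailed source and readAvailableLines() are
    // hypothetical, not an existing Beam connector.
    @DoFn.UnboundedPerElement
    class TailLogsFn extends DoFn<String, String> {

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element String path) {
        return new OffsetRange(0, Long.MAX_VALUE);
      }

      @ProcessElement
      public ProcessContinuation process(
          @Element String path,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<String> out) {
        long position = tracker.currentRestriction().getFrom();
        for (String line : readAvailableLines(path, position)) {
          if (!tracker.tryClaim(position)) {
            return ProcessContinuation.stop();
          }
          out.output(line);
          position++;
        }
        // Nothing more to read right now, but more may be written later:
        // "I can't act on this yet, but expect to be able to at some time
        // in the processing-time future."
        return ProcessContinuation.resume()
            .withResumeDelay(Duration.standardSeconds(30));
      }

      private Iterable<String> readAvailableLines(String path, long from) {
        return Collections.emptyList(); // placeholder for the actual read
      }
    }
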
> >> > >
> >> > >
> >> > > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský <je...@seznam.cz>
> wrote:
> >> > > >
> >> > > > For me it always helps to seek analogy in our physical reality.
> >> > > > Stream processing actually has quite a good analogy for both
> >> > > > event-time and processing-time - the simplest model for this being
> >> > > > relativity theory. Event-time is the time at which events occur
> >> > > > _at distant locations_. Due to the finite and invariant speed of
> >> > > > light (which is actually really involved in the explanation of why
> >> > > > any stream processing is inevitably unordered) these events are
> >> > > > observed (processed) at different times (processing time,
> >> > > > different for different observers). It is perfectly possible for
> >> > > > an observer to observe events at a rate that is higher than one
> >> > > > second per second. This also happens in reality for observers that
> >> > > > travel at relativistic speeds (which might be an analogy for fast
> >> > > > - batch - (re)processing). Besides the invariant speed, there is
> >> > > > also another invariant - the local clock (wall time) always ticks
> >> > > > exactly at the rate of one second per second, no matter what. It
> >> > > > is not possible to "move faster or slower" through (local) time.
> >> > > >
> >> > > > In my understanding, the reason why we do not put any guarantees
> >> > > > or bounds on the delay of firing processing time timers is purely
> >> > > > technical - the processing is (per key) single-threaded, thus any
> >> > > > timer has to wait until in-flight element processing finishes.
> >> > > > This is only a consequence of a technical solution, not something
> >> > > > fundamental.
> >> > > >
> >> > > > Having said that, my point is that, according to the above
> >> > > > analogy, it should be perfectly fine to fire processing time
> >> > > > timers in batch based on (local wall) time only. There should be
> >> > > > no way of manipulating this local time (excluding tests).
> >> > > > Watermarks should be affected the same way as any buffering in
> >> > > > state that would happen in a stateful DoFn (i.e. a set timer holds
> >> > > > the output watermark). We should probably pay attention to looping
> >> > > > timers, but it seems possible to define a valid stopping condition
> >> > > > (input watermark at infinity).
> >> > > >
> >> > > >   Jan
> >> > > >
> >> > > > On 2/22/24 19:50, Kenneth Knowles wrote:
> >> > > > > Forking this thread.
> >> > > > >
> >> > > > > The state of processing time timers in this mode of processing
> >> > > > > is not satisfactory and is discussed a lot, but we should make
> >> > > > > everything explicit.
> >> > > > >
> >> > > > > Currently, a state and timer DoFn has a number of logical
> >> > > > > watermarks: (apologies for fixed width not coming through in
> >> > > > > email lists). Treat timers as a back edge.
> >> > > > >
> >> > > > > input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
> >> > > > >                 ^                 |
> >> > > > >                 |------(B)--------|
> >> > > > >                       timers
> >> > > > >
> >> > > > > (A) Input Element watermark: this is the watermark that promises
> >> > > > > there is no incoming element with a timestamp earlier than it.
> >> > > > > Each input element's timestamp holds this watermark. Note that
> >> > > > > *event time timers fire according to this watermark*. But a
> >> > > > > runner commits changes to this watermark *whenever it wants*, in
> >> > > > > any way that is consistent. So the runner can absolutely process
> >> > > > > *all* the elements before advancing the watermark (A), and only
> >> > > > > afterwards start firing timers.
> >> > > > >
> >> > > > > (B) Timer watermark: this is a watermark that promises no timer
> >> > > > > is set with an output timestamp earlier than it. Each timer that
> >> > > > > has an output timestamp holds this watermark. Note that timers
> >> > > > > can set new timers, indefinitely, so this may never reach
> >> > > > > infinity even in a drain scenario.
> >> > > > >
> >> > > > > (C) (derived) total input watermark: this is a watermark that is
> >> > > > > the minimum of the two above, and ensures that all state for the
> >> > > > > DoFn for expired windows can be GCd after calling
> >> > > > > @OnWindowExpiration.
> >> > > > >
> >> > > > > (D) output watermark: this is a promise that the DoFn will not
> >> > > > > output earlier than the watermark. It is held by the total input
> >> > > > > watermark.
> >> > > > >
> >> > > > > So any timer, processing or not, holds the total input
> >> > > > > watermark, which prevents window GC, hence the timer must be
> >> > > > > fired. You can set timers without an output timestamp and they
> >> > > > > will not hold (B), hence not hold the total input / GC watermark
> >> > > > > (C). Then if such a timer fires for an expired window, it is
> >> > > > > ignored. But in general a timer that sets an output timestamp is
> >> > > > > saying that it may produce output, so it *must* be fired, even
> >> > > > > in batch, for data integrity. There was a time, before timers
> >> > > > > had output timestamps, when we said that you *always* have to
> >> > > > > have an @OnWindowExpiration callback for data integrity, and
> >> > > > > processing time timers could not hold the watermark. That has
> >> > > > > changed now.
> >> > > > >
> >> > > > > One main purpose of processing time timers in streaming is to be
> >> > > > > a "timeout" for data buffered in state, to eventually flush. In
> >> > > > > this case the output timestamp should be the minimum of the
> >> > > > > elements in state (or equivalent). In batch, of course, this
> >> > > > > kind of timer is not relevant and we should definitely not wait
> >> > > > > for it, because the goal is to just get through all the data. We
> >> > > > > can justify this by saying that the worker really has no
> >> > > > > business having any idea what time it really is, and the runner
> >> > > > > can just run the clock at whatever speed it wants.
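
A sketch of that flush pattern with the output timestamp held to the minimum
buffered element (again untested Java, illustrative names): the
withOutputTimestamp call is what makes the flush timer hold the watermark
only as far back as the data it guards.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // Untested sketch; names are made up. The timer's output timestamp
    // tracks the minimum timestamp of the buffered elements.
    class TimestampedBufferFn extends DoFn<KV<String, String>, List<String>> {

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

      @StateId("minTs")
      private final StateSpec<ValueState<Instant>> minTsSpec = StateSpecs.value();

      @TimerId("flush")
      private final TimerSpec flushSpec =
          TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(
          @Element KV<String, String> element,
          @Timestamp Instant elementTs,
          @StateId("buffer") BagState<String> buffer,
          @StateId("minTs") ValueState<Instant> minTs,
          @TimerId("flush") Timer flush) {
        buffer.add(element.getValue());
        Instant currentMin = minTs.read();
        Instant newMin =
            (currentMin == null || elementTs.isBefore(currentMin))
                ? elementTs
                : currentMin;
        minTs.write(newMin);
        // The output timestamp is the watermark hold: the eventual flush may
        // produce output at newMin, so the GC watermark cannot pass it.
        flush.withOutputTimestamp(newMin)
            .offset(Duration.standardMinutes(10))
            .setRelative();
      }

      @OnTimer("flush")
      public void onFlush(
          @StateId("buffer") BagState<String> buffer,
          @StateId("minTs") ValueState<Instant> minTs,
          OutputReceiver<List<String>> out) {
        List<String> contents = new ArrayList<>();
        buffer.read().forEach(contents::add);
        if (!contents.isEmpty()) {
          out.output(contents);
        }
        buffer.clear();
        minTs.clear();
      }
    }
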
> >> > > > >
> >> > > > > Another purpose, brought up on the Throttle thread, is to wait
> >> > > > > or back off. In this case it would be desired for the timer to
> >> > > > > actually cause batch processing to pause and wait. This kind of
> >> > > > > behavior has not been explored much. Notably the runner can
> >> > > > > absolutely process all elements first, then start to fire any
> >> > > > > enqueued processing time timers. In the same way that state in
> >> > > > > batch can just be in memory, this *could* just be a call to
> >> > > > > sleep(). It all seems a bit sketchy so I'd love clearer opinions.
> >> > > > >
> >> > > > > These two are both operational effects - as you would expect for
> >> > > > > processing time timers - and they seem to be in conflict. Maybe
> >> > > > > they just need different features?
> >> > > > >
> >> > > > > I'd love to hear some more uses of processing time timers from
> >> > > > > the community.
> >> > > > >
> >> > > > > Kenn
> >> > >
>
