I think that before we introduce a new feature that might duplicate an existing one, we should be certain that it is really semantically different. I'll rephrase the two cases:

 a) need to wait and block data (delay) - the use case is the motivating example of the Throttle transform

 b) act in the absence of data, without blocking

Provided we align processing time with the local machine clock (or better, for testability, make the current processing time available via the context to @ProcessElement), it seems possible to unify both cases under slightly updated semantics of processing-time timers in batch:

 a) processing-time timers fire best-effort, i.e. the runner tries to minimize the delay between the actual firing time and the timer's timestamp

 b) a timer is valid only in the context of the current key-window; once the watermark passes the window GC time for the particular window that created the timer, it is ignored

 c) if the timer has an output timestamp, this timestamp holds the watermark (though this is currently probably a noop, because I assume runners do not currently propagate a (per-key) watermark in batch)

In case b) it might be necessary to distinguish whether the timer has an output timestamp; if it does, that probably should be taken into account.
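
For illustration, a minimal Java SDK sketch of what such a timer could look like in user code - the class name and the ten-second delay are just illustrative, and the batch behavior described in a)-c) is the proposed semantics, not something runners guarantee today:

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Buffers elements per key and flushes them via a processing-time timer.
// Under the proposed semantics, in batch the timer fires best-effort and
// is ignored once the watermark passes the GC time of the window that set
// it; the output timestamp holds the (per-key) watermark where the runner
// tracks one.
class BufferAndFlushFn extends DoFn<KV<String, String>, String> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(ctx.element().getValue());
    // Fire roughly ten seconds of processing time from now; per c) above,
    // the output timestamp holds the watermark (a real transform would
    // track the minimum buffered element timestamp instead).
    flush
        .withOutputTimestamp(ctx.timestamp())
        .offset(Duration.standardSeconds(10))
        .setRelative();
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext ctx, @StateId("buffer") BagState<String> buffer) {
    for (String element : buffer.read()) {
      ctx.output(element);
    }
    buffer.clear();
  }
}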

Now, such semantics should be quite aligned with what we do in the streaming case and with what users generally expect. The blocking part can be implemented in @ProcessElement using buffer & timer; where there is a need to wait, it can be implemented in user code using a plain sleep(). That works because of the alignment between local time and the definition of processing time. If we had some reason to be able to run faster than wall clock (though I'm still not in favor of that), we could do that using ProcessContext.sleep(). Delaying processing in @ProcessElement should result in backpressure, and backpropagation of this backpressure from the Throttle transform to the sources, as mentioned (of course this applies only to the streaming case).
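
A minimal sketch of that sleep-based blocking, assuming processing time is local wall time (the class and the per-second rate limit are illustrative, not an existing Beam transform):

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Per-instance throttle: blocks inside @ProcessElement once the local rate
// is exceeded. A plain Thread.sleep() is valid only because processing time
// is defined as local wall time; in streaming, the resulting delay
// backpressures upstream toward the sources.
class ThrottleFn<T> extends DoFn<T, T> {

  private final long maxElementsPerSecond;
  private transient Instant intervalStart;
  private transient long emittedInInterval;

  ThrottleFn(long maxElementsPerSecond) {
    this.maxElementsPerSecond = maxElementsPerSecond;
  }

  @StartBundle
  public void startBundle() {
    intervalStart = Instant.now();
    emittedInInterval = 0;
  }

  @ProcessElement
  public void process(ProcessContext ctx) throws InterruptedException {
    if (emittedInInterval >= maxElementsPerSecond) {
      long elapsedMillis = new Duration(intervalStart, Instant.now()).getMillis();
      if (elapsedMillis < 1000) {
        // Wait out the rest of the one-second interval before emitting more.
        Thread.sleep(1000 - elapsedMillis);
      }
      intervalStart = Instant.now();
      emittedInInterval = 0;
    }
    emittedInInterval++;
    ctx.output(ctx.element());
  }
}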

Is there anything missing in such a definition that would still require splitting the timers into two distinct features?

 Jan

On 2/26/24 21:22, Kenneth Knowles wrote:
Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.

OutputTime is always an event time timestamp so it isn't even allowed to be set outside the window (or you'd end up with an element assigned to a window that it isn't within, since OutputTime essentially represents reserving the right to output an element with that timestamp)

Kenn

On Mon, Feb 26, 2024 at 3:19 PM Robert Burke <rob...@frantil.com> wrote:

    Agreed that a retroactive behavior change would be bad, even if
    tied to a beam version change. I agree that it meshes well with
    the general theme of State & Timers exposing underlying primitives
    for implementing Windowing and similar. I'd say the distinction
    between the two might be additional complexity for users to grok,
    and would need to be documented well, as both operate in the
    ProcessingTime domain, but differently.

    What to call this new timer then? DelayTimer?

    "A DelayTimer sets an instant in ProcessingTime at which point
    computations can continue. Runners will prevent the EventTime
    watermark from advancing past the set OutputTime until Processing
    Time has advanced to at least the provided instant to execute the
    timer's callback. This can be used to allow the runner to constrain
    pipeline throughput with user guidance."

    I'd probably add that a timer with an output time outside of the
    window would not be guaranteed to fire, and that OnWindowExpiry is
    the correct way to ensure cleanup occurs.

    No solution to the Looping Timers on Drain problem here, but I
    think that's ultimately an orthogonal discussion, and will
    restrain my thoughts on that for now.

    This isn't a proposal, but exploring the solution space within our
    problem. We'd want to break down exactly what is different and
    what is the same for the 3 kinds of timers...




    On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles <k...@apache.org>
    wrote:

        Pulling out focus points:

        On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
        <dev@beam.apache.org> wrote:
        > I can't act on something yet [...] but I expect to be able
        to [...] at some time in the processing-time future.

        I like this as a clear and internally-consistent feature
        description. It describes ProcessContinuation and those timers
        which serve the same purpose as ProcessContinuation.

        On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
        <dev@beam.apache.org> wrote:
        > I can't think of a batch or streaming scenario where it
        would be correct to not wait at least that long

        The main reason we created timers: to take action in the
        absence of data. The archetypal use case for processing time
        timers was/is "flush data from state if it has been sitting
        there too long". For this use case, the right behavior for
        batch is to skip the timer. It is actually basically incorrect
        to wait.

        On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
        <lostl...@apache.org> wrote:
        > It doesn't require a new primitive.

        IMO what's being proposed *is* a new primitive. I think it is
        a good primitive. It is the underlying primitive to
        ProcessContinuation. It would be user-friendly as a kind of
        timer. But if we made this the behavior of processing time
        timers retroactively, it would break everyone using them to
        flush data who is also reprocessing data.

        There's two very different use cases ("I need to wait, and
        block data" vs "I want to act without data, aka NOT wait for
        data") and I think we should serve both of them, but it
        doesn't have to be with the same low-level feature.

        Kenn


        On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
        <dev@beam.apache.org> wrote:

            On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
            <lostl...@apache.org> wrote:
            >
            > While I'm currently on the other side of the fence, I
            would not be against changing/requiring the semantics of
            ProcessingTime constructs to be "must wait and execute"; such
            a solution enables the proposed "batch" process
            continuation throttling mechanism to work as hypothesized
            for both "batch" and "streaming" execution.
            >
            > There's a lot to like, as it leans Beam further into the
            unification of Batch and Stream, with one fewer exception
            (eg. unifies timer experience further). It doesn't require
            a new primitive. It probably matches more with user
            expectations anyway.
            >
            > It does cause looping timer execution with processing
            time to be a problem for Drains however.

            I think we have a problem with looping timers plus drain
            (a mostly
            streaming idea anyway) regardless.

            > I'd argue though that in the case of a drain, we could
            update the semantics as "move watermark to infinity" and
            "existing timers are executed, but new timers are ignored",

            I don't like the idea of dropping timers for drain. I
            think correct
            handling here requires user visibility into whether a
            pipeline is
            draining or not.

            > and ensure/and update the requirements around
            OnWindowExpiration callbacks to be a bit more insistent on
            being implemented for correct execution, which is
            currently the only "hard" signal to the SDK side that the
            window's work is guaranteed to be over, and remaining
            state needs to be addressed by the transform or be garbage
            collected. This remains critical for developing a good
            pattern for ProcessingTime timers within a Global Window too.

            +1

            >
            > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
            > > Thanks for bringing this up.
            > >
            > > My position is that both batch and streaming should
            wait for
            > > processing time timers, according to local time (with
            the exception of
            > > tests that can accelerate this via faked clocks).
            > >
            > > Both ProcessContinuations delays and
            ProcessingTimeTimers are IMHO
            > > isomorphic, and can be implemented in terms of each
            other (at least in
            > > one direction, and likely the other). Both are an
            indication that I
            > > can't act on something yet due to external constraints
            (e.g. not all
            > > the data has been published, or I lack sufficient
            capacity/quota to
            > > push things downstream) but I expect to be able to (or
            at least would
            > > like to check again) at some time in the
            processing-time future. I
            > > can't think of a batch or streaming scenario where it
            would be correct
            > > to not wait at least that long (even in batch inputs,
            e.g. suppose I'm
            > > tailing logs and was eagerly started before they were
            fully written,
            > > or waiting for some kind of (non-data-dependent)
            quiescence or other
            > > operation to finish).
            > >
            > >
            > > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský
            <je...@seznam.cz> wrote:
            > > >
            > > > For me it always helps to seek analogy in our
            physical reality. Stream
            > > > processing actually has quite a good analogy for
            both event-time and
            > > > processing-time - the simplest model for this being
            relativity theory.
            > > > Event-time is the time at which events occur _at
            distant locations_. Due
            > > > to finite and invariant speed of light (which is
            actually really
            > > > involved in the explanation why any stream
            processing is inevitably
            > > > unordered) these events are observed (processed) at
            different times
            > > > (processing time, different for different
            observers). It is perfectly
            > > > possible for an observer to observe events at a rate
            that is higher than
            > > > one second per second. This also happens in reality
            for observers that
            > > > travel at relativistic speeds (which might be an
            analogy for fast -
            > > > batch - (re)processing). Besides the invariant
            speed, there is also
            > > > another invariant - local clock (wall time) always
            ticks exactly at the
            > > > rate of one second per second, no matter what. It is
            not possible to
            > > > "move faster or slower" through (local) time.
            > > >
            > > > In my understanding the reason why we do not put any
            guarantees or
            > > > bounds on the delay of firing processing time timers
            is purely technical
            > > > - the processing is (per key) single-threaded, thus
            any timer has to
            > > > wait before any element processing finishes. This is
            only consequence of
            > > > a technical solution, not something fundamental.
            > > >
            > > > Having said that, my point is that according to the
            above analogy, it
            > > > should be perfectly fine to fire processing time
            timers in batch based
            > > > on (local wall) time only. There should be no way of
            manipulating this
            > > > local time (excluding tests). Watermarks should be
            affected the same way
            > > > as any buffering in a state that would happen in a
            stateful DoFn (i.e.
            > > > set timer holds output watermark). We should
            probably pay attention to
            > > > looping timers, but it seems possible to define a
            valid stopping
            > > > condition (input watermark at infinity).
            > > >
            > > >   Jan
            > > >
            > > > On 2/22/24 19:50, Kenneth Knowles wrote:
            > > > > Forking this thread.
            > > > >
            > > > > The state of processing time timers in this mode
            of processing is not
            > > > > satisfactory and is discussed a lot but we should
            make everything
            > > > > explicit.
            > > > >
            > > > > Currently, a state and timer DoFn has a number of
            logical watermarks:
            > > > > (apologies for fixed width not coming through in
            email lists). Treat
            > > > > timers as a back edge.
            > > > >
            > > > > input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
            > > > >                    ^                 |
            > > > >                    |-------(B)-------|
            > > > >                          timers
            > > > >
            > > > > (A) Input Element watermark: this is the watermark
            that promises there
            > > > > is no incoming element with a timestamp earlier
            than it. Each input
            > > > > element's timestamp holds this watermark. Note
            that *event time timers
            > > > > firing is according to this watermark*. But a
            runner commits changes
            > > > > to this watermark *whenever it wants*, in a way
            that can be
            > > > > consistent. So the runner can absolutely process
            *all* the elements
            > > > > before advancing the watermark (A), and only
            afterwards start firing
            > > > > timers.
            > > > >
            > > > > (B) Timer watermark: this is a watermark that
            promises no timer is set
            > > > > with an output timestamp earlier than it. Each
            timer that has an
            > > > > output timestamp holds this watermark. Note that
            timers can set new
            > > > > timers, indefinitely, so this may never reach
            infinity even in a drain
            > > > > scenario.
            > > > >
            > > > > (C) (derived) total input watermark: this is a
            watermark that is the
            > > > > minimum of the two above, and ensures that all
            state for the DoFn for
            > > > > expired windows can be GCd after calling
            @OnWindowExpiration.
            > > > >
            > > > > (D) output watermark: this is a promise that the
            DoFn will not output
            > > > > earlier than the watermark. It is held by the
            total input watermark.
            > > > >
            > > > > So any timer, processing or not, holds the total
            input watermark
            > > > > which prevents window GC, hence the timer must be
            fired. You can set
            > > > > timers without a timestamp and they will not hold
            (B) hence not hold
            > > > > the total input / GC watermark (C). Then if a
            timer fires for an
            > > > > expired window, it is ignored. But in general a
            timer that sets an
            > > > > output timestamp is saying that it may produce
            output, so it *must* be
            > > > > fired, even in batch, for data integrity. There
            was a time before
            > > > > timers had output timestamps that we said that you
            *always* have to
            > > > > have an @OnWindowExpiration callback for data
            integrity, and
            > > > > processing time timers could not hold the
            watermark. That is changed now.
            > > > >
            > > > > One main purpose of processing time timers in
            streaming is to be a
            > > > > "timeout" for data buffered in state, to
            eventually flush. In this
            > > > > case the output timestamp should be the minimum of
            the elements in
            > > > > state (or equivalent). In batch, of course, this
            kind of timer is not
            > > > > relevant and we should definitely not wait for it,
            because the goal is
            > > > > to just get through all the data. We can justify
            this by saying that
            > > > > the worker really has no business having any idea
            what time it really
            > > > > is, and the runner can just run the clock at
            whatever speed it wants.
            > > > >
            > > > > Another purpose, brought up on the Throttle
            thread, is to wait or
            > > > > backoff. In this case it would be desired for the
            timer to actually
            > > > > cause batch processing to pause and wait. This
            kind of behavior has
            > > > > not been explored much. Notably the runner can
            absolutely process all
            > > > > elements first, then start to fire any enqueued
            processing time
            > > > > timers. In the same way that state in batch can
            just be in memory,
            > > > > this *could* just be a call to sleep(). It all
            seems a bit sketchy so
            > > > > I'd love clearer opinions.
            > > > >
            > > > > These two are both operational effects - as you
            would expect for
            > > > > processing time timers - and they seem to be in
            conflict. Maybe they
            > > > > just need different features?
            > > > >
            > > > > I'd love to hear some more uses of processing time
            timers from the
            > > > > community.
            > > > >
            > > > > Kenn
            > >
