On Tue, Feb 27, 2024 at 10:22 AM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
> On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles <k...@apache.org> wrote:
> >
> > Pulling out focus points:
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
> > > I can't act on something yet [...] but I expect to be able to [...] at
> > > some time in the processing-time future.
> >
> > I like this as a clear and internally-consistent feature description. It
> > describes ProcessContinuation and those timers which serve the same purpose
> > as ProcessContinuation.
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
> > > I can't think of a batch or streaming scenario where it would be
> > > correct to not wait at least that long
> >
> > The main reason we created timers: to take action in the absence of
> > data. The archetypal use case for processing time timers was/is "flush data
> > from state if it has been sitting there too long". For this use case, the
> > right behavior for batch is to skip the timer. It is actually basically
> > incorrect to wait.
>
> Good point calling out the distinction between "I need to wait in case
> there's more data." and "I need to wait for something external." We
> can't currently distinguish between the two, but a batch runner can
> say something definitive about the first. Feels like we need a new
> primitive (or at least new signaling information on our existing
> primitive).

BTW the first is also relevant to drain. One reason drain often takes a long
time today is because it has to wait for processing-time timers to fire (it has
to wait because those timers have watermark holds), but usually those timers
are noops.

> > On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org> wrote:
> > > It doesn't require a new primitive.
> >
> > IMO what's being proposed *is* a new primitive. I think it is a good
> > primitive. It is the underlying primitive to ProcessContinuation. It would
> > be user-friendly as a kind of timer.
> > But if we made this the behavior of processing time timers
> > retroactively, it would break everyone using them to flush data who is
> > also reprocessing data.
> >
> > There's two very different use cases ("I need to wait, and block data"
> > vs "I want to act without data, aka NOT wait for data") and I think we
> > should serve both of them, but it doesn't have to be with the same
> > low-level feature.
> >
> > Kenn
> >
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
> >>
> >> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org> wrote:
> >> >
> >> > While I'm currently on the other side of the fence, I would not be
> >> > against changing/requiring the semantics of ProcessingTime constructs to be
> >> > "must wait and execute" as such a solution, and enables the Proposed
> >> > "batch" process continuation throttling mechanism to work as hypothesized
> >> > for both "batch" and "streaming" execution.
> >> >
> >> > There's a lot to like, as it leans Beam further into the unification
> >> > of Batch and Stream, with one fewer exception (eg. unifies timer experience
> >> > further). It doesn't require a new primitive. It probably matches more with
> >> > user expectations anyway.
> >> >
> >> > It does cause looping timer execution with processing time to be a
> >> > problem for Drains however.
> >>
> >> I think we have a problem with looping timers plus drain (a mostly
> >> streaming idea anyway) regardless.
> >>
> >> > I'd argue though that in the case of a drain, we could update the
> >> > semantics as "move watermark to infinity" "existing timers are executed,
> >> > but new timers are ignored",
> >>
> >> I don't like the idea of dropping timers for drain. I think correct
> >> handling here requires user visibility into whether a pipeline is
> >> draining or not.
> >>
> >> > and ensure/and update the requirements around OnWindowExpiration
> >> > callbacks to be a bit more insistent on being implemented for correct
> >> > execution, which is currently the only "hard" signal to the SDK side that
> >> > the window's work is guaranteed to be over, and remaining state needs to be
> >> > addressed by the transform or be garbage collected. This remains critical
> >> > for developing a good pattern for ProcessingTime timers within a Global
> >> > Window too.
> >>
> >> +1
> >>
> >> >
> >> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> >> > > Thanks for bringing this up.
> >> > >
> >> > > My position is that both batch and streaming should wait for
> >> > > processing time timers, according to local time (with the exception of
> >> > > tests that can accelerate this via faked clocks).
> >> > >
> >> > > Both ProcessContinuation delays and ProcessingTimeTimers are IMHO
> >> > > isomorphic, and can be implemented in terms of each other (at least in
> >> > > one direction, and likely the other). Both are an indication that I
> >> > > can't act on something yet due to external constraints (e.g. not all
> >> > > the data has been published, or I lack sufficient capacity/quota to
> >> > > push things downstream) but I expect to be able to (or at least would
> >> > > like to check again) at some time in the processing-time future. I
> >> > > can't think of a batch or streaming scenario where it would be correct
> >> > > to not wait at least that long (even in batch inputs, e.g. suppose I'm
> >> > > tailing logs and was eagerly started before they were fully written,
> >> > > or waiting for some kind of (non-data-dependent) quiescence or other
> >> > > operation to finish).
> >> > >
> >> > >
> >> > > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský <je...@seznam.cz> wrote:
> >> > > >
> >> > > > For me it always helps to seek analogy in our physical reality.
> >> > > > Stream processing actually has quite a good analogy for both
> >> > > > event-time and processing-time - the simplest model for this being
> >> > > > relativity theory. Event-time is the time at which events occur _at
> >> > > > distant locations_. Due to finite and invariant speed of light (which
> >> > > > is actually really involved in the explanation why any stream
> >> > > > processing is inevitably unordered) these events are observed
> >> > > > (processed) at different times (processing time, different for
> >> > > > different observers). It is perfectly possible for an observer to
> >> > > > observe events at a rate that is higher than one second per second.
> >> > > > This also happens in reality for observers that travel at relativistic
> >> > > > speeds (which might be an analogy for fast - batch - (re)processing).
> >> > > > Besides the invariant speed, there is also another invariant - local
> >> > > > clock (wall time) always ticks exactly at the rate of one second per
> >> > > > second, no matter what. It is not possible to "move faster or slower"
> >> > > > through (local) time.
> >> > > >
> >> > > > In my understanding the reason why we do not put any guarantees or
> >> > > > bounds on the delay of firing processing time timers is purely
> >> > > > technical - the processing is (per key) single-threaded, thus any
> >> > > > timer has to wait before any element processing finishes. This is only
> >> > > > consequence of a technical solution, not something fundamental.
> >> > > >
> >> > > > Having said that, my point is that according to the above analogy, it
> >> > > > should be perfectly fine to fire processing time timers in batch based
> >> > > > on (local wall) time only. There should be no way of manipulating this
> >> > > > local time (excluding tests). Watermarks should be affected the same
> >> > > > way as any buffering in a state that would happen in a stateful DoFn
> >> > > > (i.e.
> >> > > > set timer holds output watermark). We should probably pay attention to
> >> > > > looping timers, but it seems possible to define a valid stopping
> >> > > > condition (input watermark at infinity).
> >> > > >
> >> > > > Jan
> >> > > >
> >> > > > On 2/22/24 19:50, Kenneth Knowles wrote:
> >> > > > > Forking this thread.
> >> > > > >
> >> > > > > The state of processing time timers in this mode of processing is not
> >> > > > > satisfactory and is discussed a lot but we should make everything
> >> > > > > explicit.
> >> > > > >
> >> > > > > Currently, a state and timer DoFn has a number of logical watermarks:
> >> > > > > (apologies for fixed width not coming through in email lists). Treat
> >> > > > > timers as a back edge.
> >> > > > >
> >> > > > > input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
> >> > > > >             ^                      |
> >> > > > >             |--(B)-----------------|
> >> > > > >                     timers
> >> > > > >
> >> > > > > (A) Input Element watermark: this is the watermark that promises there
> >> > > > > is no incoming element with a timestamp earlier than it. Each input
> >> > > > > element's timestamp holds this watermark. Note that *event time timers
> >> > > > > firing is according to this watermark*. But a runner commits changes
> >> > > > > to this watermark *whenever it wants*, in a way that can be
> >> > > > > consistent. So the runner can absolutely process *all* the elements
> >> > > > > before advancing the watermark (A), and only afterwards start firing
> >> > > > > timers.
> >> > > > >
> >> > > > > (B) Timer watermark: this is a watermark that promises no timer is set
> >> > > > > with an output timestamp earlier than it. Each timer that has an
> >> > > > > output timestamp holds this watermark. Note that timers can set new
> >> > > > > timers, indefinitely, so this may never reach infinity even in a drain
> >> > > > > scenario.
> >> > > > >
> >> > > > > (C) (derived) total input watermark: this is a watermark that is the
> >> > > > > minimum of the two above, and ensures that all state for the DoFn for
> >> > > > > expired windows can be GCd after calling @OnWindowExpiration.
> >> > > > >
> >> > > > > (D) output watermark: this is a promise that the DoFn will not output
> >> > > > > earlier than the watermark. It is held by the total input watermark.
> >> > > > >
> >> > > > > So any timer, processing or not, holds the total input watermark
> >> > > > > which prevents window GC, hence the timer must be fired. You can set
> >> > > > > timers without a timestamp and they will not hold (B) hence not hold
> >> > > > > the total input / GC watermark (C). Then if a timer fires for an
> >> > > > > expired window, it is ignored. But in general a timer that sets an
> >> > > > > output timestamp is saying that it may produce output, so it *must* be
> >> > > > > fired, even in batch, for data integrity. There was a time before
> >> > > > > timers had output timestamps that we said that you *always* have to
> >> > > > > have an @OnWindowExpiration callback for data integrity, and
> >> > > > > processing time timers could not hold the watermark. That is changed now.
> >> > > > >
> >> > > > > One main purpose of processing time timers in streaming is to be a
> >> > > > > "timeout" for data buffered in state, to eventually flush. In this
> >> > > > > case the output timestamp should be the minimum of the elements in
> >> > > > > state (or equivalent). In batch, of course, this kind of timer is not
> >> > > > > relevant and we should definitely not wait for it, because the goal is
> >> > > > > to just get through all the data. We can justify this by saying that
> >> > > > > the worker really has no business having any idea what time it really
> >> > > > > is, and the runner can just run the clock at whatever speed it wants.
> >> > > > >
> >> > > > > Another purpose, brought up on the Throttle thread, is to wait or
> >> > > > > backoff. In this case it would be desired for the timer to actually
> >> > > > > cause batch processing to pause and wait. This kind of behavior has
> >> > > > > not been explored much. Notably the runner can absolutely process all
> >> > > > > elements first, then start to fire any enqueued processing time
> >> > > > > timers. In the same way that state in batch can just be in memory,
> >> > > > > this *could* just be a call to sleep(). It all seems a bit sketchy so
> >> > > > > I'd love clearer opinions.
> >> > > > >
> >> > > > > These two are both operational effects - as you would expect for
> >> > > > > processing time timers - and they seem to be in conflict. Maybe they
> >> > > > > just need different features?
> >> > > > >
> >> > > > > I'd love to hear some more uses of processing time timers from the
> >> > > > > community.
> >> > > > >
> >> > > > > Kenn
> >> > > >
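
For readers following the (A)-(D) watermark description quoted above, here is a
rough sketch of how those four watermarks relate. It is a toy model in plain
Python, not Beam code: the class name, field names, and the `>=` expiry check
are all made up for illustration, and allowed lateness is ignored.

```python
from dataclasses import dataclass, field
from typing import List, Optional

INF = float("inf")  # stands in for "watermark at infinity"


@dataclass
class DoFnWatermarks:
    """Toy model of watermarks (A)-(D) from the thread; hypothetical, not a Beam API."""

    # (A) input element watermark, as reported by the runner.
    input_element_wm: float = 0.0
    # Output timestamps of pending timers; None means the timer was set
    # without an output timestamp and therefore holds nothing.
    timer_output_timestamps: List[Optional[float]] = field(default_factory=list)

    def timer_wm(self) -> float:
        # (B) is held by every pending timer that has an output timestamp.
        # Assumption: with no such timers, (B) does not hold anything back.
        holds = [t for t in self.timer_output_timestamps if t is not None]
        return min(holds, default=INF)

    def total_input_wm(self) -> float:
        # (C) is the minimum of (A) and (B).
        return min(self.input_element_wm, self.timer_wm())

    def output_wm(self) -> float:
        # (D) is held by (C); this sketch simply treats them as equal.
        return self.total_input_wm()

    def window_can_be_gcd(self, window_end: float) -> bool:
        # Assumption: a window is expired once (C) reaches its end.
        return self.total_input_wm() >= window_end
```

In this model a timer with an output timestamp of 40 keeps (C) at 40 even when
(A) has reached 100, so a window ending at 50 cannot be garbage collected until
that timer fires. A timer set without an output timestamp leaves (C) equal to
(A), which matches the "it is ignored if the window has expired" case.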