On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org> wrote:
>
> While I'm currently on the other side of the fence, I would not be against changing/requiring the semantics of ProcessingTime constructs to be "must wait and execute"; such a solution enables the proposed "batch" process continuation throttling mechanism to work as hypothesized for both "batch" and "streaming" execution.
>
> There's a lot to like, as it leans Beam further into the unification of Batch and Stream, with one fewer exception (e.g. it unifies the timer experience further). It doesn't require a new primitive. It probably matches user expectations better anyway.
>
> It does cause looping timer execution with processing time to be a problem for Drains, however.
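For anyone reading along without the Throttle thread handy: the "process continuation throttling" referenced above is, as I understand it, roughly a splittable DoFn that returns a resume delay whenever it cannot make progress yet. A minimal sketch of that shape (made-up names and a made-up quota check, not the actual proposal):

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;

// Sketch only: a splittable DoFn that backs off via ProcessContinuation when
// some external constraint (here a made-up quota check) isn't satisfied yet.
class ThrottledWorkFn extends DoFn<String, String> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element String element) {
    return new OffsetRange(0, 100); // hypothetical units of work per element
  }

  @ProcessElement
  public ProcessContinuation process(
      @Element String element,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    for (long i = tracker.currentRestriction().getFrom(); ; i++) {
      if (quotaExhausted()) {
        // "I can't act on this yet, but would like to check again later."
        // Whether a batch runner must honor this delay in real wall-clock
        // time is exactly what is being debated here.
        return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(30));
      }
      if (!tracker.tryClaim(i)) {
        return ProcessContinuation.stop();
      }
      out.output(element + "-" + i);
    }
  }

  private boolean quotaExhausted() {
    return false; // placeholder for a real quota/capacity check
  }
}

Under the "must wait and execute" semantics above, a runner would have to honor that resume delay in local wall-clock time in both batch and streaming.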
I think we have a problem with looping timers plus drain (a mostly streaming idea anyway) regardless.

> I'd argue though that in the case of a drain, we could update the semantics as "move watermark to infinity" and "existing timers are executed, but new timers are ignored",

I don't like the idea of dropping timers for drain. I think correct handling here requires user visibility into whether a pipeline is draining or not.

> and ensure/update the requirements around OnWindowExpiration callbacks to be a bit more insistent on being implemented for correct execution, which is currently the only "hard" signal to the SDK side that the window's work is guaranteed to be over, and remaining state needs to be addressed by the transform or be garbage collected. This remains critical for developing a good pattern for ProcessingTime timers within a Global Window too.

+1

> On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> > Thanks for bringing this up.
> >
> > My position is that both batch and streaming should wait for processing time timers, according to local time (with the exception of tests that can accelerate this via faked clocks).
> >
> > Both ProcessContinuation delays and ProcessingTimeTimers are IMHO isomorphic, and can be implemented in terms of each other (at least in one direction, and likely the other). Both are an indication that I can't act on something yet due to external constraints (e.g. not all the data has been published, or I lack sufficient capacity/quota to push things downstream) but I expect to be able to (or at least would like to check again) at some time in the processing-time future. I can't think of a batch or streaming scenario where it would be correct to not wait at least that long (even with batch inputs, e.g. suppose I'm tailing logs and was eagerly started before they were fully written, or waiting for some kind of (non-data-dependent) quiescence or other operation to finish).
> >
> > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >
> > > For me it always helps to seek an analogy in our physical reality. Stream processing actually has quite a good analogy for both event-time and processing-time - the simplest model for this being relativity theory. Event-time is the time at which events occur _at distant locations_. Due to the finite and invariant speed of light (which is actually really involved in the explanation of why any stream processing is inevitably unordered) these events are observed (processed) at different times (processing time, different for different observers). It is perfectly possible for an observer to observe events at a rate that is higher than one second per second. This also happens in reality for observers that travel at relativistic speeds (which might be an analogy for fast - batch - (re)processing). Besides the invariant speed, there is also another invariant - the local clock (wall time) always ticks exactly at the rate of one second per second, no matter what. It is not possible to "move faster or slower" through (local) time.
> > >
> > > In my understanding the reason why we do not put any guarantees or bounds on the delay of firing processing time timers is purely technical - the processing is (per key) single-threaded, thus any timer has to wait until element processing finishes. This is only a consequence of a technical solution, not something fundamental.
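To keep the rest of this concrete, the kind of stateful DoFn most of this thread is circling around looks roughly like the sketch below (Java SDK; class names, numbers, and the sum-based output are mine, not from any proposal): buffer elements per key, flush on a processing-time "timeout" timer that holds the output watermark, and treat @OnWindowExpiration as the final flush before state is garbage collected.

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Sketch: buffer per key, flush on a processing-time timeout, and treat
// @OnWindowExpiration as the hard "no more output for this window" signal.
class BufferWithTimeoutFn extends DoFn<KV<String, Integer>, Integer> {

  @StateId("buffer")
  private final StateSpec<BagState<Integer>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, Integer> element,
      @Timestamp Instant elementTimestamp,
      @StateId("buffer") BagState<Integer> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(element.getValue());
    // Fire roughly a minute from now in processing time. The output timestamp
    // holds the watermark so the eventual flush is not late. (A fuller version
    // should hold the minimum timestamp of everything buffered, e.g. tracked
    // in a CombiningState, rather than the latest element's timestamp.)
    flush.withOutputTimestamp(elementTimestamp).offset(Duration.standardMinutes(1)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("buffer") BagState<Integer> buffer, OutputReceiver<Integer> out) {
    emitSum(buffer, out);
  }

  @OnWindowExpiration
  public void onExpiration(
      @StateId("buffer") BagState<Integer> buffer, OutputReceiver<Integer> out) {
    // Even in the Global Window (e.g. on drain, when the watermark jumps to
    // infinity), this is the last chance to flush before state is GC'd.
    emitSum(buffer, out);
  }

  private void emitSum(BagState<Integer> buffer, OutputReceiver<Integer> out) {
    int sum = 0;
    boolean any = false;
    for (int v : buffer.read()) {
      sum += v;
      any = true;
    }
    if (any) {
      out.output(sum);
    }
    buffer.clear();
  }
}

In streaming the "flush" timer is load-bearing for latency; the open question here is what, if anything, a batch runner owes it.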
> > > Having said that, my point is that according to the above analogy, it should be perfectly fine to fire processing time timers in batch based on (local wall) time only. There should be no way of manipulating this local time (excluding tests). Watermarks should be affected the same way as by any buffering in state that would happen in a stateful DoFn (i.e. a set timer holds the output watermark). We should probably pay attention to looping timers, but it seems possible to define a valid stopping condition (input watermark at infinity).
> > >
> > > Jan
> > >
> > > On 2/22/24 19:50, Kenneth Knowles wrote:
> > > > Forking this thread.
> > > >
> > > > The state of processing time timers in this mode of processing is not satisfactory and is discussed a lot but we should make everything explicit.
> > > >
> > > > Currently, a state and timer DoFn has a number of logical watermarks (apologies for fixed width not coming through in email lists; treat timers as a back edge):
> > > >
> > > > input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
> > > >                         ^             |
> > > >                         |--(B)--------|
> > > >                              timers
> > > >
> > > > (A) Input Element watermark: this is the watermark that promises there is no incoming element with a timestamp earlier than it. Each input element's timestamp holds this watermark. Note that *event time timers fire according to this watermark*. But a runner commits changes to this watermark *whenever it wants*, in a way that can be consistent. So the runner can absolutely process *all* the elements before advancing the watermark (A), and only afterwards start firing timers.
> > > >
> > > > (B) Timer watermark: this is a watermark that promises no timer is set with an output timestamp earlier than it. Each timer that has an output timestamp holds this watermark. Note that timers can set new timers, indefinitely, so this may never reach infinity even in a drain scenario.
> > > >
> > > > (C) (derived) Total input watermark: this is a watermark that is the minimum of the two above, and ensures that all state for the DoFn for expired windows can be GCd after calling @OnWindowExpiration.
> > > >
> > > > (D) Output watermark: this is a promise that the DoFn will not output earlier than the watermark. It is held by the total input watermark.
> > > >
> > > > So any timer, processing time or not, holds the total input watermark, which prevents window GC, hence the timer must be fired. You can set timers without a timestamp and they will not hold (B), hence not hold the total input / GC watermark (C). Then if a timer fires for an expired window, it is ignored. But in general a timer that sets an output timestamp is saying that it may produce output, so it *must* be fired, even in batch, for data integrity. There was a time, before timers had output timestamps, when we said that you *always* had to have an @OnWindowExpiration callback for data integrity, and processing time timers could not hold the watermark. That has changed now.
> > > > One main purpose of processing time timers in streaming is to be a "timeout" for data buffered in state, to eventually flush. In this case the output timestamp should be the minimum of the elements in state (or equivalent). In batch, of course, this kind of timer is not relevant and we should definitely not wait for it, because the goal is to just get through all the data. We can justify this by saying that the worker really has no business having any idea what time it really is, and the runner can just run the clock at whatever speed it wants.
> > > >
> > > > Another purpose, brought up on the Throttle thread, is to wait or backoff. In this case it would be desired for the timer to actually cause batch processing to pause and wait. This kind of behavior has not been explored much. Notably the runner can absolutely process all elements first, then start to fire any enqueued processing time timers. In the same way that state in batch can just be in memory, this *could* just be a call to sleep(). It all seems a bit sketchy so I'd love clearer opinions.
> > > >
> > > > These two are both operational effects - as you would expect for processing time timers - and they seem to be in conflict. Maybe they just need different features?
> > > >
> > > > I'd love to hear some more uses of processing time timers from the community.
> > > >
> > > > Kenn
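On the second purpose: the shape I have in mind for the Throttle case is roughly the sketch below (names and numbers are mine; an illustration, not a tested pattern). Elements get parked in state and are released a bounded number per processing-time tick, with a looping timer until the backlog is empty.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Sketch of a per-key throttle: at most MAX_PER_TICK elements are released
// per processing-time tick; the rest stay parked in state until the next tick.
class ThrottleFn extends DoFn<KV<String, String>, String> {
  private static final int MAX_PER_TICK = 100;

  @StateId("parked")
  private final StateSpec<BagState<String>> parkedSpec = StateSpecs.bag();

  @TimerId("tick")
  private final TimerSpec tickSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @Timestamp Instant elementTimestamp,
      @StateId("parked") BagState<String> parked,
      @TimerId("tick") Timer tick) {
    parked.add(element.getValue());
    // Hold the output watermark at the element's timestamp so parked elements
    // are not late when released. (A fuller version would only set the timer
    // when none is pending, e.g. via a ValueState<Boolean> flag, since
    // re-setting it here pushes the next tick forward on every element.)
    tick.withOutputTimestamp(elementTimestamp).offset(Duration.standardSeconds(1)).setRelative();
  }

  @OnTimer("tick")
  public void onTick(
      @StateId("parked") BagState<String> parked,
      @TimerId("tick") Timer tick,
      OutputReceiver<String> out) {
    List<String> remaining = new ArrayList<>();
    int released = 0;
    for (String value : parked.read()) {
      if (released < MAX_PER_TICK) {
        out.output(value);
        released++;
      } else {
        remaining.add(value);
      }
    }
    parked.clear();
    for (String value : remaining) {
      parked.add(value);
    }
    if (!remaining.isEmpty()) {
      // Looping timer: keep ticking until the backlog drains. (A fuller
      // version would also carry an output-timestamp hold for what remains.)
      tick.offset(Duration.standardSeconds(1)).setRelative();
    }
  }
}

If a batch runner is free to fire those ticks as fast as it likes after processing all input, the loop still terminates and data integrity is preserved, but no actual throttling happens - which is exactly the conflict between the two purposes described above.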