For me it always helps to seek an analogy in our physical reality. Stream processing actually has quite a good analogy for both event-time and processing-time, the simplest model for this being relativity theory. Event-time is the time at which events occur _at distant locations_. Due to the finite and invariant speed of light (which is actually part of the explanation of why any stream processing is inevitably unordered), these events are observed (processed) at different times (processing time, different for different observers). It is perfectly possible for an observer to observe events at a rate that is higher than one second per second. This also happens in reality for observers that travel at relativistic speeds (which might be an analogy for fast, batch, (re)processing). Besides the invariant speed, there is also another invariant: the local clock (wall time) always ticks exactly at the rate of one second per second, no matter what. It is not possible to "move faster or slower" through (local) time.
In my understanding the reason why we do not put any guarantees or 
bounds on the delay of firing processing time timers is purely technical 
- the processing is (per key) single-threaded, thus any timer has to 
wait until any in-progress element processing finishes. This is only a 
consequence of a technical solution, not something fundamental.
Having said that, my point is that according to the above analogy, it 
should be perfectly fine to fire processing time timers in batch based 
on (local wall) time only. There should be no way of manipulating this 
local time (excluding tests). Watermarks should be affected the same way 
as any buffering in state that would happen in a stateful DoFn (i.e. a 
set timer holds the output watermark). We should probably pay attention to 
looping timers, but it seems possible to define a valid stopping 
condition (input watermark at infinity).
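
As a rough illustration of the stopping condition above, here is a minimal sketch in plain Python (it does not use any Beam API). The function name `run_looping_timer`, the `period_s` parameter and the list of observed input watermarks are all hypothetical. The only point is that a looping timer keeps firing on local time and stops re-arming itself once it sees the input watermark at infinity.

```python
import math

def run_looping_timer(observed_input_watermarks, period_s=10):
    """Simulate a looping processing-time timer (hypothetical model).

    observed_input_watermarks: the input watermark the timer callback
    would see at each firing, in order. math.inf means "all input seen".
    Returns the local times at which the timer fired.
    """
    local_time = 0
    firings = []
    for input_watermark in observed_input_watermarks:
        # The timer fires one period after it was (re)set, on local time.
        local_time += period_s
        firings.append(local_time)
        if input_watermark == math.inf:
            # Stopping condition: no more input can arrive, so the
            # callback does its final flush and does not re-arm.
            break
    return firings

# The timer fires three times and then stops, even though the
# simulation would allow a fourth firing.
print(run_looping_timer([5, 9, math.inf, math.inf]))
```

If the simulated sequence never reaches infinity, the loop simply ends when the sequence runs out; a real runner would have to decide what to do in that case.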
 Jan

On 2/22/24 19:50, Kenneth Knowles wrote:
Forking this thread.

The state of processing time timers in this mode of processing is not satisfactory and is discussed a lot but we should make everything explicit.
Currently, a state and timer DoFn has a number of logical watermarks: 
(apologies for fixed width not coming through in email lists). Treat 
timers as a back edge.
input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
            ^                      |
            |--(B)-----------------|
                    timers

(A) Input Element watermark: this is the watermark that promises there is no incoming element with a timestamp earlier than it. Each input element's timestamp holds this watermark. Note that *event time timers firing is according to this watermark*. But a runner commits changes to this watermark *whenever it wants*, in a way that can be consistent. So the runner can absolutely process *all* the elements before advancing the watermark (A), and only afterwards start firing timers.
(B) Timer watermark: this is a watermark that promises no timer is set 
with an output timestamp earlier than it. Each timer that has an 
output timestamp holds this watermark. Note that timers can set new 
timers, indefinitely, so this may never reach infinity even in a drain 
scenario.
(C) (derived) total input watermark: this is a watermark that is the 
minimum of the two above, and ensures that all state for the DoFn for 
expired windows can be GCd after calling @OnWindowExpiration.
(D) output watermark: this is a promise that the DoFn will not output 
earlier than the watermark. It is held by the total input watermark.
So any timer, processing-time or not, holds the total input watermark 
which prevents window GC, hence the timer must be fired. You can set 
timers without a timestamp and they will not hold (B) hence not hold 
the total input / GC watermark (C). Then if a timer fires for an 
expired window, it is ignored. But in general a timer that sets an 
output timestamp is saying that it may produce output, so it *must* be 
fired, even in batch, for data integrity. There was a time before 
timers had output timestamps that we said that you *always* have to 
have an @OnWindowExpiration callback for data integrity, and 
processing time timers could not hold the watermark. That has changed now.
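
The relationships between (A), (B), (C) and (D) described above can be sketched as a toy model in plain Python. This is not Beam API: watermarks are plain numbers, `math.inf` stands in for the watermark at infinity, `None` models a timer set without an output timestamp, and all function names are hypothetical.

```python
import math

END_OF_TIME = math.inf  # stand-in for the watermark at infinity

def timer_watermark(pending_timer_output_timestamps):
    # (B): only timers that set an output timestamp hold this watermark.
    # None models a timer that was set without an output timestamp.
    held = [ts for ts in pending_timer_output_timestamps if ts is not None]
    return min(held, default=END_OF_TIME)

def total_input_watermark(input_element_wm, timer_wm):
    # (C): the minimum of (A) and (B).
    return min(input_element_wm, timer_wm)

def max_output_watermark(total_input_wm):
    # (D) is held by (C), so it can never advance past it.
    return total_input_wm

# A timer with output timestamp 40 holds (C), and therefore (D), at 40
# even though the input element watermark (A) is already at 100.
b = timer_watermark([None, 40])
c = total_input_watermark(100, b)
print(b, c, max_output_watermark(c))

# A timer without an output timestamp holds nothing.
print(total_input_watermark(100, timer_watermark([None])))
```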
One main purpose of processing time timers in streaming is to be a 
"timeout" for data buffered in state, to eventually flush. In this 
case the output timestamp should be the minimum of the elements in 
state (or equivalent). In batch, of course, this kind of timer is not 
relevant and we should definitely not wait for it, because the goal is 
to just get through all the data. We can justify this by saying that 
the worker really has no business having any idea what time it really 
is, and the runner can just run the clock at whatever speed it wants.
Another purpose, brought up on the Throttle thread, is to wait or 
back off. In this case it would be desirable for the timer to actually 
cause batch processing to pause and wait. This kind of behavior has 
not been explored much. Notably the runner can absolutely process all 
elements first, then start to fire any enqueued processing time 
timers. In the same way that state in batch can just be in memory, 
this *could* just be a call to sleep(). It all seems a bit sketchy so 
I'd love clearer opinions.
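
To make the "process all elements first, then fire timers with sleep()" idea concrete, here is a minimal sketch in plain Python. It is not how any Beam runner is implemented. The names `run_batch`, `process_element` and `set_timer` are hypothetical, and the clock and sleep functions are injectable only so that the behavior can be checked without really waiting.

```python
import heapq
import itertools
import time

def run_batch(elements, process_element,
              clock=time.monotonic, sleep=time.sleep):
    """Process every element, then fire enqueued processing-time timers.

    process_element(element, set_timer) may call
    set_timer(delay_s, callback); callback(set_timer) may set new timers.
    """
    pending = []              # heap of (fire_at, sequence, callback)
    seq = itertools.count()   # tie-breaker so callbacks are never compared

    def set_timer(delay_s, callback):
        heapq.heappush(pending, (clock() + delay_s, next(seq), callback))

    # Phase 1: all elements, with timers only being enqueued.
    for element in elements:
        process_element(element, set_timer)

    # Phase 2: fire timers in order, sleeping until each one is due.
    while pending:
        fire_at, _, callback = heapq.heappop(pending)
        wait = fire_at - clock()
        if wait > 0:
            sleep(wait)
        callback(set_timer)
```

Note that a timer that keeps re-setting itself would keep phase 2 running forever, which is the looping-timer concern in another form.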
These two are both operational effects - as you would expect for 
processing time timers - and they seem to be in conflict. Maybe they 
just need different features?
I'd love to hear some more uses of processing time timers from the 
community.
Kenn
