> Makes sense. Is this a design decision? I can imagine that waiting
for the side input watermark unconditionally adds latency; on the other hand,
"unexpected" non-deterministic behavior can confuse users. This type
of non-determinism, appearing after pipeline failure and recovery, is the
hardest to debug. If we were to document (and possibly slightly reimplement)
the triggered side-input spec, could we add an (optional) way to make the
processing deterministic via watermark sync?
Ah, I see the problem: triggered elements cannot be "synced" via the
watermark, because they carry a timestamp defined by the TimestampCombiner.
Makes sense now, thanks. +1
Jan
On 3/28/23 09:39, Jan Lukavský wrote:
On 3/27/23 19:44, Reuven Lax via dev wrote:
On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I'd like to clarify my understanding. Side inputs generally
perform a left (outer) join: the LHS is the main input and the RHS is
the side input.
Not completely - it's more of what I would call a nested-loop join.
I.e. if the side input changes _nothing happens_ until a new element
arrives on the LHS. This isn't quite the same as a left-outer join.
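To make the distinction concrete, here is a minimal toy model in plain Python (not the Beam API; the class and method names are hypothetical) of the lookup semantics described above: a side-input update is only stored, and output appears only when a main-input element arrives.

```python
from dataclasses import dataclass, field
from typing import Any, List, Tuple


@dataclass
class NestedLoopSideInputJoin:
    """Hypothetical toy model, not a Beam API: side-input updates are stored,
    and output is produced only when a main-input element arrives."""

    side_value: Any = None
    output: List[Tuple[Any, Any]] = field(default_factory=list)

    def on_side_input(self, value: Any) -> None:
        # Replacing the side-input value emits nothing by itself.
        self.side_value = value

    def on_main_input(self, element: Any) -> None:
        # Each main-input element is paired with the side value current *now*.
        self.output.append((element, self.side_value))


join = NestedLoopSideInputJoin()
join.on_side_input("v1")
join.on_main_input("a")
join.on_side_input("v2")  # no output: "a" is never re-joined against "v2"
join.on_main_input("b")
print(join.output)  # [('a', 'v1'), ('b', 'v2')]
```

A streaming left-outer join that kept main-input state could additionally emit an updated result for "a" when "v2" arrives; the lookup model never does.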
+1. This makes sense, my description was a slight simplification.
Doing a streaming left join requires watermark synchronization,
so elements from the main input are buffered while
main_input_timestamp > side_input_watermark. When the side input
watermark reaches the max watermark, main inputs no longer have to be
buffered because the side input will not change anymore. This
works well when the side input is bounded or in the
case of "slowly changing" patterns (ideally with perfect
watermarks, so no late data is present).
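The buffering rule for non-triggered side inputs can be sketched as a small simulation. This is a hypothetical toy model, not Beam's runner code, and it assumes a single global side-input value with no windowing: main-input elements are held while their timestamp is ahead of the side watermark and released once the watermark catches up.

```python
import math
from typing import Any, List, Tuple


class WatermarkSyncedSideInput:
    """Hypothetical toy model (not Beam's runner code) of watermark-synchronized
    side inputs. Simplification: one global side-input value, no windows."""

    def __init__(self) -> None:
        self.side_watermark = -math.inf
        self.side_value: Any = None
        self.buffer: List[Tuple[float, Any]] = []
        self.output: List[Tuple[Any, Any]] = []

    def on_main_input(self, ts: float, element: Any) -> None:
        if ts > self.side_watermark:
            # The side input may still change for this timestamp: hold the element.
            self.buffer.append((ts, element))
        else:
            self.output.append((element, self.side_value))

    def on_side_input(self, value: Any, watermark: float) -> None:
        self.side_value = value
        self.side_watermark = watermark
        # Release every buffered element the side watermark has now caught up with.
        still_blocked = []
        for ts, element in self.buffer:
            if ts > self.side_watermark:
                still_blocked.append((ts, element))
            else:
                self.output.append((element, self.side_value))
        self.buffer = still_blocked


j = WatermarkSyncedSideInput()
j.on_main_input(5, "a")                       # buffered: side watermark is -inf
j.on_side_input("v", watermark=10)            # releases "a"
j.on_side_input("final", watermark=math.inf)  # max watermark: side input is done
j.on_main_input(100, "b")                     # processed immediately, never buffered
print(j.output)  # [('a', 'v'), ('b', 'final')]
```

Once the side watermark is at the maximum (e.g. a fully read bounded side input), the `ts > side_watermark` check can never hold, so nothing is buffered any more.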
This is true for non-triggered side inputs. Triggered side inputs
have always been different - the main-input elements are buffered
until the first triggered value of the side input is available.
I walked through the code in SimplePushBackSideInputDoFnRunner again
and it looks like this is correct: the runner does not actually wait for the
watermark, but for "ready windows", which implies what you say. With a
suitable trigger (AfterWatermark.pastEndOfWindow()), this coincides with
the watermark passing the end of the window.
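The "ready window" behavior can be sketched as follows. This is a hypothetical toy model, not the actual runner code: windows are plain string keys, and "ready" is assumed to mean only that at least one side-input pane has fired for the window, with the watermark never consulted.

```python
from typing import Any, Dict, List, Tuple


class TriggeredSideInput:
    """Hypothetical toy model (not the real runner): main input waits only until
    the side input has fired its first pane for the matching window; afterwards
    each element reads the latest pane seen so far."""

    def __init__(self) -> None:
        self.latest_pane: Dict[str, Any] = {}    # window -> latest pane value
        self.buffer: List[Tuple[str, Any]] = []  # (window, element) not yet ready
        self.output: List[Tuple[Any, Any]] = []

    def is_ready(self, window: str) -> bool:
        # "Ready" means at least one pane exists; the watermark is not consulted.
        return window in self.latest_pane

    def on_main_input(self, window: str, element: Any) -> None:
        if self.is_ready(window):
            self.output.append((element, self.latest_pane[window]))
        else:
            self.buffer.append((window, element))

    def on_side_pane(self, window: str, value: Any) -> None:
        self.latest_pane[window] = value
        blocked = []
        for w, element in self.buffer:
            if w == window:
                self.output.append((element, value))
            else:
                blocked.append((w, element))
        self.buffer = blocked


s = TriggeredSideInput()
s.on_main_input("w1", "a")     # buffered: no pane for w1 yet
s.on_side_pane("w1", "early")  # e.g. an early firing; releases "a"
s.on_main_input("w1", "b")
s.on_side_pane("w1", "on-time")
s.on_main_input("w1", "c")
print(s.output)  # [('a', 'early'), ('b', 'early'), ('c', 'on-time')]
```

With AfterWatermark.pastEndOfWindow() and no early firings, the first pane is the on-time pane, so "ready" coincides with the side watermark passing the end of the window.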
Allowing arbitrary changes in the side input (with arbitrary
triggers) might introduce additional questions: how to handle
late data in the side input? A full implementation would require
retractions. Dropping late data does not feel like a solution,
because then the pipeline would not converge to the "correct"
result, as the side input might hold an incorrect value forever.
Applying late data from the (processing-time) moment the DoFn receives
it could make the downstream processing unstable: restarting
the pipeline on errors might change what is "on time" and what is
late, and thus generate inconsistent results.
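A minimal sketch of the instability described above, assuming (hypothetically) that each side-input update is applied the moment the DoFn receives it. The same data processed in two different arrival orders, for example before and after a restart, produces different joined output.

```python
from typing import Any, List, Optional, Tuple


def replay(events: List[Tuple[str, Any]]) -> List[Tuple[Any, Optional[Any]]]:
    """Hypothetical toy model: apply each side-input update when it is received
    (processing time) and join main elements against the current value.
    `events` is the arrival order observed by the DoFn in one particular run."""
    side: Optional[Any] = None
    out = []
    for kind, payload in events:
        if kind == "side":
            side = payload
        else:
            out.append((payload, side))
    return out


# Same data, two processing-time orders: the late side update (12) reaches the
# DoFn after "a" in the first run, but before "a" when replayed after a restart.
first_run = [("side", 10), ("main", "a"), ("side", 12)]
after_restart = [("side", 10), ("side", 12), ("main", "a")]
print(replay(first_run))      # [('a', 10)]
print(replay(after_restart))  # [('a', 12)]
```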
BTW, triggered side inputs have always been available. The problem
Kenn is addressing is that nobody has ever written down the spec!
There was a spec in mind when they were implemented, but the fact
that this was not written has always been problematic (and especially
so when creating the portable runner).
Triggered side inputs have always had some non-deterministic behavior,
not just for late data. Side inputs are cached locally on the reader,
so different reading workers might have different views on what the
latest trigger was.
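A sketch of how per-worker caching alone can produce different views of the "latest" trigger. The time-to-live refresh policy below is purely an assumption for illustration (the thread does not say how runners decide to refresh their caches), and `CachingWorker` is a hypothetical name.

```python
import math
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class CachingWorker:
    """Hypothetical worker that caches a triggered side input and re-reads it
    only when the cached copy is at least `ttl` processing-time units old.
    The TTL policy is an assumption for illustration, not how any runner works."""

    ttl: float
    cached: Optional[str] = None
    fetched_at: float = -math.inf

    def read(self, now: float, panes: List[Tuple[float, str]]) -> Optional[str]:
        # panes: (publish_time, value) pairs in publish order.
        if now - self.fetched_at >= self.ttl:
            visible = [value for t, value in panes if t <= now]
            self.cached = visible[-1] if visible else None
            self.fetched_at = now
        return self.cached


panes = [(0.0, "pane-1"), (5.0, "pane-2")]
w1, w2 = CachingWorker(ttl=10), CachingWorker(ttl=10)
w1.read(now=1, panes=panes)  # w1 caches "pane-1"
w2.read(now=6, panes=panes)  # w2 caches "pane-2"
# At the same processing time, the two workers see different "latest" triggers.
print(w1.read(now=7, panes=panes), w2.read(now=7, panes=panes))  # pane-1 pane-2
```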
Makes sense. Is this a design decision? I can imagine that waiting for
the side input watermark unconditionally adds latency; on the other hand,
"unexpected" non-deterministic behavior can confuse users. This
type of non-determinism, appearing after pipeline failure and recovery, is
the hardest to debug. If we were to document (and possibly slightly
reimplement) the triggered side-input spec, could we add an (optional)
way to make the processing deterministic via watermark sync?
It seems safe to process multiple trigger firings, though, as long as the
trigger does not produce late data (i.e. it only emits early panes).
Processing possibly late data might require buffering the main input
while main_input_timestamp > side_input_watermark -
allowed_lateness.
Is my line of thinking correct?
Jan
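The buffering condition proposed above can be written down directly. This is a sketch under a simplifying assumption: Beam measures lateness relative to window boundaries, whereas these hypothetical helpers use bare timestamps. With `allowed_lateness = 0` it reduces to plain watermark sync.

```python
def must_buffer(main_ts: float, side_watermark: float, allowed_lateness: float = 0.0) -> bool:
    """Hypothetical helper for the proposed rule: hold a main-input element while
    late side-input data timestamped up to main_ts could still be accepted,
    i.e. while main_ts > side_watermark - allowed_lateness."""
    return main_ts > side_watermark - allowed_lateness


def release_watermark(main_ts: float, allowed_lateness: float = 0.0) -> float:
    # Smallest side-input watermark at which the element can be processed;
    # the extra latency relative to plain watermark sync is exactly allowed_lateness.
    return main_ts + allowed_lateness


print(must_buffer(100, 105))                        # False: plain watermark sync
print(must_buffer(100, 105, allowed_lateness=10))   # True: late side data still possible
print(release_watermark(100, allowed_lateness=10))  # 110
```

The cost of the deterministic variant is visible in `release_watermark`: every main-input element waits an additional `allowed_lateness` of side-input watermark progress.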
On 3/23/23 20:19, Kenneth Knowles wrote:
Hi all,
I had a great chat with +Reza Rokni
<mailto:rezaro...@google.com> and +Reuven Lax
<mailto:re...@google.com> yesterday about some inconsistencies
in side input behavior, both before and after portability was
introduced.
I wrote up my thoughts about how we should specify the semantics
and implement them:
https://s.apache.org/beam-triggered-side-inputs
I found some issues that I think may require changes in
the portability protocols to get consistent behavior.
Please take a look and find my errors and oversights!
Kenn