On Tue, Mar 28, 2023 at 12:39 AM Jan Lukavský <je...@seznam.cz> wrote:
On 3/27/23 19:44, Reuven Lax via dev wrote:
On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I'd like to clarify my understanding. Side inputs generally
perform a left (outer) join: the LHS is the main input and the RHS
is the side input.
Not completely - it's more of what I would call a nested-loop
join. I.e. if the side input changes _nothing happens_ until a
new element arrives on the LHS. This isn't quite the same as a
left-outer join.
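A toy model may make the distinction concrete. This is a minimal sketch, not Beam API: the class NestedLoopSideInputJoin and its methods are hypothetical. Each main-input element is paired with whatever side-input view is current when it arrives, and a side-input update on its own emits nothing (a true left outer join would instead revise earlier results).

```python
class NestedLoopSideInputJoin:
    """Hypothetical toy model of side-input semantics as a nested-loop join."""

    def __init__(self, initial_view):
        self.side_view = initial_view  # view currently visible to this reader
        self.output = []               # (element, side_view) pairs emitted

    def on_side_input(self, value):
        # A side-input change is only recorded; nothing is emitted here and
        # previously emitted results are never revisited.
        self.side_view = value

    def on_main_input(self, element):
        # Each new main-input element is joined with the view as of now.
        self.output.append((element, self.side_view))
```

Usage: after `on_main_input("a")`, `on_side_input("v1")` and `on_main_input("b")`, the output is `[("a", "v0"), ("b", "v1")]` for an initial view of `"v0"`; the pair for `"a"` is never updated.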
+1. This makes sense, my description was a slight simplification.
Doing a streaming left join requires watermark synchronization,
so elements from the main input are buffered while
main_input_timestamp > side_input_watermark. When the side input
watermark reaches the max watermark, main inputs no longer have to
be buffered, because the side input will not change anymore.
This works well for cases when the side input is bounded or
in the case of "slowly changing" patterns (ideally with
perfect watermarks, so that no late data is present).
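The watermark-synchronized buffering described above can be sketched as a small simulation. This is a simplified model under stated assumptions, not runner code: the class WatermarkSyncedSideInputJoin is hypothetical, the whole side-input view is replaced on each update, and MAX_WATERMARK is modeled as positive infinity.

```python
import math

MAX_WATERMARK = math.inf  # stand-in for the "max watermark" (end of time)


class WatermarkSyncedSideInputJoin:
    """Hypothetical toy model: main-input elements are pushed back while
    their timestamp is ahead of the side-input watermark."""

    def __init__(self):
        self.side_watermark = -math.inf
        self.side_value = None
        self.buffer = []  # (timestamp, element) pairs waiting on the side input
        self.output = []  # (element, side_value) pairs emitted so far

    def on_side_input(self, value):
        # Simplification: the whole side-input view is replaced.
        self.side_value = value

    def advance_side_watermark(self, watermark):
        self.side_watermark = max(self.side_watermark, watermark)
        waiting = []
        for timestamp, element in self.buffer:
            if timestamp > self.side_watermark:
                waiting.append((timestamp, element))
            else:
                self.output.append((element, self.side_value))
        self.buffer = waiting

    def on_main_input(self, element, timestamp):
        if timestamp > self.side_watermark:
            # The side input may still change at this timestamp: buffer.
            self.buffer.append((timestamp, element))
        else:
            # Once the side watermark is at MAX_WATERMARK, this branch is
            # always taken and nothing is buffered any more.
            self.output.append((element, self.side_value))
```

Once the side watermark reaches MAX_WATERMARK, every main-input element is processed immediately, which matches the bounded side-input case.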
This is true for non-triggered side inputs. Triggered side inputs
have always been different - the main-input elements are buffered
until the first triggered value of the side input is available.
I walked through the code in
SimplePushBackSideInputDoFnRunner again, and it looks like this is
correct: the runner does not actually wait for the watermark, but for
"ready windows", which implies what you say. With a suitable trigger
(AfterWatermark.pastEndOfWindow()), this coincides with the watermark
reaching the end of the window.
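The "ready windows" behavior can be modeled in a few lines. This is a rough sketch, not a transcription of SimplePushBackSideInputDoFnRunner: the class TriggeredSideInputRunner is hypothetical and assumes fixed windows of `window_size` shared by main and side input. A main-input element waits only until the first pane for its window arrives; later panes replace the view without any further waiting.

```python
from collections import defaultdict


class TriggeredSideInputRunner:
    """Hypothetical toy model of push-back on 'ready windows'."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.latest_pane = {}                 # window -> latest side-input pane
        self.pushed_back = defaultdict(list)  # window -> buffered elements
        self.output = []

    def window_of(self, timestamp):
        # Assumption: fixed windows, identical for main and side input.
        return timestamp // self.window_size

    def on_side_input_pane(self, window, value):
        first_pane = window not in self.latest_pane
        self.latest_pane[window] = value
        if first_pane:
            # The window just became ready: release everything pushed back.
            for element in self.pushed_back.pop(window, []):
                self.output.append((element, value))

    def on_main_input(self, element, timestamp):
        window = self.window_of(timestamp)
        if window in self.latest_pane:
            self.output.append((element, self.latest_pane[window]))
        else:
            self.pushed_back[window].append(element)
```

With an early trigger the first pane may arrive long before the watermark passes the end of the window, so elements are released against that early value; with AfterWatermark.pastEndOfWindow() the first pane and the watermark coincide.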
Allowing arbitrary changes in the side input (with arbitrary
triggers) might introduce additional questions - how to
handle late data in the side input? A full implementation would
require retractions. Dropping late data does not feel like a
solution, because then the pipeline would not converge to the
"correct" solution, as the side input might hold an incorrect
value forever. Applying late data at the processing time at which
the DoFn receives it could make the downstream processing
unstable: restarting the pipeline after errors might change what
is "on time" and what is late, and thus generate inconsistent
results.
BTW, triggered side inputs have always been available. The
problem Kenn is addressing is that nobody has ever written down
the spec! There was a spec in mind when they were implemented,
but the fact that this was not written has always been
problematic (and especially so when creating the portable runner).
Triggered side inputs have always had some
non-deterministic behavior, not just for late data. Side inputs
are cached locally on the reader, so different reading workers
might have different views on what the latest trigger was.
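The effect of per-worker caching can be illustrated with a tiny simulation. This is a minimal sketch under assumptions: SideInputStore and Worker are hypothetical classes, not Beam or runner APIs, and each worker is assumed to refresh its cache on its own schedule. Two workers processing the same element can then observe different triggered panes.

```python
class SideInputStore:
    """Hypothetical shared store holding the most recently triggered pane."""

    def __init__(self):
        self.latest = None

    def publish(self, value):
        self.latest = value


class Worker:
    """Hypothetical reader that caches the side input locally."""

    def __init__(self, store):
        self.store = store
        self.cached = None

    def refresh_cache(self):
        # Each worker refreshes independently, so caches can diverge.
        self.cached = self.store.latest

    def process(self, element):
        return (element, self.cached)
```

Usage: publish "pane1", refresh both workers, publish "pane2", and refresh only the first worker; processing the same element on the two workers then yields `("x", "pane2")` and `("x", "pane1")`.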
Makes sense. Is this a design decision? I can imagine that waiting
for the side input watermark unconditionally adds latency; on the
other hand, "unexpected" non-deterministic behavior can confuse
users. This type of non-determinism after pipeline failure and
recovery is the hardest to debug. If we documented (and
possibly slightly reimplemented) the triggered side-input spec,
could we add an (optional) way to make the processing deterministic
via watermark sync?
Well, yes it was (though, as mentioned before, the fact that none of
these designs were ever written into the spec is a problem), though in
some ways not a great one. The only global synchronization method we
had was the watermark/end of window, so if the source PCollection was
triggered by something else, we lost that. This creates some unfortunate
situations (in particular, I would not recommend using distributed
Map-valued side inputs with an early trigger; the behavior is
probably not what one expects). Part of the problem is that triggers
themselves are non-deterministic. Something like retractions would make
this better, but not completely. Something better here would be great,
but I'm still not sure what it would be or whether any of our runners could
implement it.
IMO the least-confusing use of triggered side inputs is as a singleton
broadcast (though in that case, we might be better off just
introducing a broadcast variable concept into Beam, which would save
users writing all of the boilerplate code around side inputs).
It seems safe to process multiple triggers as long as the
trigger does not produce late data, though (i.e., only early
firings). Processing possibly late data might require
buffering the main input while main_input_timestamp >
side_input_watermark - allowed_lateness.
Is my line of thinking correct?
Jan
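The buffering condition proposed in the message above can be written as a predicate. This is a minimal sketch; must_buffer and release_ready are hypothetical helpers, and timestamps and watermarks are assumed to be plain numbers in the same unit. With allowed_lateness = 0 it reduces to the plain watermark-sync rule; a positive allowed_lateness delays every element by that amount before it is released.

```python
def must_buffer(main_ts, side_watermark, allowed_lateness):
    """True while main_ts > side_watermark - allowed_lateness, i.e. until the
    side-input watermark has passed main_ts + allowed_lateness."""
    return main_ts > side_watermark - allowed_lateness


def release_ready(buffer, side_watermark, allowed_lateness):
    """Split buffered (timestamp, element) pairs into (ready, still_waiting)."""
    ready, waiting = [], []
    for timestamp, element in buffer:
        if must_buffer(timestamp, side_watermark, allowed_lateness):
            waiting.append((timestamp, element))
        else:
            ready.append((timestamp, element))
    return ready, waiting
```

For example, with a side watermark of 12 and allowed_lateness of 5, an element at timestamp 5 is released while one at timestamp 10 keeps waiting until the watermark reaches 15.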
On 3/23/23 20:19, Kenneth Knowles wrote:
Hi all,
I had a great chat with +Reza Rokni and +Reuven Lax
yesterday about some inconsistencies in side input behavior,
both before and after portability was introduced.
I wrote up my thoughts about how we should specify the
semantics and implement them:
https://s.apache.org/beam-triggered-side-inputs
I found some issues that I think may require changes
in the portability protocols to get consistent behavior.
Please take a look and find my errors and oversights!
Kenn