On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I'd like to clarify my understanding. Side inputs generally perform a left
> (outer) join: the LHS is the main input, and the RHS is the side input.
>

Not completely - it's more of what I would call a nested-loop join. I.e. if
the side input changes _nothing happens_ until a new element arrives on the
LHS. This isn't quite the same as a left-outer join.
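A minimal Python sketch of the nested-loop behavior described above. It assumes a toy in-memory model rather than any Beam API; the class NestedLoopSideInputJoin and its methods are hypothetical. Each main-input element is paired with whatever side-input view is current when that element is processed, and a side-input update on its own emits nothing.

```python
from dataclasses import dataclass, field
from typing import Any, List, Tuple


@dataclass
class NestedLoopSideInputJoin:
    """Toy model (hypothetical, not a Beam API) of side-input lookup semantics."""

    side_view: Any = None
    output: List[Tuple[Any, Any]] = field(default_factory=list)

    def update_side_input(self, new_view: Any) -> None:
        # A side-input change only replaces the cached view; no output is produced.
        self.side_view = new_view

    def process_main(self, element: Any) -> None:
        # Each main-input element is joined against the view current *now*.
        self.output.append((element, self.side_view))


j = NestedLoopSideInputJoin()
j.update_side_input({"rate": 1})
j.process_main("a")
j.update_side_input({"rate": 2})  # nothing is emitted here
j.process_main("b")
```

Here "a" keeps the old rate even after the update. A continuously maintained left-outer join would instead have to revise the result already emitted for "a".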

> Doing a streaming left join requires watermark synchronization, so elements
> from the main input are buffered while main_input_timestamp >
> side_input_watermark. When the side-input watermark reaches the max
> watermark, main-input elements no longer need to be buffered, because the
> side input will not change anymore. This works well when the side input is
> bounded, or for "slowly changing" patterns (ideally with perfect watermarks,
> so no late data is present).
>

This is true for non-triggered side inputs. Triggered side inputs have
always been different - the main-input elements are buffered until the
first triggered value of the side input is available.
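The two buffering rules discussed so far (non-triggered: wait for the side-input watermark; triggered: wait for the first pane) can be sketched as a toy model in Python. This is an illustration under simplifying assumptions, not how any runner is implemented: timestamps are plain numbers, MAX_WATERMARK is modeled as infinity, and the real comparison against the end of the side-input window is collapsed into a comparison against the element timestamp. MainInputBuffer and its methods are hypothetical names.

```python
import math
from typing import Any, List, Optional, Tuple

# Toy stand-in for the max watermark (assumption: timestamps are numbers).
MAX_WATERMARK = math.inf


class MainInputBuffer:
    """Toy model of when main-input elements may be processed against a side input.

    triggered=False: hold an element while its timestamp is ahead of the
    side-input watermark (simplified; real runners compare against the end
    of the side-input window).
    triggered=True: hold elements only until the first side-input pane arrives.
    """

    def __init__(self, triggered: bool) -> None:
        self.triggered = triggered
        self.side_watermark = -math.inf
        self.side_view: Optional[Any] = None
        self.has_pane = False
        self.pending: List[Tuple[float, Any]] = []
        self.output: List[Tuple[Any, Any]] = []

    def _ready(self, ts: float) -> bool:
        if self.triggered:
            return self.has_pane
        return ts <= self.side_watermark

    def _flush(self) -> None:
        # Emit every buffered element that is now ready; keep the rest buffered.
        still_pending = []
        for ts, elem in self.pending:
            if self._ready(ts):
                self.output.append((elem, self.side_view))
            else:
                still_pending.append((ts, elem))
        self.pending = still_pending

    def on_main(self, ts: float, elem: Any) -> None:
        self.pending.append((ts, elem))
        self._flush()

    def on_side_pane(self, view: Any) -> None:
        # A new side-input pane replaces the current view.
        self.side_view = view
        self.has_pane = True
        self._flush()

    def advance_side_watermark(self, wm: float) -> None:
        # Watermarks only move forward.
        self.side_watermark = max(self.side_watermark, wm)
        self._flush()
```

With triggered=False, an element at timestamp 10 is released only once the side-input watermark reaches 10 (or MAX_WATERMARK); with triggered=True, the first (possibly early) pane releases it immediately.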


> Allowing arbitrary changes in the side input (with arbitrary triggers)
> might introduce additional questions, such as how to handle late data in the
> side input. A full implementation would require retractions. Dropping late
> data does not feel like a solution, because then the pipeline would not
> converge to the "correct" result, as the side input might hold an incorrect
> value forever. Applying late data from the processing time at which the DoFn
> receives it could make downstream processing unstable: restarting the
> pipeline after errors might change what is "on time" and what is late, and
> thus generate inconsistent results.
>

BTW, triggered side inputs have always been available. The problem Kenn is
addressing is that nobody has ever written down the spec! There was a spec
in mind when they were implemented, but the fact that it was never written
down has always been problematic (especially so when creating the portable
runner).

Triggered side inputs have always had some non-deterministic behavior, not
just for late data. Side inputs are cached locally on the reader, so
different reading workers might have different views on what the latest
trigger was.
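A toy Python illustration of this caching non-determinism. It assumes a simplified model in which each worker keeps its own cached copy of the side input and re-reads it only when refresh() is called; SideInputStore and Worker are hypothetical, and real refresh timing depends on the runner's caching policy.

```python
from typing import Dict, List


class SideInputStore:
    """Hypothetical authoritative store of triggered side-input panes."""

    def __init__(self) -> None:
        self.panes: List[Dict[str, int]] = []

    def publish(self, pane: Dict[str, int]) -> None:
        self.panes.append(pane)

    def latest(self) -> Dict[str, int]:
        return self.panes[-1]


class Worker:
    """A reader that caches the side input and only re-reads it on refresh()."""

    def __init__(self, name: str, store: SideInputStore) -> None:
        self.name = name
        self.store = store
        self.cached = store.latest()  # requires at least one published pane

    def refresh(self) -> None:
        # When this happens is up to the caller here; a runner decides it in practice.
        self.cached = self.store.latest()

    def process(self, amount: int) -> int:
        return amount * self.cached["rate"]


store = SideInputStore()
store.publish({"rate": 1})
a = Worker("a", store)
b = Worker("b", store)
store.publish({"rate": 2})  # a later trigger firing
a.refresh()                 # only worker "a" picks it up
```

The same input element, 10, yields 20 on worker "a" and 10 on worker "b", purely because of when each cache was refreshed.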


> It seems safe to process multiple triggers as long as the trigger does not
> produce late data, though (i.e. early firings only). Processing possibly late
> data might require buffering the main input while main_input_timestamp >
> side_input_watermark - allowed_lateness.
>
> Is my line of thinking correct?
>
>  Jan
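Jan's allowed-lateness condition can be written as a pair of small predicates. This is a sketch of the rule as stated in the thread, not an existing Beam API; must_buffer and earliest_release_watermark are hypothetical names, and timestamps and watermarks are plain numbers.

```python
def must_buffer(main_ts: float, side_watermark: float, allowed_lateness: float) -> bool:
    """Hold a main-input element while late side-input data for its timestamp
    could still arrive, i.e. main_ts > side_watermark - allowed_lateness."""
    return main_ts > side_watermark - allowed_lateness


def earliest_release_watermark(main_ts: float, allowed_lateness: float) -> float:
    """Side-input watermark at which must_buffer() first becomes False."""
    return main_ts + allowed_lateness
```

With allowed_lateness = 0, must_buffer reduces to the non-triggered rule (buffer while main_input_timestamp > side_input_watermark). With a non-zero value, every main-input element waits for that much additional side-input watermark progress, which is the latency cost of this approach.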
> On 3/23/23 20:19, Kenneth Knowles wrote:
>
> Hi all,
>
> I had a great chat with +Reza Rokni <rezaro...@google.com> and +Reuven Lax
> <re...@google.com> yesterday about some inconsistencies in side input
> behavior, both before and after portability was introduced.
>
> I wrote up my thoughts about how we should specify the semantics and
> implement them:
>
> https://s.apache.org/beam-triggered-side-inputs
>
> I found some issues that I think may require changes in the
> portability protocols to get consistent behavior.
>
> Please take a look and find my errors and oversights!
>
> Kenn
>
>
