> Well yes it was (though as mentioned before, the fact that none of these designs were even written into the spec is a problem), though in some ways not a great one. The only global synchronization method we had was the watermark/end of window, so if the source PCollection was triggered by something else we lost that. This creates some unfortunate situations (in particular I would not recommend using distributed Map-valued side inputs with an early trigger - the behavior is probably not what one expects). Part of the problem is that triggers themselves are non-deterministic. Something like retractions would make this better but not completely. Something better here would be great, but I'm still not sure what it would be or if any of our runners could implement it.

Yes, the problem is due to the fact that triggers fire at non-deterministic event times, because most of Beam's current triggers are either processing-time triggers or data-driven triggers. We could obtain the same behavior as for the end-of-window trigger with event-time triggers (the EOW trigger and GC trigger are AFAIK the only event-time triggers we currently have). These might be useful on their own, but would also require somewhat more complicated logic in GBK (splitting the window into panes, holding state for each pane independently, merging state for accumulating triggers, ...).
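
To illustrate the point about event-time triggers, here is a minimal Python sketch. It is not Beam code: `pane_index`, `split_into_panes`, `accumulate` and the fixed pane size are hypothetical names and assumptions made only for illustration. If pane boundaries are defined purely in event time, the pane an element falls into depends only on its timestamp, so the split is the same for any arrival order:

```python
from collections import defaultdict


def pane_index(timestamp, window_start, pane_size):
    """Pane an element belongs to, derived from event time only."""
    return (timestamp - window_start) // pane_size


def split_into_panes(elements, window_start, pane_size):
    """Group (timestamp, value) pairs into panes; state is held per pane."""
    panes = defaultdict(list)
    for timestamp, value in elements:
        panes[pane_index(timestamp, window_start, pane_size)].append(value)
    # Sort values so the result does not depend on arrival order.
    return {index: sorted(values) for index, values in panes.items()}


def accumulate(panes):
    """Accumulating mode: each pane also contains all earlier panes' values."""
    merged, result = [], {}
    for index in sorted(panes):
        merged = sorted(merged + panes[index])
        result[index] = merged
    return result
```

A processing-time trigger has no such property, because the cut between panes would depend on when elements happen to arrive.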

  Jan



On 3/28/23 17:26, Reuven Lax via dev wrote:


On Tue, Mar 28, 2023 at 12:39 AM Jan Lukavský <je...@seznam.cz> wrote:


    On 3/27/23 19:44, Reuven Lax via dev wrote:


    On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský <je...@seznam.cz> wrote:

        Hi,

        I'd like to clarify my understanding. Side inputs generally
        perform a left (outer) join, LHS side is the main input, RHS
        is the side input.


    Not completely - it's more of what I would call a nested-loop
    join. I.e. if the side input changes _nothing happens_ until a
    new element arrives on the LHS. This isn't quite the same as a
    left-outer join.
    +1. This makes sense, my description was a slight simplification.
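
A toy Python model of the nested-loop behavior described above (the class and method names are hypothetical, not a Beam API): updating the side input emits nothing, and output appears only when a main-input element arrives.

```python
class SideInputJoin:
    """Toy model: output is produced only when a main-input element arrives."""

    def __init__(self):
        self.side_value = None
        self.outputs = []

    def on_side_input(self, value):
        # Updating the side input emits nothing by itself.
        self.side_value = value

    def on_main_input(self, element):
        # Each main element is paired with whatever side value is current.
        self.outputs.append((element, self.side_value))
```

A true left-outer join would also have to revisit earlier main-input elements when the side input changes, which this model deliberately does not do.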

        Doing a streaming left join requires watermark synchronization,
        thus elements from the main input are buffered while
        main_input_timestamp > side_input_watermark. When the side input
        watermark reaches max watermark, main inputs do not have to
        be buffered because the side input will not change anymore.
        This works well for cases when the side input is bounded or
        in the case of "slowly changing" patterns (ideally with
        perfect watermarks, so no late data present).


    This is true for non-triggered side inputs. Triggered side inputs
    have always been different - the main-input elements are buffered
    until the first triggered value of the side input is available.
    I walked through the code in SimplePushBackSideInputDoFnRunner
    again and it looks like this is correct: the runner actually does
    not wait for the watermark, but for "ready windows", which implies
    what you say. With a suitable trigger
    (AfterWatermark.pastEndOfWindow()) this coincides with the
    watermark passing the end of the window.
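
The "ready windows" behavior can be sketched as follows. This is a simplified Python model under stated assumptions, not the actual SimplePushBackSideInputDoFnRunner code; `PushBackRunner` and its methods are hypothetical names. Main-input elements are pushed back until the side input has fired at least once for their window, and later firings only affect elements that arrive afterwards.

```python
class PushBackRunner:
    """Toy model of pushing back main input until a side-input window is ready."""

    def __init__(self):
        self.ready = {}        # window -> latest fired side-input value
        self.pushed_back = []  # (window, element) pairs waiting for readiness
        self.outputs = []

    def process(self, window, element):
        if window in self.ready:
            # Window is ready: use the latest value seen, no watermark wait.
            self.outputs.append((element, self.ready[window]))
        else:
            self.pushed_back.append((window, element))

    def side_input_fired(self, window, value):
        self.ready[window] = value
        still_waiting = []
        for waiting_window, element in self.pushed_back:
            if waiting_window == window:
                self.outputs.append((element, value))
            else:
                still_waiting.append((waiting_window, element))
        self.pushed_back = still_waiting
```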

        Allowing arbitrary changes in the side input (with arbitrary
        triggers) might introduce additional questions - how to
        handle late data in the side input? Full implementation would
        require retractions. Dropping late data does not feel like a
        solution, because then the pipeline would not converge to the
        "correct" solution, as the side input might hold incorrect
        value forever. Applying late data as of the processing time
        at which the DoFn receives it could make downstream
        processing unstable: restarting the pipeline on errors might
        change what is "on time" and what is late, and thus generate
        inconsistent results.

    BTW, triggered side inputs have always been available. The
    problem Kenn is addressing is that nobody has ever written down
    the spec! There was a spec in mind when they were implemented,
    but the fact that this was not written has always been
    problematic (and especially so when creating the portable runner).

    Triggered side inputs have always had some
    non-deterministic behavior, not just for late data. Side inputs
    are cached locally on the reader, so different reading workers
    might have different views on what the latest trigger was.
    Makes sense, is this a design decision? I can imagine that waiting
    for side input watermark unconditionally adds latency, on the
    other hand an "unexpected" non-deterministic behavior can confuse
    users. This type of non-determinism after pipeline failure and
    recovery is among the hardest to debug. If we documented (and
    possibly slightly reimplemented) the triggered side-input spec,
    could we add an (optional) way to make the processing
    deterministic via watermark sync?


Well yes it was (though as mentioned before, the fact that none of these designs were even written into the spec is a problem), though in some ways not a great one. The only global synchronization method we had was the watermark/end of window, so if the source PCollection was triggered by something else we lost that. This creates some unfortunate situations (in particular I would not recommend using distributed Map-valued side inputs with an early trigger - the behavior is probably not what one expects). Part of the problem is that triggers themselves are non-deterministic. Something like retractions would make this better but not completely. Something better here would be great, but I'm still not sure what it would be or if any of our runners could implement it.

IMO the least-confusing use of triggered side inputs is as a singleton broadcast (though in that case, we might be better off just introducing a broadcast variable concept into Beam, which would save users writing all of the boilerplate code around side inputs).


        It seems safe to process multiple triggers as long as the
        trigger does not produce late data, though (i.e. early
        emitting). Processing possibly late data might require
        buffering the main input while main_input_timestamp >
        side_input_watermark - allowed_lateness.
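
The buffering condition above, written out as a small Python predicate. The function name and the use of plain numbers for timestamps are assumptions made for illustration; this is not a Beam API.

```python
def must_buffer(main_input_timestamp, side_input_watermark, allowed_lateness=0):
    """True while late side-input data could still affect this main element."""
    return main_input_timestamp > side_input_watermark - allowed_lateness
```

With allowed_lateness = 0 this reduces to the plain watermark comparison used for non-triggered side inputs earlier in the thread.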

        Is my line of thinking correct?

         Jan

        On 3/23/23 20:19, Kenneth Knowles wrote:
        Hi all,

        I had a great chat with +Reza Rokni
        <mailto:rezaro...@google.com> and +Reuven Lax
        <mailto:re...@google.com> yesterday about some
        inconsistencies in side input behavior, both before and
        after portability was introduced.

        I wrote up my thoughts about how we should specify the
        semantics and implement them:

        https://s.apache.org/beam-triggered-side-inputs

        I think I found some issues that may require changes
        in the portability protocols to get consistent behavior.

        Please take a look and find my errors and oversights!

        Kenn
