Really helpful discussion. Sounds like we pretty much agree that having a
clearer spec will be good?

I'm augmenting ViewTest to have enough tests to exercise the proposed spec
a bit more.

I'm largely focused on singleton side inputs, which I assume will mostly be
the output of a combiner. In batch, of course, there is just one value (per
window). In streaming, when a trigger fires multiple times, the firings are
semantically revisions to a single value, so it should "just work" in some
way.
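To make the "revisions" reading concrete, here is a minimal Python sketch (not Beam's API; the class and method names are hypothetical) of a singleton view that keeps one value per window and lets a later pane supersede an earlier one:

```python
from typing import Any, Dict, Tuple


class SingletonRevisionView:
    """Hypothetical singleton side-input view: one value per window,
    where each trigger firing (pane) is a revision of that value."""

    def __init__(self) -> None:
        # window -> (pane_index, value) of the latest revision seen so far
        self._latest: Dict[Any, Tuple[int, Any]] = {}

    def on_firing(self, window: Any, pane_index: int, value: Any) -> None:
        current = self._latest.get(window)
        # A later pane supersedes an earlier one; stale or replayed panes are ignored.
        if current is None or pane_index > current[0]:
            self._latest[window] = (pane_index, value)

    def get(self, window: Any) -> Any:
        # Raises KeyError if nothing has fired yet; a runner would instead
        # hold back the main-input element until the first firing arrives.
        return self._latest[window][1]
```

Keying on the pane index rather than arrival order means a replayed older firing cannot roll the value back.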

The intuition of today's behavior is "eventually consistent" join, where
some elements see the old value and some see the new value, but the user
cannot control when the switch happens. The problem is that I think we have
runners that either (1) crash on multiple trigger firings or (2) allow
multiple elements that are _not_ from a multiple firing, allowing silent
data corruption.
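A rough sketch of how a runner could tell the two cases apart, assuming each side-input element carries the index of the pane (trigger firing) that produced it. `resolve_singleton` and `NotASingletonError` are hypothetical names, not part of Beam:

```python
from typing import Any, Dict, Iterable, Tuple


class NotASingletonError(ValueError):
    """Raised when a window's side input is not a valid singleton."""


def resolve_singleton(elements: Iterable[Tuple[int, Any]]) -> Any:
    """Return the latest revision from (pane_index, value) pairs.

    Several panes are treated as revisions of one value, but two elements
    in the *same* pane are not revisions of anything, so we fail loudly
    instead of silently picking one of them.
    """
    by_pane: Dict[int, Any] = {}
    for pane_index, value in elements:
        if pane_index in by_pane:
            raise NotASingletonError(
                f"pane {pane_index} contains more than one element")
        by_pane[pane_index] = value
    if not by_pane:
        # A real view might fall back to a default value here instead.
        raise NotASingletonError("no elements")
    return by_pane[max(by_pane)]
```

This neither crashes on legitimate multiple firings nor hides genuinely multi-valued input.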

Kenn

On Wed, Mar 29, 2023 at 6:16 AM Jan Lukavský <je...@seznam.cz> wrote:

> > Well yes it was (though as mentioned before, the fact that none of these
> > designs were even written into the spec is a problem), though in some ways
> > not a great one. The only global synchronization method we had was the
> > watermark/end of window, so if the source PCollection was triggered by
> > something else we lost that. This creates some unfortunate situations (in
> > particular I would not recommend using distributed Map-valued side inputs
> > with an early trigger - the behavior is probably not what one expects).
> > Part of the problem is that triggers themselves are non-deterministic.
> > Something like retractions would make this better but not completely.
> > Something better here would be great, but I'm still not sure what it would
> > be or if any of our runners could implement it.
>
> Yes, the problem is that triggers fire at non-deterministic event times,
> because most of Beam's current triggers are either processing-time triggers
> or data-driven triggers. We could obtain the same behavior as for the
> end-of-window trigger with event-time triggers (the EOW trigger and GC
> trigger are AFAIK the only event-time triggers we currently have). These
> might be useful on their own, but would also require somewhat more
> complicated logic in GBK (splitting the window into panes, holding state
> for each pane independently, merging state for accumulating triggers, ...).
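As an illustration of that per-pane logic, a toy Python sketch of an event-time trigger that fires at fixed event-time boundaries inside a window. The function name, the summing combiner and the fixed `pane_size` are assumptions made for the example; lateness is not modeled:

```python
from typing import Dict, List, Tuple


def event_time_panes(
    elements: List[Tuple[int, int]],  # (event_timestamp, value)
    window_start: int,
    window_end: int,
    pane_size: int,
    accumulating: bool,
) -> List[Tuple[int, int]]:
    """Hypothetical event-time trigger firing at every pane_size boundary.

    Returns (pane_end_timestamp, sum) for each pane. State is held per pane;
    accumulating mode merges all panes up to and including the current one.
    """
    per_pane: Dict[int, int] = {}
    for ts, value in elements:
        if not (window_start <= ts < window_end):
            continue  # element belongs to a different window
        pane = (ts - window_start) // pane_size
        per_pane[pane] = per_pane.get(pane, 0) + value

    num_panes = -(-(window_end - window_start) // pane_size)  # ceiling division
    firings = []
    running = 0
    for pane in range(num_panes):
        pane_sum = per_pane.get(pane, 0)  # empty panes still fire, with 0
        running += pane_sum
        pane_end = min(window_start + (pane + 1) * pane_size, window_end)
        firings.append((pane_end, running if accumulating else pane_sum))
    return firings
```

Because pane membership depends only on event timestamps, shuffling the input yields the same firings, which is the determinism that processing-time and data-driven triggers lack.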
>
>   Jan
>
>
> On 3/28/23 17:26, Reuven Lax via dev wrote:
>
>
>
> On Tue, Mar 28, 2023 at 12:39 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>>
>> On 3/27/23 19:44, Reuven Lax via dev wrote:
>>
>>
>>
>> On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> I'd like to clarify my understanding. Side inputs generally perform a
>>> left (outer) join, LHS side is the main input, RHS is the side input.
>>>
>>
>> Not completely - it's more of what I would call a nested-loop join. I.e.
>> if the side input changes _nothing happens_ until a new element arrives on
>> the LHS. This isn't quite the same as a left-outer join.
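A small Python simulation (hypothetical function, not Beam code) of the nested-loop behavior: a side-input update only replaces the cached value, and output appears only when a main-input element arrives:

```python
from typing import Any, Iterable, List, Tuple


def nested_loop_side_join(events: Iterable[Tuple[str, Any]]) -> List[Tuple[Any, Any]]:
    """Simulate side-input semantics as a nested-loop join.

    events is a processing-time-ordered stream of ("main", x) or ("side", v).
    A side update is only cached; nothing is emitted until a main element
    arrives, which is paired with whatever side value is cached at that time.
    """
    current_side = None  # a real runner would hold main elements until the first firing
    output = []
    for kind, payload in events:
        if kind == "side":
            current_side = payload  # no output is produced here
        elif kind == "main":
            output.append((payload, current_side))
        else:
            raise ValueError(f"unknown event kind: {kind}")
    return output
```

With events side v1, main 1, side v2, side v3, main 2, the result is [(1, "v1"), (2, "v3")]; a left outer join over changing data would instead have to revise the output for element 1.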
>>
>> +1. This makes sense, my description was a slight simplification.
>>
>>
>>> Doing a streaming left join requires watermark synchronization, thus
>>> elements from the main input are buffered while
>>> main_input_timestamp > side_input_watermark. When the side input watermark
>>> reaches the max watermark, main inputs do not have to be buffered because
>>> the side input will not change anymore. This works well for cases when the
>>> side input is bounded or in the case of "slowly changing" patterns (ideally
>>> with perfect watermarks, so no late data present).
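A minimal sketch of that buffering rule, assuming numeric timestamps and ignoring windowing; `WatermarkSyncBuffer` is a hypothetical name:

```python
import heapq
from typing import Any, List, Tuple


class WatermarkSyncBuffer:
    """Hold main-input elements while main_input_timestamp > side_input_watermark."""

    def __init__(self) -> None:
        self._side_watermark = float("-inf")
        self._heap: List[Tuple[float, int, Any]] = []  # (timestamp, seq, element)
        self._seq = 0  # tie-breaker so elements themselves never get compared

    def add_main(self, timestamp: float, element: Any) -> List[Any]:
        if timestamp <= self._side_watermark:
            return [element]  # side input is complete up to this timestamp
        heapq.heappush(self._heap, (timestamp, self._seq, element))
        self._seq += 1
        return []

    def advance_side_watermark(self, watermark: float) -> List[Any]:
        """Release, in timestamp order, every buffered element the watermark has passed."""
        self._side_watermark = max(self._side_watermark, watermark)
        released = []
        while self._heap and self._heap[0][0] <= self._side_watermark:
            released.append(heapq.heappop(self._heap)[2])
        return released
```

Advancing the side watermark to infinity (the max watermark) releases everything and lets later main elements pass straight through.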
>>>
>>
>> This is true for non-triggered side inputs. Triggered side inputs have
>> always been different - the main-input elements are buffered until the
>> first triggered value of the side input is available.
>>
>> I walked through the code in SimplePushBackSideInputDoFnRunner again and
>> it looks like this is correct: the runner actually does not wait for the
>> watermark, but for "ready windows", which implies what you say. With a
>> suitable trigger (AfterWatermark.pastEndOfWindow()), this coincides with
>> the watermark passing the end of the window.
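For comparison, a simplified Python model of the "ready windows" behavior. It only mirrors the idea described above and is not the actual SimplePushBackSideInputDoFnRunner logic; the class and its methods are hypothetical:

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List


class PushbackOnReadyWindows:
    """A main-input element is pushed back until its side-input window has
    at least one firing, regardless of the side input's watermark."""

    def __init__(self, fn: Callable[[Any, Any], Any]) -> None:
        self._fn = fn  # fn(element, side_value) -> output
        self._side: Dict[Any, Any] = {}  # window -> latest side value
        self._pushed_back: Dict[Any, List[Any]] = defaultdict(list)

    def process(self, window: Any, element: Any) -> List[Any]:
        if window in self._side:  # the window is "ready"
            return [self._fn(element, self._side[window])]
        self._pushed_back[window].append(element)
        return []

    def on_side_firing(self, window: Any, value: Any) -> List[Any]:
        # Any firing, early or on time, makes the window ready.
        self._side[window] = value
        pending = self._pushed_back.pop(window, [])
        return [self._fn(e, value) for e in pending]
```

With an early trigger the window becomes ready at the first early firing, so buffered elements are released long before the watermark reaches the end of the window.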
>>
>>
>>
>>> Allowing arbitrary changes in the side input (with arbitrary triggers)
>>> might introduce additional questions - how to handle late data in the side
>>> input? Full implementation would require retractions. Dropping late data
>>> does not feel like a solution, because then the pipeline would not converge
>>> to the "correct" solution, as the side input might hold an incorrect value
>>> forever. Applying late data at the processing time the DoFn receives it
>>> could make the downstream processing unstable: restarting the pipeline on
>>> errors might change what is "on time" and what is late, and thus generate
>>> inconsistent results.
>>>
>> BTW, triggered side inputs have always been available. The problem Kenn
>> is addressing is that nobody has ever written down the spec! There was a
>> spec in mind when they were implemented, but the fact that this was not
>> written has always been problematic (and especially so when creating the
>> portable runner).
>>
>> Triggered side inputs have always had some non-deterministic behavior, not
>> just for late data. Side inputs are cached locally on the reader, so
>> different reading workers might have different views on what the latest
>> trigger was.
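A toy model of the per-worker caching effect, assuming each worker refreshes its cached copy from shared storage after a fixed TTL (`fetch`, `ttl` and the class name are illustrative assumptions, not how any particular runner configures its cache):

```python
from typing import Any, Callable, Optional


class CachingSideInputReader:
    """Hypothetical per-worker side-input cache that refreshes from shared
    storage at most once every ttl time units."""

    def __init__(self, fetch: Callable[[], Any], ttl: float) -> None:
        self._fetch = fetch
        self._ttl = ttl
        self._value: Any = None
        self._fetched_at: Optional[float] = None

    def read(self, now: float) -> Any:
        # Refetch only if we have never fetched or the cached copy has expired.
        if self._fetched_at is None or now - self._fetched_at >= self._ttl:
            self._value = self._fetch()
            self._fetched_at = now
        return self._value
```

If worker A reads at t=0, a new firing then lands in storage, and worker B reads at t=5, B sees the new value while A keeps serving the old one until its TTL expires.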
>>
>> Makes sense. Is this a design decision? I can imagine that waiting for the
>> side input watermark unconditionally adds latency; on the other hand,
>> "unexpected" non-deterministic behavior can confuse users. This type of
>> non-determinism after pipeline failure and recovery is the hardest to
>> debug. If we document (and possibly slightly reimplement) the triggered
>> side-input spec, could we add an (optional) way to make the processing
>> deterministic via watermark sync?
>>
>
> Well yes it was (though as mentioned before, the fact that none of these
> designs were even written into the spec is a problem), though in some ways
> not a great one. The only global synchronization method we had was the
> watermark/end of window, so if the source PCollection was triggered by
> something else we lost that. This creates some unfortunate situations (in
> particular I would not recommend using distributed Map-valued side inputs
> with an early trigger - the behavior is probably not what one expects).
> Part of the problem is that triggers themselves are non-deterministic.
> Something like retractions would make this better but not completely.
> Something better here would be great, but I'm still not sure what it would
> be or if any of our runners could implement it.
>
> IMO the least-confusing use of triggered side inputs is as a singleton
> broadcast (though in that case, we might be better off just introducing a
> broadcast variable concept into Beam, which would save users writing all of
> the boilerplate code around side inputs).
>
>>
>>
>>> It seems safe to process multiple triggers as long as the trigger does
>>> not produce late data, though (i.e. early emitting). Processing possibly
>>> late data might require buffering the main input while
>>> main_input_timestamp > side_input_watermark - allowed_lateness.
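The condition above, plus a classification of side-input firings loosely modeled on the EARLY/ON_TIME/LATE timings Beam attaches to panes, can be sketched as follows. Function names are hypothetical and timestamps are plain integers:

```python
from enum import Enum


class PaneTiming(Enum):
    EARLY = "EARLY"
    ON_TIME = "ON_TIME"
    LATE = "LATE"


def classify_firing(window_max_timestamp: int, input_watermark: int,
                    on_time_already_fired: bool) -> PaneTiming:
    # Before the watermark passes the end of the window, any firing is early.
    if input_watermark <= window_max_timestamp:
        return PaneTiming.EARLY
    # The first firing after the watermark passes is on time; later ones are late.
    return PaneTiming.LATE if on_time_already_fired else PaneTiming.ON_TIME


def must_buffer_main(main_input_timestamp: int, side_input_watermark: int,
                     allowed_lateness: int) -> bool:
    # Hold the main element while a (possibly late) side firing could still
    # change the value it should see.
    return main_input_timestamp > side_input_watermark - allowed_lateness
```

With allowed_lateness = 0 this reduces to the plain watermark-synchronization rule; a larger allowed lateness trades latency for not missing late side-input revisions.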
>>>
>>> Is my line of thinking correct?
>>>
>>>  Jan
>>> On 3/23/23 20:19, Kenneth Knowles wrote:
>>>
>>> Hi all,
>>>
>>> I had a great chat with +Reza Rokni <rezaro...@google.com> and +Reuven
>>> Lax <re...@google.com> yesterday about some inconsistencies in side
>>> input behavior, both before and after portability was introduced.
>>>
>>> I wrote up my thoughts about how we should specify the semantics and
>>> implement them:
>>>
>>> https://s.apache.org/beam-triggered-side-inputs
>>>
>>> I found some issues that I think may require changes in the
>>> portability protocols to get consistent behavior.
>>>
>>> Please take a look and find my errors and oversights!
>>>
>>> Kenn
>>>
>>>
