What I dislike about all of this is that a main value that Beam (and
similar systems) brings to users is removing the concerns of classical
concurrent programming. But Luke's example convinces me that we might need
to have a discussion around a causality-based consistency model.

On Fri, Apr 12, 2019 at 9:58 AM Lukasz Cwik <lc...@google.com> wrote:

> Yes, if we had such a pipeline:
> ParDo(A) --> PCollectionView S
>
>   ...
>    |
> ParDo(C) <-(side input)- PCollectionView S
>    |
>   ...
>    |
> ParDo(D) <-(side input)- PCollectionView S
>    |
>   ...
>
> We could reason that ParDo(D) should see at least the same or newer
> contents of PCollectionView S than what ParDo(C) saw.
>

I think it is easy to talk about happens-before using only data provenance
here. If ParDo(D) sees an element that is caused by some element `y` in S,
then ParDo(D) is effectively observing indirectly that `y` is in S, so
ParDo(D) must also be able to observe `y` in S directly.
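
A minimal sketch of that provenance argument, in Python. It assumes, purely
for illustration, that S is modeled as a list of snapshots (one per trigger
firing) and that each element carries the set of S elements that caused it.
Neither is an actual Beam API, and `min_consistent_version` is a
hypothetical helper:

```python
from typing import FrozenSet, List, Optional


def min_consistent_version(
    snapshots: List[FrozenSet[str]], provenance: FrozenSet[str]
) -> Optional[int]:
    """Return the earliest snapshot index of S that contains every element
    in `provenance`, or None if no snapshot does (hypothetical helper)."""
    for version, contents in enumerate(snapshots):
        if provenance <= contents:  # subset test: all causes are visible
            return version
    return None


# Successive firings of the trigger feeding PCollectionView S (assumed data).
snapshots = [frozenset(), frozenset({"x"}), frozenset({"x", "y"})]

# ParDo(C) emitted an element that was caused by `y` in S.
element_provenance = frozenset({"y"})

# ParDo(D) must read S at this version or a later one to stay consistent.
required = min_consistent_version(snapshots, element_provenance)
```

Under these assumptions, a runner would only need to serve ParDo(D) a
snapshot at or after `required` for each element it processes.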

The case of meta-keys is harder, because there is not automatically a data
provenance argument. You could perhaps make a pipeline that forces one by
computing the meta-key downstream from the other outputs. Then observing
the meta-key implies indirectly observing the outputs. But you cannot make
the data dependency go in both directions, I think, and this is also not
the most efficient way to implement it.
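
A rough sketch of how a reader could use that one-directional dependency,
in Python. The dict-of-lists multimap, the key names (borrowed from the
quoted example below), the append-only element list, and `read_list_view`
are all assumptions for illustration, not the Fn API:

```python
from typing import Dict, List

ELEMENTS_KEY = "elements-key"
LENGTH_KEY = "length-key"


def read_list_view(multimap: Dict[str, List]) -> List:
    """Read a list-plus-length view, trusting only what the length
    meta-key implies (hypothetical helper)."""
    lengths = multimap.get(LENGTH_KEY, [])
    if not lengths:
        return []  # no length observed yet, so nothing is known to be complete
    # Assumes the element list is append-only, so lengths only grow.
    n = max(lengths)
    elements = multimap.get(ELEMENTS_KEY, [])
    if len(elements) < n:
        # The length was computed downstream of the elements, so seeing it
        # without them would violate the provenance argument.
        raise RuntimeError("length observed before the elements it counts")
    # Elements may be newer than the length; ignore the unconfirmed tail.
    return elements[:n]


# Elements were updated to three entries, but the length update lags behind.
view = read_list_view({ELEMENTS_KEY: ["a", "b", "c"], LENGTH_KEY: [2]})
```

The truncation is where the one-way dependency shows: the length
vouches for the elements, but nothing vouches for elements newer than it.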

Kenn



>
> On Thu, Apr 11, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>
>> BTW another issue is when a single triggered PCollectionView is read by
>> two different ParDos - each one might have a different view of the trigger.
>> This is noticeable if the output of those two ParDos is then joined
>> together.
>>
>> Reuven
>>
>> On Thu, Apr 11, 2019 at 10:39 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> The consistency problem occurs even in a single output PCollection that
>>> is read as a side input, because two output elements can be re-bundled and
>>> materialized in separate updates to the side input.
>>>
>>> Kenn
>>>
>>> On Thu, Apr 11, 2019 at 10:36 AM Ruoyun Huang <ruo...@google.com> wrote:
>>>
>>>> With little to no experience with triggers, I am trying to understand
>>>> the problem statement in this discussion.
>>>>
>>>> If a user is aware of the potential non-deterministic behavior, isn't
>>>> it almost trivial to refactor their code, by putting
>>>> PCollectionViews S and T into one single PCollectionView S', to get around
>>>> the issue? I cannot think of a reason (wrong?) why a user *has* to put
>>>> data into two separate PCollectionViews in a single ParDo(A).
>>>>
>>>> On Thu, Apr 11, 2019 at 10:16 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Even though what Kenn points out is a major reason for me bringing up
>>>>> this topic, I didn't want to limit this discussion to how side inputs
>>>>> could work, but to ask more generally what users want from their side
>>>>> inputs when dealing with multiple firings.
>>>>>
>>>>> On Thu, Apr 11, 2019 at 10:09 AM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Luke & I talked in person a bit. I want to give context for what is
>>>>>> at stake here, in terms of side inputs in portability. A decent starting
>>>>>> place is https://s.apache.org/beam-side-inputs-1-pager
>>>>>>
>>>>>> In that general design, the runner offers the SDK just one (or a few)
>>>>>> materialization strategies, and the SDK builds idiomatic structures on
>>>>>> top of it. Concretely, the Fn API today offers a multimap structure, and
>>>>>> the idea was that the SDK could cleverly prepare a PCollection<KV<...>>
>>>>>> for the runner to materialize. As a naive example, a simple iterable
>>>>>> structure could just map all elements to one dummy key in the multimap.
>>>>>> But if you wanted a list plus its length, then you might map all elements
>>>>>> to an element key and the length to a special length meta-key.
>>>>>>
>>>>>> So there is a problem: if the SDK is outputting a new
>>>>>> KV<"elements-key", ...> and KV<"length-key", ...> for the runner to
>>>>>> materialize then consumers of the side input need to see both updates to
>>>>>> the materialization or neither. In general, these outputs might span many
>>>>>> keys.
>>>>>>
>>>>>> It seems like there are a few ways to resolve this tension:
>>>>>>
>>>>>>  - Establish a consistency model so these updates will be observed
>>>>>> together. Seems hard and whatever we come up with will limit runners,
>>>>>> limit efficiency, and potentially leak into users having to reason about
>>>>>> concurrency.
>>>>>>
>>>>>>  - Instead of building the variety of side input views on one
>>>>>> primitive multimap materialization, force runners to provide many
>>>>>> primitive materializations with consistency under the hood. Not hard to
>>>>>> get started, but adds an unfortunate new dimension for runners to vary in
>>>>>> functionality and performance, versus letting them optimize just one or a
>>>>>> few materializations.
>>>>>>
>>>>>>  - Have no consistency and just not support side input methods that
>>>>>> would require consistent metadata. I'm curious what features this will
>>>>>> hurt.
>>>>>>
>>>>>>  - Have no consistency but require the SDK to build some sort of
>>>>>> large value since single-element consistency is built in to the model
>>>>>> always. Today many runners do concatenate all elements into one value,
>>>>>> though that does not perform well. Making this effective probably
>>>>>> requires new model features.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Thu, Apr 11, 2019 at 9:44 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> One thing to keep in mind: triggers that fire multiple times per
>>>>>>> window already tend to be non-deterministic. These are element-count or
>>>>>>> processing-time triggers, both of which are fairly non-deterministic in
>>>>>>> firing.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Apr 11, 2019 at 9:27 AM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Today, we define that a side input becomes available to be consumed
>>>>>>>> once at least one firing occurs or when the runner detects that no such
>>>>>>>> output could be produced (e.g. watermark is beyond the end of the
>>>>>>>> window when using the default trigger). For triggers that fire at most
>>>>>>>> once, consumers are guaranteed to have a consistent view of the contents
>>>>>>>> of the side input. But what happens when the trigger fires multiple
>>>>>>>> times?
>>>>>>>>
>>>>>>>> Let's say we have a pipeline containing:
>>>>>>>> ParDo(A) --> PCollectionView S
>>>>>>>>          \-> PCollectionView T
>>>>>>>>
>>>>>>>>   ...
>>>>>>>>    |
>>>>>>>> ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
>>>>>>>>    |
>>>>>>>>   ...
>>>>>>>>
>>>>>>>> 1) Let's say ParDo(A) outputs (during a single bundle) X and Y to
>>>>>>>> PCollectionView S, should ParDo(C) be guaranteed to see X only if it
>>>>>>>> can also see Y (and vice versa)?
>>>>>>>>
>>>>>>>> 2) Let's say ParDo(A) outputs (during a single bundle) X to
>>>>>>>> PCollectionView S and Y to PCollectionView T, should ParDo(C) be
>>>>>>>> guaranteed to see X only if it can also see Y?
>>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> ================
>>>> Ruoyun  Huang
>>>>
>>>>
