What I dislike about all of this is that a main value Beam (& similar systems) bring to users is removing the concerns of classical concurrent programming. But Luke's example is convincing that we may need a discussion around a causality-based consistency model.
On Fri, Apr 12, 2019 at 9:58 AM Lukasz Cwik <lc...@google.com> wrote:

> Yes, if we had such a pipeline:
>
> ParDo(A) --> PCollectionView S
>
> ...
> |
> ParDo(C) <-(side input)- PCollectionView S
> |
> ...
> |
> ParDo(D) <-(side input)- PCollectionView S
> |
> ...
>
> We could reason that ParDo(D) should see at least the same or newer
> contents of PCollectionView S than when ParDo(C) saw it.

I think it is easy to talk about happens-before using only data
provenance here. If ParDo(D) sees an element that is caused by some
element `y` in S, then ParDo(D) is indirectly observing that `y` is in
S, so `y` must also be observable directly.

The case of meta-keys is harder, because there is not automatically a
data provenance argument. You could perhaps make a pipeline that forces
it by computing the meta-key downstream from the other outputs, so that
observing the meta-key implies indirectly observing the outputs. But I
don't think you can make the data dependency go in both directions, and
this is also not the most efficient way to implement it.

Kenn

> On Thu, Apr 11, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>
>> BTW another issue is when a single triggered PCollectionView is read
>> by two different ParDos - each one might have a different view of the
>> trigger. This is noticeable if the output of those two ParDos is then
>> joined together.
>>
>> Reuven
>>
>> On Thu, Apr 11, 2019 at 10:39 AM Kenneth Knowles <k...@apache.org>
>> wrote:
>>
>>> The consistency problem occurs even in a single output PCollection
>>> that is read as a side input, because two output elements can be
>>> re-bundled and materialized in separate updates to the side input.
>>>
>>> Kenn
>>>
>>> On Thu, Apr 11, 2019 at 10:36 AM Ruoyun Huang <ruo...@google.com>
>>> wrote:
>>>
>>>> With little to no experience with triggers, I am trying to
>>>> understand the problem statement in this discussion.
>>>>
>>>> If a user is aware of the potential non-deterministic behavior,
>>>> isn't it almost trivial to refactor his/her user code, by putting
>>>> PCollectionViews S and T into one single PCollectionView S', to get
>>>> around the issue? I cannot think of a reason (wrong?) why a user
>>>> *has* to put data into two separate PCollectionViews in a single
>>>> ParDo(A).
>>>>
>>>> On Thu, Apr 11, 2019 at 10:16 AM Lukasz Cwik <lc...@google.com>
>>>> wrote:
>>>>
>>>>> Even though what Kenn points out is a major reason for me bringing
>>>>> up this topic, I didn't want to limit this discussion to how side
>>>>> inputs could work, but to ask in general what users want from
>>>>> their side inputs when dealing with multiple firings.
>>>>>
>>>>> On Thu, Apr 11, 2019 at 10:09 AM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Luke & I talked in person a bit. I want to give context for what
>>>>>> is at stake here, in terms of side inputs in portability. A
>>>>>> decent starting place is
>>>>>> https://s.apache.org/beam-side-inputs-1-pager
>>>>>>
>>>>>> In that general design, the runner offers the SDK just one (or a
>>>>>> few) materialization strategies, and the SDK builds idiomatic
>>>>>> structures on top of it. Concretely, the Fn API today offers a
>>>>>> multimap structure, and the idea was that the SDK could cleverly
>>>>>> prepare a PCollection<KV<...>> for the runner to materialize. As
>>>>>> a naive example, a simple iterable structure could just map all
>>>>>> elements to one dummy key in the multimap. But if you wanted a
>>>>>> list plus its length, then you might map all elements to an
>>>>>> element key and the length to a special length meta-key.
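To make that encoding concrete, here is a minimal Java sketch of the
kind of SDK-side preparation described above, given an assumed
PCollection<String> named `elements` in the global window; the literal
keys "elements" and "length" and the use of View.asMultimap() as the
backing materialization are illustrative assumptions, not the actual Fn
API encoding:

  import java.util.Map;
  import org.apache.beam.sdk.transforms.Count;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.Flatten;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.View;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PCollectionList;
  import org.apache.beam.sdk.values.PCollectionView;

  // Every element goes under one well-known element key.
  PCollection<KV<String, String>> keyedElements =
      elements.apply("KeyElements",
          ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void process(@Element String e,
                OutputReceiver<KV<String, String>> out) {
              out.output(KV.of("elements", e));
            }
          }));

  // The length goes under a separate "length" meta-key.
  PCollection<KV<String, String>> keyedLength =
      elements
          .apply(Count.<String>globally())
          .apply("KeyLength",
              ParDo.of(new DoFn<Long, KV<String, String>>() {
                @ProcessElement
                public void process(@Element Long n,
                    OutputReceiver<KV<String, String>> out) {
                  out.output(KV.of("length", Long.toString(n)));
                }
              }));

  // Both keyed collections feed the single multimap materialization.
  // Nothing here forces an update under "elements" and the matching
  // update under "length" to become visible atomically, which is
  // exactly the tension described next.
  PCollectionView<Map<String, Iterable<String>>> listPlusLengthView =
      PCollectionList.of(keyedElements).and(keyedLength)
          .apply(Flatten.pCollections())
          .apply(View.asMultimap());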
>>>>>> So there is a problem: if the SDK is outputting a new
>>>>>> KV<"elements-key", ...> and KV<"length-key", ...> for the runner
>>>>>> to materialize, then consumers of the side input need to see both
>>>>>> updates to the materialization or neither. In general, these
>>>>>> outputs might span many keys.
>>>>>>
>>>>>> It seems like there are a few ways to resolve this tension:
>>>>>>
>>>>>> - Establish a consistency model so these updates will be observed
>>>>>> together. This seems hard, and whatever we come up with will
>>>>>> limit runners, limit efficiency, and potentially leak into users
>>>>>> having to reason about concurrency.
>>>>>>
>>>>>> - Instead of building the variety of side input views on one
>>>>>> primitive multimap materialization, force runners to provide many
>>>>>> primitive materializations with consistency under the hood. This
>>>>>> is not hard to get started on, but it adds an unfortunate new
>>>>>> dimension for runners to vary in functionality and performance,
>>>>>> versus letting them optimize just one or a few materializations.
>>>>>>
>>>>>> - Have no consistency and just not support side input methods
>>>>>> that would require consistent metadata. I'm curious what features
>>>>>> this would hurt.
>>>>>>
>>>>>> - Have no consistency but require the SDK to build some sort of
>>>>>> large value, since single-element consistency is always built
>>>>>> into the model. Today many runners do concatenate all elements
>>>>>> into one value, though that does not perform well. Making this
>>>>>> effective probably requires new model features.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Thu, Apr 11, 2019 at 9:44 AM Reuven Lax <re...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> One thing to keep in mind: triggers that fire multiple times per
>>>>>>> window already tend to be non-deterministic. These are
>>>>>>> element-count or processing-time triggers, both of which are
>>>>>>> fairly non-deterministic in firing.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Apr 11, 2019 at 9:27 AM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Today, we define that a side input becomes available to be
>>>>>>>> consumed once at least one firing occurs or when the runner
>>>>>>>> detects that no such output could be produced (e.g. the
>>>>>>>> watermark is beyond the end of the window when using the
>>>>>>>> default trigger). For triggers that fire at most once,
>>>>>>>> consumers are guaranteed to have a consistent view of the
>>>>>>>> contents of the side input. But what happens when the trigger
>>>>>>>> fires multiple times?
>>>>>>>>
>>>>>>>> Let's say we have a pipeline containing:
>>>>>>>>
>>>>>>>> ParDo(A) --> PCollectionView S
>>>>>>>>          \-> PCollectionView T
>>>>>>>>
>>>>>>>> ...
>>>>>>>> |
>>>>>>>> ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
>>>>>>>> |
>>>>>>>> ...
>>>>>>>>
>>>>>>>> 1) Let's say ParDo(A) outputs (during a single bundle) X and Y
>>>>>>>> to PCollectionView S. Should ParDo(C) be guaranteed to see X
>>>>>>>> only if it can also see Y (and vice versa)?
>>>>>>>>
>>>>>>>> 2) Let's say ParDo(A) outputs (during a single bundle) X to
>>>>>>>> PCollectionView S and Y to PCollectionView T. Should ParDo(C)
>>>>>>>> be guaranteed to see X only if it can also see Y?

>>>> --
>>>> ================
>>>> Ruoyun Huang
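For concreteness, the pipeline shape in Luke's original message can be
sketched in Java roughly as follows; the element types, the assumed
input PCollections `input` and `mainInput`, and the choice of
View.asIterable() are assumptions for illustration, with windowing and
the multi-firing trigger elided:

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.View;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PCollectionTuple;
  import org.apache.beam.sdk.values.PCollectionView;
  import org.apache.beam.sdk.values.TupleTag;
  import org.apache.beam.sdk.values.TupleTagList;

  final TupleTag<String> sTag = new TupleTag<String>() {};
  final TupleTag<String> tTag = new TupleTag<String>() {};

  // ParDo(A): a single bundle emits X to S and Y to T.
  PCollectionTuple outputs = input.apply("ParDoA",
      ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void process(ProcessContext c) {
          c.output(sTag, "X-" + c.element());  // destined for view S
          c.output(tTag, "Y-" + c.element());  // destined for view T
        }
      }).withOutputTags(sTag, TupleTagList.of(tTag)));

  PCollectionView<Iterable<String>> viewS =
      outputs.get(sTag).apply("ViewS", View.asIterable());
  PCollectionView<Iterable<String>> viewT =
      outputs.get(tTag).apply("ViewT", View.asIterable());

  // ParDo(C): question (2) above asks whether this DoFn may observe an
  // X in S while the Y emitted by the same ParDo(A) bundle is not yet
  // visible in T.
  PCollection<String> result = mainInput.apply("ParDoC",
      ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void process(ProcessContext c) {
          int sSize = 0;
          for (String ignoredS : c.sideInput(viewS)) {
            sSize++;
          }
          int tSize = 0;
          for (String ignoredT : c.sideInput(viewT)) {
            tSize++;
          }
          // With no cross-view consistency guarantee, sSize and tSize
          // may reflect different trigger firings.
          c.output(c.element() + ": |S|=" + sSize + ", |T|=" + tSize);
        }
      }).withSideInputs(viewS, viewT));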