Yeah, we already have `ResourceHint.get_merged_value(cls, outer_value, inner_value)` for reconciling resources within a composite. In the future we could add a similar method and have the environment-merging logic hook into it.
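
Roughly what I have in mind, as a standalone sketch rather than actual Beam code: the merge function below only mirrors the shape of `get_merged_value`, and the URN, the comma-separated encoding of licenses, and the names `merge_license_values` and `merge_hints` are all made up for illustration. Nothing like `merge_hints` exists in the runner today.

```python
from typing import Dict, Optional

# Hypothetical URN, for illustration only; not a real Beam resource hint.
LICENSE_URN = "example:resources:licenses:v1"


def merge_license_values(outer_value: bytes, inner_value: bytes) -> bytes:
    """Additive merge: the result requires every license either side needs.

    Assumes (for this sketch) that licenses are encoded as comma-separated
    ASCII names.
    """
    licenses = set(outer_value.decode("ascii").split(","))
    licenses |= set(inner_value.decode("ascii").split(","))
    licenses.discard("")  # an empty value contributes no licenses
    return ",".join(sorted(licenses)).encode("ascii")


def merge_hints(
    a: Dict[str, bytes], b: Dict[str, bytes]
) -> Optional[Dict[str, bytes]]:
    """Merge two hint dicts, or return None if they are incompatible.

    The license hint is unioned; every other hint must be identical on both
    sides, which matches the current "identical environments only" behaviour
    for everything except the additive hint.
    """
    merged: Dict[str, bytes] = {}
    for urn in a.keys() | b.keys():
        if urn == LICENSE_URN:
            merged[urn] = merge_license_values(a.get(urn, b""), b.get(urn, b""))
        elif a.get(urn) == b.get(urn):
            merged[urn] = a[urn]
        else:
            return None  # differing non-additive hint: do not merge
    return merged
```

With this, an environment needing license A and one needing license B would merge into one needing both, while two environments that disagree on any other hint would stay separate.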

On Fri, Dec 15, 2023 at 3:53 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:

> There is definitely a body of future work in intelligently merging
> compatible-but-not-equal environments. (Dataflow does this, for example.)
> Defining/detecting compatibility is not always easy, but sometimes it is,
> and we should at least cover those cases and grow them over time.
>
> On Fri, Dec 15, 2023 at 5:57 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
>> Yeah, I can confirm for the Python runners (based on my reading of
>> translations.py [1]) that only identical environments are merged together.
>>
>> The funny thing is that we _originally_ implemented this hint as an
>> annotation but then changed it to a hint because it semantically felt more
>> correct. I think we might go back to that since the environment merging
>> logic isn't too flexible / easy to customize. Our type of hint is a bit
>> unlike other hints anyway. Unlike resources like MinRam, these resources
>> are additive (e.g. you can merge an environment that requires license A and
>> an environment that requires license B into an environment that requires
>> both A and B).
>>
>> [1]
>> https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4
>>
>> On Fri, Dec 15, 2023 at 8:43 AM Robert Burke <rob...@frantil.com> wrote:
>>
>>> That would do it. We got so tunnel visioned on side inputs we missed
>>> that!
>>>
>>> IIRC the Python local runner and Prism both only fuse transforms in
>>> identical environments together. So any environmental diffs will prevent
>>> fusion.
>>>
>>> Runners as a rule are usually free to ignore/manage hints as they like.
>>> Transform annotations might be an alternative, but how those are managed
>>> would be more SDK specific.
>>>
>>> On Fri, Dec 15, 2023, 5:21 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>
>>>> I figured out my issue.
>>>> I thought side inputs were breaking up my pipeline, but after
>>>> experimenting with my transforms I now realize what was actually
>>>> breaking it up was different transform environments that weren't
>>>> considered compatible.
>>>>
>>>> We have a custom resource hint (for specifying whether a transform
>>>> needs access to some software license) that we use with our transforms,
>>>> and that's what was preventing the fusion I was expecting. I'm looking
>>>> into how to make these hints mergeable now.
>>>>
>>>> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke <rob...@frantil.com> wrote:
>>>>
>>>>> Building on what Robert Bradshaw has said, basically, if these fusion
>>>>> breaks don't exist, the pipeline can live lock, because the side input
>>>>> is unable to finish computing for a given input element's window.
>>>>>
>>>>> I have recently added fusion to the Go Prism runner based on the
>>>>> Python side input semantics, and I was surprised that there are
>>>>> basically two rules for fusion: the side input one, and one for
>>>>> handling Stateful processing.
>>>>>
>>>>> This code here is the greedy fusion algorithm that Python uses, but
>>>>> less set based, so it might be easier to follow:
>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>>>>
>>>>> From the linked code comment:
>>>>>
>>>>> Side Inputs: A transform S consuming a PCollection as a side input
>>>>> can't be fused with the transform P that produces that PCollection.
>>>>> Further, no transform S+ descended from S, can be fused with
>>>>> transform P.
>>>>>
>>>>> Ideally I'll add visual representations of the graphs in the test
>>>>> suite here, which validates the side input dependency logic:
>>>>>
>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>>>>
>>>>> (Note that the test doesn't validate expected fusion results; Prism is
>>>>> a work in progress.)
>>>>>
>>>>> As for the Stateful rule, this is largely an implementation
>>>>> convenience for runners to ensure correct execution.
>>>>> If your pipeline also uses Stateful transforms or SplittableDoFns,
>>>>> those are usually relegated to the root of a fused stage and are not
>>>>> fused with each other. That can also cause additional stages.
>>>>>
>>>>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>>>>> multiple stateful transforms could be fused in the same stage. But
>>>>> that's a very different discussion.
>>>>>
>>>>> On Thu, Dec 14, 2023, 4:03 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>
>>>>>> Thanks for the explanation!
>>>>>>
>>>>>> That matches my intuition. Are there any other rules with side
>>>>>> inputs?
>>>>>>
>>>>>> I might be misunderstanding the actual cause of the fusion breaks in
>>>>>> our pipeline, but we essentially have one part of the graph that
>>>>>> produces many small collections that are used as side inputs in the
>>>>>> remaining part of the graph. In other words, the "main graph" is
>>>>>> mostly linear but uses side inputs from the earlier part of the graph.
>>>>>>
>>>>>> Since the main graph is mostly linear, I expected few stages, but
>>>>>> what I actually see are a lot of breaks around the transforms that
>>>>>> require side inputs.
>>>>>>
>>>>>> Tangentially, are there any general tips for understanding why a
>>>>>> graph might be fused the way it was?
>>>>>>
>>>>>> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> That is correct. Side inputs give a view of the "whole" PCollection
>>>>>>> and hence introduce a fusion-breaking barrier. For example, suppose
>>>>>>> one has a DoFn (DoFnA) that produces two outputs, mainPColl and
>>>>>>> sidePColl, that are consumed (as the main and side input
>>>>>>> respectively) by DoFnB.
>>>>>>>
>>>>>>>                  -------- mainPColl ----- DoFnB
>>>>>>>                 /                           ^
>>>>>>> inPColl -- DoFnA                            |
>>>>>>>                 \                           |
>>>>>>>                  -------- sidePColl -------/
>>>>>>>
>>>>>>> Now DoFnB may iterate over the entirety of sidePColl for every
>>>>>>> element of mainPColl. This means that DoFnA and DoFnB cannot be
>>>>>>> fused: fusion would require DoFnB to consume the elements as they
>>>>>>> are produced by DoFnA, but we need DoFnA to run to completion before
>>>>>>> we know the contents of sidePColl.
>>>>>>>
>>>>>>> Similar constraints apply in larger graphs (e.g. there may be many
>>>>>>> intermediate DoFns and PCollections), but they principally boil down
>>>>>>> to shapes that look like this.
>>>>>>>
>>>>>>> Though this does not introduce a global barrier in streaming, there
>>>>>>> is still the analogous per window/watermark barrier that prevents
>>>>>>> fusion for the same reasons.
>>>>>>>
>>>>>>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>
>>>>>>>> Hey all,
>>>>>>>>
>>>>>>>> We have a pretty big pipeline and while I was inspecting the
>>>>>>>> stages, I noticed there is less fusion than I expected. I suspect
>>>>>>>> it has to do with the heavy use of side inputs in our workflow. In
>>>>>>>> the Python SDK, I see that side inputs are considered when
>>>>>>>> determining whether two stages are fusible. I have a hard time
>>>>>>>> getting a clear understanding of the logic though. Could someone
>>>>>>>> clarify / summarize the rules around this?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Joey
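
For anyone trying to reason about the side-input rule discussed in this thread (a transform S that takes a PCollection as a side input can't be fused with its producer P, and neither can anything downstream of S), here is a small standalone sketch. It is not taken from translations.py or Prism; the `Transform` class and `forbidden_fusions` function are hypothetical names, and the graph model is deliberately simplified to names of PCollections.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class Transform:
    """Simplified, hypothetical stand-in for a pipeline transform."""
    name: str
    main_inputs: List[str] = field(default_factory=list)
    side_inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)


def forbidden_fusions(transforms: List[Transform]) -> Set[Tuple[str, str]]:
    """Return (producer, consumer) name pairs that must not share a stage."""
    producer_of = {pc: t.name for t in transforms for pc in t.outputs}
    consumers_of: Dict[str, List[Transform]] = {}
    for t in transforms:
        for pc in t.main_inputs + t.side_inputs:
            consumers_of.setdefault(pc, []).append(t)

    forbidden: Set[Tuple[str, str]] = set()
    for s in transforms:
        for side in s.side_inputs:
            p = producer_of.get(side)
            if p is None:
                continue  # side input produced outside this graph
            # Walk S and every transform downstream of S.
            stack, seen = [s], set()
            while stack:
                cur = stack.pop()
                if cur.name in seen:
                    continue
                seen.add(cur.name)
                forbidden.add((p, cur.name))
                for pc in cur.outputs:
                    stack.extend(consumers_of.get(pc, []))
    return forbidden


# The DoFnA / DoFnB shape from the diagram, plus a hypothetical DoFnC
# that consumes DoFnB's output.
graph = [
    Transform("DoFnA", main_inputs=["inPColl"], outputs=["mainPColl", "sidePColl"]),
    Transform("DoFnB", main_inputs=["mainPColl"], side_inputs=["sidePColl"],
              outputs=["outPColl"]),
    Transform("DoFnC", main_inputs=["outPColl"]),
]
pairs = forbidden_fusions(graph)
```

Here `pairs` contains both `("DoFnA", "DoFnB")` and `("DoFnA", "DoFnC")`, so DoFnB and DoFnC could still share a stage with each other, but neither could join DoFnA's stage.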