There is definitely a body of future work in intelligently merging compatible-but-not-equal environments. (Dataflow does this, for example.) Defining and detecting compatibility is not always easy, but sometimes it is, and we should at least cover those cases and grow them over time.
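A minimal sketch of what merging compatible-but-not-equal environments could look like for the additive license hint described below. `Env` and `try_merge` are hypothetical names, not Beam APIs. It assumes that "compatible" means the same container image, and that a MinRam-style hint merges by taking the maximum. Both assumptions are for illustration only.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class Env:
    """Hypothetical, simplified environment; not Beam's Environment proto."""
    container_image: str
    # Additive hint: the set of licenses the environment must provide.
    licenses: FrozenSet[str] = field(default_factory=frozenset)
    # Assumed MinRam-style hint (bytes); merges by taking the maximum.
    min_ram: int = 0


def try_merge(a: Env, b: Env) -> Optional[Env]:
    """Return a merged environment if `a` and `b` are compatible, else None.

    Assumption: "compatible" means the same container image. Additive hints
    are unioned; MinRam-style hints take the larger requirement.
    """
    if a.container_image != b.container_image:
        return None
    return Env(
        container_image=a.container_image,
        licenses=a.licenses | b.licenses,
        min_ram=max(a.min_ram, b.min_ram),
    )
```

For example, merging `Env("img", frozenset({"A"}))` with `Env("img", frozenset({"B"}))` yields an environment requiring both licenses A and B, so the two transforms could share one fused stage. An identical-environments-only check would have kept them apart.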
On Fri, Dec 15, 2023 at 5:57 AM Joey Tran <joey.t...@schrodinger.com> wrote:
> Yeah, I can confirm for the Python runners (based on my reading of
> translations.py [1]) that only identical environments are merged together.
>
> The funny thing is that we _originally_ implemented this hint as an
> annotation but then changed it to a hint because it semantically felt more
> correct. I think we might go back to that, since the environment merging
> logic isn't very flexible or easy to customize. Our type of hint is a bit
> unlike other hints anyway. Unlike resources like MinRam, these resources
> are additive (e.g. you can merge an environment that requires license A and
> an environment that requires license B into an environment that requires
> both A and B).
>
> [1]
> https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4
>
> On Fri, Dec 15, 2023 at 8:43 AM Robert Burke <rob...@frantil.com> wrote:
>
>> That would do it. We got so tunnel-visioned on side inputs we missed that!
>>
>> IIRC the Python local runner and Prism both only fuse transforms in
>> identical environments together. So any environmental diffs will prevent
>> fusion.
>>
>> Runners as a rule are usually free to ignore/manage hints as they like.
>> Transform annotations might be an alternative, but how those are managed
>> would be more SDK-specific.
>>
>> On Fri, Dec 15, 2023, 5:21 AM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> I figured out my issue. I thought side inputs were breaking up my
>>> pipeline, but after experimenting with my transforms I now realize what
>>> was actually breaking it up was different transform environments that
>>> weren't considered compatible.
>>>
>>> We have a custom resource hint (for specifying whether a transform needs
>>> access to some software license) that we use with our transforms, and
>>> that's what was preventing the fusion I was expecting.
>>> I'm looking into how to make these hints mergeable now.
>>>
>>> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke <rob...@frantil.com> wrote:
>>>
>>>> Building on what Robert Bradshaw has said, basically, if these fusion
>>>> breaks don't exist, the pipeline can livelock, because the side input is
>>>> unable to finish computing for a given input element's window.
>>>>
>>>> I have recently added fusion to the Go Prism runner based on the Python
>>>> side input semantics, and I was surprised that there are basically two
>>>> rules for fusion: the side input one, and one for handling Stateful
>>>> processing.
>>>>
>>>> This code here is the greedy fusion algorithm that Python uses, but a bit
>>>> less set-based, so it might be easier to follow:
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>>>
>>>> From the linked code comment:
>>>>
>>>> Side Inputs: A transform S consuming a PCollection as a side input can't
>>>> be fused with the transform P that produces that PCollection. Further,
>>>> no transform S+ descended from S can be fused with transform P.
>>>>
>>>> Ideally I'll add visual representations of the graphs to the test suite
>>>> here that validates the side input dependency logic:
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>>>
>>>> (Note: that test doesn't validate expected fusion results; Prism is a
>>>> work in progress.)
>>>>
>>>> As for the Stateful rule, this is largely an implementation convenience
>>>> for runners to ensure correct execution.
>>>> If your pipeline also uses Stateful transforms or SplittableDoFns, those
>>>> are usually relegated to the root of a fused stage and avoid fusion with
>>>> each other. That can also cause additional stages.
>>>>
>>>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>>>> multiple stateful transforms could be fused in the same stage.
>>>> But that's a very different discussion.
>>>>
>>>> On Thu, Dec 14, 2023, 4:03 PM Joey Tran <joey.t...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Thanks for the explanation!
>>>>>
>>>>> That matches my intuition. Are there any other rules with side
>>>>> inputs?
>>>>>
>>>>> I might be misunderstanding the actual cause of the fusion breaks in
>>>>> our pipeline, but we essentially have one part of the graph that
>>>>> produces many small collections that are used as side inputs in the
>>>>> remaining part of the graph. In other words, the "main graph" is mostly
>>>>> linear but uses side inputs from the earlier part of the graph.
>>>>>
>>>>> Since the main graph is mostly linear, I expected few stages, but
>>>>> what I actually see are a lot of breaks around the transforms that
>>>>> require side inputs.
>>>>>
>>>>> Tangentially, are there any general tips for understanding why a graph
>>>>> might be fused the way it was?
>>>>>
>>>>> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> That is correct. Side inputs give a view of the "whole" PCollection
>>>>>> and hence introduce a fusion-preventing barrier. For example, suppose
>>>>>> one has a DoFn (DoFnA) that produces two outputs, mainPColl and
>>>>>> sidePColl, that are consumed (as the main and side input respectively)
>>>>>> by DoFnB.
>>>>>>
>>>>>>                  -------- mainPColl ----- DoFnB
>>>>>>                 /                           ^
>>>>>> inPColl -- DoFnA                            |
>>>>>>                 \                           |
>>>>>>                  -------- sidePColl --------/
>>>>>>
>>>>>> Now DoFnB may iterate over the entirety of sidePColl for every element
>>>>>> of mainPColl. This means that DoFnA and DoFnB cannot be fused: fusing
>>>>>> them would require DoFnB to consume the elements as DoFnA produces
>>>>>> them, but we need DoFnA to run to completion before we know the
>>>>>> contents of sidePColl.
>>>>>>
>>>>>> Similar constraints apply in larger graphs (e.g. there may be many
>>>>>> intermediate DoFns and PCollections), but they principally boil down
>>>>>> to shapes that look like this.
>>>>>>
>>>>>> Though this does not introduce a global barrier in streaming, there
>>>>>> is still the analogous per-window/watermark barrier that prevents
>>>>>> fusion for the same reasons.
>>>>>>
>>>>>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>> We have a pretty big pipeline, and while I was inspecting the stages,
>>>>>>> I noticed there is less fusion than I expected. I suspect it has to
>>>>>>> do with the heavy use of side inputs in our workflow. In the Python
>>>>>>> SDK, I see that side inputs are considered when determining whether
>>>>>>> two stages are fusible. I have a hard time getting a clear
>>>>>>> understanding of the logic, though. Could someone clarify / summarize
>>>>>>> the rules around this?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Joey
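A minimal sketch of the fusion rules discussed in this thread as a greedy pass over a toy graph: only identical environments fuse, side inputs block fusion with their producing stage (and with anything downstream of it, the "S+" case), and stateful transforms root their own stage. `Transform`, `depends_on` and `greedy_fuse` are hypothetical names. This is not the code in translations.py or preprocess.go. Instead of tracking forbidden transforms, it assumes a simpler formulation: a transform may not join a stage if one of its side inputs comes from that stage or from any stage that transitively reads from it, since fusing would create a cycle.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class Transform:
    """Hypothetical, simplified transform node; not a Beam class."""
    name: str
    env: str                      # environment id; only identical ids are fused
    main_input: str = ""          # PCollection read element by element ("" = root)
    side_inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)
    stateful: bool = False        # stateful/SDF transforms always root a stage


def depends_on(stage_deps: Dict[int, Set[int]], start: int, target: int) -> bool:
    """True if stage `start` is, or transitively reads from, stage `target`."""
    seen: Set[int] = set()
    stack = [start]
    while stack:
        s = stack.pop()
        if s == target:
            return True
        if s not in seen:
            seen.add(s)
            stack.extend(stage_deps[s])
    return False


def greedy_fuse(transforms: List[Transform]) -> Dict[str, int]:
    """Assign a stage id to each transform; input must be topologically sorted."""
    producer: Dict[str, str] = {}            # PCollection -> producing transform
    stage_of: Dict[str, int] = {}            # transform -> stage id
    stage_env: Dict[int, str] = {}           # stage id -> environment id
    stage_deps: Dict[int, Set[int]] = {}     # stage id -> stages it reads from

    for t in transforms:
        target: Optional[int] = None
        if t.main_input and not t.stateful:
            cand = stage_of[producer[t.main_input]]
            side_stages = {stage_of[producer[pc]] for pc in t.side_inputs}
            # Side-input rule: t may not join `cand` if a side input is
            # produced in `cand` itself or in a stage that transitively reads
            # from `cand`; that stage can only finish after `cand` does.
            if stage_env[cand] == t.env and not any(
                depends_on(stage_deps, s, cand) for s in side_stages
            ):
                target = cand
        if target is None:                   # fusion break: start a new stage
            target = len(stage_env)
            stage_env[target] = t.env
            stage_deps[target] = set()
        stage_of[t.name] = target
        # Record which other stages this stage now reads from.
        inputs = ([t.main_input] if t.main_input else []) + t.side_inputs
        for pc in inputs:
            s = stage_of[producer[pc]]
            if s != target:
                stage_deps[target].add(s)
        for pc in t.outputs:
            producer[pc] = t.name
    return stage_of
```

Applied to the diagram above (a root "Read" producing inPColl, then DoFnA and DoFnB, all in one environment), this sketch places Read and DoFnA in stage 0 and DoFnB in stage 1. Giving DoFnA a different environment id (for example, one carrying a license hint) would split it into its own stage as well, which matches the behavior Joey ran into.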