Yeah, I can confirm for the Python runners (based on my reading of translations.py [1]) that only identical environments are merged together.
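To make that concrete, here is a rough sketch of the difference between "merge only if identical" and a merge that treats a hint as additive. None of this is Beam's real API: `Env`, `group_identical`, and `merge_additive` are hypothetical names, and `licenses` just stands in for the kind of license hint we use.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Env:
    """Hypothetical stand-in for a transform environment (not a Beam class)."""
    image: str
    licenses: frozenset = frozenset()  # assumed additive, license-style hint


def group_identical(transform_envs):
    """Group transform names by exact environment equality.

    Two environments that differ in any field, hints included, end up in
    different groups, so their transforms would not be fused.
    """
    groups = {}
    for name, env in transform_envs.items():
        groups.setdefault(env, []).append(name)
    return list(groups.values())


def merge_additive(a, b):
    """Hypothetical additive merge: union the license sets.

    Returns None when the environments differ in something that is not
    additive (here, the image).
    """
    if a.image != b.image:
        return None
    return Env(a.image, a.licenses | b.licenses)


needs_a = Env("python-sdk", frozenset({"A"}))
needs_b = Env("python-sdk", frozenset({"B"}))

groups = group_identical({"UseA": needs_a, "UseB": needs_b, "AlsoUseA": needs_a})
merged = merge_additive(needs_a, needs_b)
```

With identity-based grouping, `UseA` and `AlsoUseA` land together and `UseB` is on its own, while `merged` is a single environment requiring both A and B.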
The funny thing is that we _originally_ implemented this hint as an annotation but then changed it to a hint because it felt semantically more correct. I think we might go back to that since the environment merging logic isn't very flexible or easy to customize. Our type of hint is a bit unlike other hints anyway. Unlike resources like MinRam, these resources are additive (e.g. you can merge an environment that requires license A and an environment that requires license B into an environment that requires both A and B).

[1] https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4

On Fri, Dec 15, 2023 at 8:43 AM Robert Burke <rob...@frantil.com> wrote:

> That would do it. We got so tunnel visioned on side inputs we missed that!
>
> IIRC the python local runner and Prism both only fuse transforms in
> identical environments together. So any environmental diffs will prevent
> fusion.
>
> Runners as a rule are usually free to ignore/manage hints as they like.
> Transform annotations might be an alternative, but how those are managed
> would be more SDK specific.
>
> On Fri, Dec 15, 2023, 5:21 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
>> I figured out my issue. I thought side inputs were breaking up my
>> pipeline but after experimenting with my transforms I now realize what was
>> actually breaking it up was different transform environments that weren't
>> considered compatible.
>>
>> We have a custom resource hint (for specifying whether a transform needs
>> access to some software license) that we use with our transforms and that's
>> what was preventing the fusion I was expecting. I'm looking into how to
>> make these hints mergeable now.
>>
>> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke <rob...@frantil.com> wrote:
>>
>>> Building on what Robert Bradshaw has said, basically, if these fusion
>>> breaks don't exist, the pipeline can livelock, because the side input is
>>> unable to finish computing for a given input element's window.
>>>
>>> I have recently added fusion to the Go Prism runner based on the python
>>> side input semantics, and I was surprised that there are basically two
>>> rules for fusion: the side input one, and one for handling Stateful
>>> processing.
>>>
>>> This code here is the greedy fusion algorithm that Python uses, but
>>> less set-based, so it might be easier to follow:
>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>>
>>> From the linked code comment:
>>>
>>> Side Inputs: A transform S consuming a PCollection as a side input can't
>>> be fused with the transform P that produces that PCollection. Further,
>>> no transform S+ descended from S, can be fused with transform P.
>>>
>>> Ideally I'll add visual representations of the graphs in the test suite
>>> here, which validates the side input dependency logic:
>>>
>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>>
>>> (Note that the test doesn't validate expected fusion results; Prism is a
>>> work in progress.)
>>>
>>> As for the Stateful rule, this is largely an implementation convenience
>>> for runners to ensure correct execution.
>>> If your pipeline also uses Stateful transforms, or SplittableDoFns,
>>> those are usually relegated to the root of a fused stage, and avoid
>>> fusion with each other. That can also cause additional stages.
>>>
>>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>>> multiple stateful transforms could be fused in the same stage. But that's a
>>> very different discussion.
>>>
>>> On Thu, Dec 14, 2023, 4:03 PM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Thanks for the explanation!
>>>>
>>>> That matches with my intuition - are there any other rules with side
>>>> inputs?
>>>>
>>>> I might be misunderstanding the actual cause of the fusion breaks in
>>>> our pipeline, but we essentially have one part of the graph that produces
>>>> many small collections that are used as side inputs in the remaining part
>>>> of the graph. In other words, the "main graph" is mostly linear but uses
>>>> side inputs from the earlier part of the graph.
>>>>
>>>> Since the main graph is mostly linear, I expected few stages, but what
>>>> I actually see are a lot of breaks around the transforms that require
>>>> side inputs.
>>>>
>>>> Tangentially, are there any general tips for understanding why a graph
>>>> might be fused the way it was?
>>>>
>>>> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> That is correct. Side inputs give a view of the "whole" PCollection
>>>>> and hence introduce a fusion-breaking barrier. For example, suppose one
>>>>> has a DoFn, DoFnA, that produces two outputs, mainPColl and sidePColl,
>>>>> that are consumed (as the main and side input respectively) by DoFnB.
>>>>>
>>>>>                  -------- mainPColl ----- DoFnB
>>>>>                /                            ^
>>>>> inPColl -- DoFnA                            |
>>>>>                \                            |
>>>>>                  -------- sidePColl ------- /
>>>>>
>>>>> Now DoFnB may iterate over the entirety of sidePColl for every element
>>>>> of mainPColl. This means that DoFnA and DoFnB cannot be fused: fusing
>>>>> would require DoFnB to consume the elements as they are produced by
>>>>> DoFnA, but we need DoFnA to run to completion before we know the contents
>>>>> of sidePColl.
>>>>>
>>>>> Similar constraints apply in larger graphs (e.g. there may be many
>>>>> intermediate DoFns and PCollections), but they principally boil down to
>>>>> shapes that look like this.
>>>>>
>>>>> Though this does not introduce a global barrier in streaming, there is
>>>>> still the analogous per window/watermark barrier that prevents fusion for
>>>>> the same reasons.
>>>>>
>>>>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> We have a pretty big pipeline and while I was inspecting the stages,
>>>>>> I noticed there is less fusion than I expected. I suspect it has to do
>>>>>> with the heavy use of side inputs in our workflow. In the python sdk,
>>>>>> I see that side inputs are considered when determining whether two
>>>>>> stages are fusible. I have a hard time getting a clear understanding
>>>>>> of the logic though. Could someone clarify / summarize the rules
>>>>>> around this?
>>>>>>
>>>>>> Thanks!
>>>>>> Joey
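
For anyone finding this thread later, the side input rule quoted above (S can't be fused with P, and neither can anything descended from S) can be sketched in a few lines of Python. This is only an illustration under my own assumptions, not the code in translations.py or Prism's preprocess.go: `Transform`, `forbidden_producers`, and `can_fuse` are hypothetical names, and the transforms are assumed to be listed in topological order.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Transform:
    """Hypothetical, simplified transform node (not a Beam class)."""
    name: str
    main_inputs: tuple = ()  # PCollection names read as main input
    side_inputs: tuple = ()  # PCollection names read as side input
    outputs: tuple = ()      # PCollection names produced


def forbidden_producers(transforms):
    """Map each transform name to the producers it must not be fused with.

    Assumes `transforms` is in topological order. A transform is blocked
    from the producer of each of its side inputs, and it inherits the
    blocked sets of every transform it is descended from.
    """
    producer_of = {pc: t.name for t in transforms for pc in t.outputs}
    forbidden = {}
    for t in transforms:
        blocked = {producer_of[pc] for pc in t.side_inputs if pc in producer_of}
        for pc in t.main_inputs + t.side_inputs:
            upstream = producer_of.get(pc)
            if upstream is not None:
                blocked |= forbidden[upstream]
        forbidden[t.name] = blocked
    return forbidden


def can_fuse(producer, consumer, forbidden):
    """True if the side input rule allows `consumer` in `producer`'s stage."""
    return producer not in forbidden[consumer]


# The DoFnA / DoFnB shape from the diagram, plus a downstream DoFnC.
graph = [
    Transform("DoFnA", main_inputs=("inPColl",), outputs=("mainPColl", "sidePColl")),
    Transform("DoFnB", main_inputs=("mainPColl",), side_inputs=("sidePColl",),
              outputs=("outPColl",)),
    Transform("DoFnC", main_inputs=("outPColl",)),
]
forbidden = forbidden_producers(graph)
```

Here DoFnB and DoFnC are both blocked from DoFnA's stage, but nothing in this rule stops DoFnC from being fused with DoFnB. The environment and Stateful rules discussed earlier in the thread are separate checks and are not modeled.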