I figured out my issue. I thought side inputs were breaking up my pipeline, but after experimenting with my transforms I now realize that the actual cause was differing transform environments that weren't considered compatible for fusion.
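The environment-compatibility precondition can be sketched as follows. This is a minimal sketch under a toy model: an environment is just a container name plus a dict of resource hints, and two transforms may share a stage only if their environments can be merged into one. The hint names (`min_ram_bytes`, `licenses`), the `MERGE_RULES` table, and the `Environment`, `merge_environments`, and `can_fuse` names are all hypothetical and are not Beam's API. The sketch only illustrates what making a hint "mergeable" could mean.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional

# Hypothetical merge rules: how to reconcile two differing values of the same
# hint when fusing. These hint names are illustrative, not Beam URNs.
MERGE_RULES: Dict[str, Callable[[Any, Any], Any]] = {
    "min_ram_bytes": max,            # fused stage needs the larger amount
    "licenses": lambda a, b: a | b,  # fused stage needs every license either side needs
}


@dataclass
class Environment:
    container: str                   # e.g. an SDK container image name
    hints: Dict[str, Any] = field(default_factory=dict)


def merge_environments(a: Environment, b: Environment) -> Optional[Environment]:
    """Return one environment satisfying both, or None if they cannot fuse."""
    if a.container != b.container:
        return None
    merged = dict(a.hints)  # copy so that `a` is not mutated
    for key, value in b.hints.items():
        if key not in merged or merged[key] == value:
            merged[key] = value
        elif key in MERGE_RULES:
            merged[key] = MERGE_RULES[key](merged[key], value)
        else:
            return None  # conflicting values and no rule to reconcile them
    return Environment(a.container, merged)


def can_fuse(a: Environment, b: Environment) -> bool:
    return merge_environments(a, b) is not None


if __name__ == "__main__":
    licensed = Environment("py", {"licenses": frozenset({"toolA"})})
    gpu = Environment("py", {"accelerator": "gpu"})
    tpu = Environment("py", {"accelerator": "tpu"})
    print(can_fuse(licensed, Environment("py")))  # True
    print(can_fuse(gpu, tpu))                     # False
```

The sketch is deliberately conservative: a hint that differs between the two sides and has no merge rule blocks fusion, while a hint set on only one side is treated as "no requirement" on the other, so the fused stage simply inherits it. For a license hint, that means unlicensed transforms fused with licensed ones would also run where the license is available, which may or may not be acceptable.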
We have a custom resource hint (for specifying whether a transform needs access to some software license) that we use with our transforms, and that's what was preventing the fusion I was expecting. I'm looking into how to make these hints mergeable now.

On Thu, Dec 14, 2023 at 7:46 PM Robert Burke <rob...@frantil.com> wrote:

> Building on what Robert Bradshaw has said, basically, if these fusion
> breaks didn't exist, the pipeline could livelock, because the side input
> would be unable to finish computing for a given input element's window.
>
> I recently added fusion to the Go Prism runner based on the Python
> side input semantics, and I was surprised that there are basically two
> rules for fusion: the side input one, and one for handling stateful
> processing.
>
> This code is a version of the greedy fusion algorithm that Python uses,
> but less set-based, so it might be easier to follow:
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>
> From the linked code comment:
>
> Side Inputs: A transform S consuming a PCollection as a side input can't
> be fused with the transform P that produces that PCollection. Further,
> no transform S+ descended from S, can be fused with transform P.
>
> Ideally I'll add visual representations of the graphs to the test suite
> here, which validates the side input dependency logic:
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>
> (Note that this test doesn't validate expected fusion results; Prism is a
> work in progress.)
>
> As for the stateful rule, this is largely an implementation convenience
> for runners to ensure correct execution.
> If your pipeline also uses stateful transforms or SplittableDoFns, those
> are usually relegated to the root of a fused stage and avoid fusion with
> each other. That can also cause additional stages.
>
> If Beam adopted a rigorous notion of Key Preserving for transforms,
> multiple stateful transforms could be fused in the same stage. But that's a
> very different discussion.
>
> On Thu, Dec 14, 2023, 4:03 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>
>> Thanks for the explanation!
>>
>> That matches my intuition. Are there any other rules with side
>> inputs?
>>
>> I might be misunderstanding the actual cause of the fusion breaks in our
>> pipeline, but we essentially have one part of the graph that produces many
>> small collections that are used as side inputs in the remaining part of the
>> graph. In other words, the "main graph" is mostly linear but uses side
>> inputs from the earlier part of the graph.
>>
>> Since the main graph is mostly linear, I expected few stages, but what I
>> actually see is a lot of breaks around the transforms that require side
>> inputs.
>>
>> Tangentially, are there any general tips for understanding why a graph
>> might be fused the way it was?
>>
>> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <dev@beam.apache.org>
>> wrote:
>>
>>> That is correct. Side inputs give a view of the "whole" PCollection and
>>> hence introduce a fusion-preventing barrier. For example, suppose one has a
>>> DoFn that produces two outputs, mainPColl and sidePColl, that are consumed
>>> (as the main and side input respectively) by DoFnB.
>>>
>>>                  -------- mainPColl ----- DoFnB
>>>                 /                           ^
>>> inPColl -- DoFnA                            |
>>>                 \                           |
>>>                  -------- sidePColl ------- /
>>>
>>> Now DoFnB may iterate over the entirety of sidePColl for every element of
>>> mainPColl. This means that DoFnA and DoFnB cannot be fused, which
>>> would require DoFnB to consume the elements as they are produced from
>>> DoFnA, but we need DoFnA to run to completion before we know the contents
>>> of sidePColl.
>>>
>>> Similar constraints apply in larger graphs (e.g. there may be many
>>> intermediate DoFns and PCollections), but they principally boil down to
>>> shapes that look like this.
>>>
>>> Though this does not introduce a global barrier in streaming, there is
>>> still the analogous per-window/watermark barrier that prevents fusion for
>>> the same reasons.
>>>
>>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> We have a pretty big pipeline, and while I was inspecting the stages I
>>>> noticed there is less fusion than I expected. I suspect it has to do with
>>>> the heavy use of side inputs in our workflow. In the Python SDK, I see that
>>>> side inputs are considered when determining whether two stages are fusible.
>>>> I have a hard time getting a clear understanding of the logic, though. Could
>>>> someone clarify / summarize the rules around this?
>>>>
>>>> Thanks!
>>>> Joey
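The side-input rule discussed in this thread can be sketched as a small greedy fusion pass. This is a minimal sketch under simplifying assumptions, not the Python SDK's or Prism's implementation: each transform has at most one main input, transforms arrive in topological order, stateful transforms always start a new stage, and a transform may join its main-input producer's stage only if none of its side inputs is produced by, or downstream of, that stage. `Transform`, `downstream_pcolls`, and `greedy_fuse` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class Transform:
    name: str
    main_input: Optional[str]        # PCollection name; None for a source
    outputs: List[str]
    side_inputs: List[str] = field(default_factory=list)
    stateful: bool = False           # stateful DoFns/SDFs always start a stage


def downstream_pcolls(transforms: List[Transform]) -> Dict[str, Set[str]]:
    """Map each transform to the PCollections it produces or that depend on it."""
    consumers: Dict[str, List[Transform]] = {}
    for t in transforms:
        inputs = ([t.main_input] if t.main_input is not None else []) + t.side_inputs
        for pc in inputs:
            consumers.setdefault(pc, []).append(t)
    memo: Dict[str, Set[str]] = {}

    def visit(t: Transform) -> Set[str]:
        if t.name not in memo:
            reach = set(t.outputs)
            for pc in t.outputs:
                for consumer in consumers.get(pc, []):
                    reach |= visit(consumer)
            memo[t.name] = reach
        return memo[t.name]

    for t in transforms:
        visit(t)
    return memo


def greedy_fuse(transforms: List[Transform]) -> List[List[str]]:
    """Fuse each transform into its main-input producer's stage when allowed.

    `transforms` must be in topological order.
    """
    downstream = downstream_pcolls(transforms)
    producer = {pc: t.name for t in transforms for pc in t.outputs}
    stage_of: Dict[str, int] = {}
    stages: List[List[str]] = []
    stage_reach: List[Set[str]] = []  # PCollections downstream of each stage

    for t in transforms:
        target: Optional[int] = None
        if t.main_input is not None and not t.stateful:
            candidate = stage_of[producer[t.main_input]]
            # Side-input rule: t must not read, as a side input, any
            # PCollection produced by or downstream of the candidate stage.
            if not stage_reach[candidate] & set(t.side_inputs):
                target = candidate
        if target is None:  # start a new stage with t as its root
            stages.append([])
            stage_reach.append(set())
            target = len(stages) - 1
        stages[target].append(t.name)
        stage_reach[target] |= downstream[t.name]
        stage_of[t.name] = target
    return stages


if __name__ == "__main__":
    # The DoFnA/DoFnB shape from the thread, plus one downstream transform.
    diamond = [
        Transform("Read", None, ["inPColl"]),
        Transform("DoFnA", "inPColl", ["mainPColl", "sidePColl"]),
        Transform("DoFnB", "mainPColl", ["out"], side_inputs=["sidePColl"]),
        Transform("DoFnC", "out", ["final"]),
    ]
    print(greedy_fuse(diamond))  # [['Read', 'DoFnA'], ['DoFnB', 'DoFnC']]
    # A side input produced on an independent branch.
    independent = [
        Transform("ReadConfig", None, ["cfg"]),
        Transform("MakeSide", "cfg", ["side"]),
        Transform("Read", None, ["x0"]),
        Transform("Step1", "x0", ["x1"], side_inputs=["side"]),
        Transform("Step2", "x1", ["x2"]),
    ]
    print(greedy_fuse(independent))  # [['ReadConfig', 'MakeSide'], ['Read', 'Step1', 'Step2']]
```

On the DoFnA/DoFnB shape, DoFnB starts a new stage and its descendant DoFnC fuses with DoFnB rather than with DoFnA, matching the "S+ descended from S" rule, whereas a side input produced on an independent branch does not force a break. Real runners also handle flattens, multiple main inputs, environment compatibility, and merging of sibling stages, all of which this sketch ignores.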