Building on what Robert Bradshaw said: if these fusion breaks didn't exist, the pipeline could livelock, because the side input would never be able to finish computing for a given input element's window.
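To make the side input constraint concrete, here is a minimal sketch in plain Python. It is not the Beam or Prism implementation, just a toy model I'm assuming for illustration: the `Transform` class, the `unfusable_pairs` helper, and the `DoFnC`/`outPColl` names are all hypothetical. It encodes the rule that a transform consuming a PCollection as a side input, and anything descended from that transform, can't share a stage with the producer of that PCollection.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Transform:
    """Toy stand-in for a pipeline transform (hypothetical, not a Beam class)."""
    name: str
    main_inputs: tuple = ()
    side_inputs: tuple = ()
    outputs: tuple = ()


def unfusable_pairs(transforms):
    """Return (producer, consumer) name pairs that must not share a stage."""
    by_name = {t.name: t for t in transforms}
    # Which transform produces each PCollection.
    producer_of = {pc: t.name for t in transforms for pc in t.outputs}
    # Which transforms read each PCollection, as main or side input.
    consumers = defaultdict(set)
    for t in transforms:
        for pc in t.main_inputs + t.side_inputs:
            consumers[pc].add(t.name)

    def descendants(name):
        # Every transform reachable downstream of `name`.
        seen, stack = set(), [name]
        while stack:
            current = stack.pop()
            for pc in by_name[current].outputs:
                for consumer in consumers[pc]:
                    if consumer not in seen:
                        seen.add(consumer)
                        stack.append(consumer)
        return seen

    blocked = set()
    for t in transforms:
        for pc in t.side_inputs:
            producer = producer_of.get(pc)
            if producer is None:
                continue  # Side input comes from outside this toy graph.
            # The side input consumer can't fuse with the producer...
            blocked.add((producer, t.name))
            # ...and neither can anything descended from the consumer.
            for d in descendants(t.name):
                blocked.add((producer, d))
    return blocked


# DoFnA emits two outputs; DoFnB reads one as its main input and the
# other as a side input. DoFnC is a hypothetical downstream transform.
graph = [
    Transform("DoFnA", main_inputs=("inPColl",),
              outputs=("mainPColl", "sidePColl")),
    Transform("DoFnB", main_inputs=("mainPColl",),
              side_inputs=("sidePColl",), outputs=("outPColl",)),
    Transform("DoFnC", main_inputs=("outPColl",)),
]

print(sorted(unfusable_pairs(graph)))
# [('DoFnA', 'DoFnB'), ('DoFnA', 'DoFnC')]
```

Note that DoFnB and DoFnC can still fuse with each other in this model; the break only falls between the side input's producer and everything at or below its consumer.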
I recently added fusion to the Go Prism runner based on the Python side input semantics, and I was surprised that there are basically two rules for fusion: one for side inputs, and one for handling Stateful processing.

This code is the greedy fusion algorithm that Python uses, but written in a less set-based style, so it might be easier to follow:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513

From the linked code comment:

  Side Inputs: A transform S consuming a PCollection as a side input can't
  be fused with the transform P that produces that PCollection. Further,
  no transform S+ descended from S, can be fused with transform P.

Ideally I'll add visual representations of the graphs in the test suite here, which validates the side input dependency logic:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398

(Note that the test doesn't validate expected fusion results; Prism is a work in progress.)

As for the Stateful rule, this is largely an implementation convenience for runners to ensure correct execution. If your pipeline also uses Stateful transforms or SplittableDoFns, those are usually relegated to the root of a fused stage and are not fused with each other. That can also cause additional stages.

If Beam adopted a rigorous notion of Key Preserving for transforms, multiple stateful transforms could be fused in the same stage. But that's a very different discussion.

On Thu, Dec 14, 2023, 4:03 PM Joey Tran <joey.t...@schrodinger.com> wrote:

> Thanks for the explanation!
>
> That matches with my intuition - are there any other rules with side
> inputs?
>
> I might be misunderstanding the actual cause of the fusion breaks in our
> pipeline, but we essentially have one part of the graph that produces many
> small collections that are used as side inputs in the remaining part of the
> graph.
> In other words, the "main graph" is mostly linear but uses side
> inputs from the earlier part of the graph.
>
> Since the main graph is mostly linear, I expected few stages, but what I
> actually see are a lot of breaks around the side input requiring transforms.
>
>
> Tangentially, are there any general tips for understanding why a graph
> might be fused the way it was?
>
> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <dev@beam.apache.org>
> wrote:
>
>> That is correct. Side inputs give a view of the "whole" PCollection and
>> hence introduce a fusion-producing barrier. For example, suppose one has a
>> DoFn that produces two outputs, mainPColl and sidePColl, that are consumed
>> (as the main and side input respectively) of DoFnB.
>>
>>               -------- mainPColl ----- DoFnB
>>             /                            ^
>> inPColl -- DoFnA                         |
>>             \                            |
>>               -------- sidePColl ------- /
>>
>>
>> Now DoFnB may iterate over the entirety of sidePColl for every element of
>> mainPColl. This means that DoFnA and DoFnB cannot be fused, which
>> would require DoFnB to consume the elements as they are produced from
>> DoFnA, but we need DoFnA to run to completion before we know the contents
>> of sidePColl.
>>
>> Similar constraints apply in larger graphs (e.g. there may be many
>> intermediate DoFns and PCollections), but they principally boil down to
>> shapes that look like this.
>>
>> Though this does not introduce a global barrier in streaming, there is
>> still the analogous per window/watermark barrier that prevents fusion for
>> the same reasons.
>>
>>
>>
>>
>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> Hey all,
>>>
>>> We have a pretty big pipeline and while I was inspecting the stages, I
>>> noticed there is less fusion than I expected. I suspect it has to do with
>>> the heavy use of side inputs in our workflow. In the python sdk, I see that
>>> side inputs are considered when determining whether two stages are fusible.
>>> I have a hard time getting a clear understanding of the logic though. Could
>>> someone clarify / summarize the rules around this?
>>>
>>> Thanks!
>>> Joey
>>>
>>