I figured out my issue. I thought side inputs were breaking up my pipeline,
but after experimenting with my transforms I now realize that what was
actually breaking it up was transforms in different environments that
weren't considered compatible.
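
For reference, here is a minimal Python sketch of the two pairwise fusion checks discussed in this thread, assuming a toy graph model: two transforms fuse only if they share an environment, and a producer P can't fuse with a transform S that reads P's output as a side input, or with anything descended from S. The `Transform` class, `descendants`, and `can_fuse` are hypothetical illustrations, not Beam or Prism APIs. The sketch only checks individual pairs of transforms and doesn't reproduce the stage-level bookkeeping the real fusion code does.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Transform:
    # Toy model of a transform; not a Beam or Prism type.
    name: str
    environment: str  # hypothetical environment id; differing ids are never fused
    main_inputs: List[str] = field(default_factory=list)  # names of producing transforms
    side_inputs: List[str] = field(default_factory=list)  # names of producing transforms


def descendants(graph: Dict[str, Transform], start: str) -> Set[str]:
    """Names reachable from `start` over main or side input edges, including `start`."""
    consumers: Dict[str, List[str]] = {}
    for t in graph.values():
        for producer in t.main_inputs + t.side_inputs:
            consumers.setdefault(producer, []).append(t.name)
    seen: Set[str] = set()
    stack = [start]
    while stack:
        name = stack.pop()
        if name not in seen:
            seen.add(name)
            stack.extend(consumers.get(name, []))
    return seen


def can_fuse(graph: Dict[str, Transform], producer: str, consumer: str) -> bool:
    """Pairwise check: may `consumer` join the same fused stage as `producer`?"""
    # Environment rule: transforms in incompatible environments stay apart.
    if graph[producer].environment != graph[consumer].environment:
        return False
    # Side-input rule: for every S reading a side input produced by `producer`,
    # neither S nor any transform descended from S may fuse with `producer`.
    for s in graph.values():
        if producer in s.side_inputs and consumer in descendants(graph, s.name):
            return False
    return True
```

Modeling DoFnB as reading DoFnA's output both as a main input and as a side input makes `can_fuse(graph, "DoFnA", "DoFnB")` return False, while a downstream transform that sits in a different environment is rejected by the environment check alone.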

We have a custom resource hint (for specifying whether a transform needs
access to some software license) that we use with our transforms, and that's
what was preventing the fusion I was expecting. I'm looking into how to
make these hints mergeable now.
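
One way to make such hints mergeable, sketched in Python under stated assumptions: each transform's hints are modeled as a plain dict from hint URN to string value, and each mergeable URN gets its own merge policy (union for a license list, max for a minimum-RAM value). The URNs, the `MERGERS` table, and `merge_hints` are hypothetical; they are not Beam's resource hint API and don't show how any particular runner wires hint merging into fusion.

```python
from typing import Callable, Dict, Optional

# Hypothetical hint URNs; substitute whatever your custom hints actually use.
LICENSE_HINT = "example:resource_hint:licenses:v1"
MIN_RAM_HINT = "example:resource_hint:min_ram_bytes:v1"


def _merge_licenses(a: str, b: str) -> str:
    # A fused stage needs every license that either transform needs.
    licenses = {x for x in a.split(",") if x} | {x for x in b.split(",") if x}
    return ",".join(sorted(licenses))


def _merge_max(a: str, b: str) -> str:
    # Satisfying the larger requirement also satisfies the smaller one.
    return str(max(int(a), int(b)))


# Per-URN merge policies; a URN missing here is only compatible with itself.
MERGERS: Dict[str, Callable[[str, str], str]] = {
    LICENSE_HINT: _merge_licenses,
    MIN_RAM_HINT: _merge_max,
}


def merge_hints(a: Dict[str, str], b: Dict[str, str]) -> Optional[Dict[str, str]]:
    """Merge two transforms' hints, or return None if they can't share a stage."""
    merged: Dict[str, str] = {}
    for urn in sorted(set(a) | set(b)):
        va, vb = a.get(urn), b.get(urn)
        if va == vb:
            merged[urn] = va
        elif urn not in MERGERS:
            return None  # no merge policy and values differ: keep stages separate
        elif va is None:
            merged[urn] = vb  # an absent hint places no requirement
        elif vb is None:
            merged[urn] = va
        else:
            merged[urn] = MERGERS[urn](va, vb)
    return merged
```

Treating an absent license hint as "no requirement" means a fused stage inherits the union of licenses, so a transform that needs no license may end up running on license-holding workers. If that's undesirable, leaving the license URN out of `MERGERS` keeps such transforms in separate stages.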

On Thu, Dec 14, 2023 at 7:46 PM Robert Burke <rob...@frantil.com> wrote:

> Building on what Robert Bradshaw has said: if these fusion breaks didn't
> exist, the pipeline could livelock, because the side input would be unable
> to finish computing for a given input element's window.
>
> I have recently added fusion to the Go Prism runner based on the Python
> side input semantics, and I was surprised that there are basically only two
> rules for fusion: the side input one, and one for handling stateful
> processing.
>
>
> This code is the greedy fusion algorithm that Python uses, in a less
> set-based form, so it might be easier to follow:
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>
> From the linked code comment:
>
> Side Inputs: A transform S consuming a PCollection as a side input can't
> be fused with the transform P that produces that PCollection. Further,
> no transform S+ descended from S can be fused with transform P.
>
> Ideally I'll add visual representations of the graphs to the test suite
> that validates the side input dependency logic:
>
>
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>
> (Note: that test doesn't validate expected fusion results; Prism is a work
> in progress.)
>
>
> As for the stateful rule, it is largely an implementation convenience
> for runners to ensure correct execution.
> If your pipeline also uses stateful transforms or SplittableDoFns, those
> are usually relegated to the root of a fused stage and aren't fused with
> each other. That can also cause additional stages.
>
> If Beam adopted a rigorous notion of key-preserving transforms,
> multiple stateful transforms could be fused in the same stage. But that's a
> very different discussion.
>
> On Thu, Dec 14, 2023, 4:03 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>
>> Thanks for the explanation!
>>
>> That matches my intuition. Are there any other rules involving side
>> inputs?
>>
>> I might be misunderstanding the actual cause of the fusion breaks in our
>> pipeline, but we essentially have one part of the graph that produces many
>> small collections that are used as side inputs in the remaining part of the
>> graph. In other words, the "main graph" is mostly linear but uses side
>> inputs from the earlier part of the graph.
>>
>> Since the main graph is mostly linear, I expected few stages, but what I
>> actually see is a lot of breaks around the transforms that require side
>> inputs.
>>
>>
>> Tangentially, are there any general tips for understanding why a graph
>> might be fused the way it was?
>>
>> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> That is correct. Side inputs give a view of the "whole" PCollection and
>>> hence introduce a fusion-preventing barrier. For example, suppose one has
>>> a DoFn, DoFnA, that produces two outputs, mainPColl and sidePColl, that
>>> are consumed by DoFnB (as the main and side input respectively).
>>>
>>>                   -------- mainPColl ----- DoFnB
>>>                 /                            ^
>>> inPColl -- DoFnA                             |
>>>                 \                            |
>>>                   -------- sidePColl ------- /
>>>
>>>
>>> Now DoFnB may iterate over the entirety of sidePColl for every element of
>>> mainPColl. This means that DoFnA and DoFnB cannot be fused: fusion would
>>> require DoFnB to consume elements as they are produced by DoFnA, but
>>> DoFnA must run to completion before we know the contents of sidePColl.
>>>
>>> Similar constraints apply in larger graphs (e.g. there may be many
>>> intermediate DoFns and PCollections), but they principally boil down to
>>> shapes that look like this.
>>>
>>> Though this does not introduce a global barrier in streaming, there is
>>> still the analogous per-window/watermark barrier that prevents fusion for
>>> the same reasons.
>>>
>>>
>>>
>>>
>>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> We have a pretty big pipeline, and while I was inspecting the stages I
>>>> noticed there is less fusion than I expected. I suspect it has to do with
>>>> the heavy use of side inputs in our workflow. In the Python SDK, I see that
>>>> side inputs are considered when determining whether two stages are fusible,
>>>> but I'm having a hard time getting a clear understanding of the logic.
>>>> Could someone clarify / summarize the rules around this?
>>>>
>>>> Thanks!
>>>> Joey
>>>>
>>>
