Yeah, I can confirm for the Python runners (based on my reading of
translations.py [1]) that only identical environments are merged together.
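
To make that concrete, here is a toy sketch of the rule. It is not the actual
code in translations.py; `Transform`, `fuse_linear`, and the environment
strings are made up for illustration, and it only handles a linear chain. It
starts a new stage whenever the environment changes:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Transform:
    # Hypothetical stand-in for a pipeline transform; not a Beam class.
    name: str
    environment: str  # e.g. an environment id that encodes resource hints


def fuse_linear(transforms):
    """Group consecutive transforms into stages, fusing two neighbours only
    when their environments are exactly equal."""
    stages = []
    for t in transforms:
        if stages and stages[-1][-1].environment == t.environment:
            stages[-1].append(t)
        else:
            stages.append([t])
    return stages


chain = [
    Transform("Read", "env_default"),
    Transform("Parse", "env_default"),
    Transform("Score", "env_license_a"),  # differs only by a hint
    Transform("Write", "env_default"),
]
stage_names = [[t.name for t in stage] for stage in fuse_linear(chain)]
print(stage_names)
```

A single transform with a different environment in the middle of the chain
splits what could have been one stage into three, which is the kind of break
I was seeing.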

The funny thing is that we _originally_ implemented this hint as an
annotation but then changed it to a hint because it felt semantically more
correct. I think we might go back to an annotation, since the environment
merging logic isn't very flexible or easy to customize. Our hint is a bit
unlike other hints anyway. Unlike resources like MinRam, these resources
are additive (e.g. you can merge an environment that requires license A and
an environment that requires license B into an environment that requires
both A and B).
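
A rough sketch of what I mean by additive, in plain Python. The names here
(`merge_min_ram`, `merge_licenses`, `merge_hints`, and the hint keys) are
hypothetical and not Beam APIs, and the max rule for the RAM-style hint is
just an assumption for the sake of contrast:

```python
def merge_min_ram(a, b):
    # Assumed rule for a MinRam-style hint: one value wins (the larger).
    return max(a, b)


def merge_licenses(a, b):
    # Additive rule: the merged environment needs every license from both.
    return frozenset(a) | frozenset(b)


# Hypothetical hint keys mapped to their merge rules.
MERGERS = {"min_ram_bytes": merge_min_ram, "licenses": merge_licenses}


def merge_hints(env_a, env_b):
    """Merge two hint dicts key by key; a key present on only one side is
    carried over unchanged."""
    merged = dict(env_a)
    for key, value in env_b.items():
        merged[key] = MERGERS[key](merged[key], value) if key in merged else value
    return merged


env_a = {"min_ram_bytes": 2 * 1024**3, "licenses": frozenset({"A"})}
env_b = {"min_ram_bytes": 4 * 1024**3, "licenses": frozenset({"B"})}
merged = merge_hints(env_a, env_b)
print(sorted(merged["licenses"]), merged["min_ram_bytes"])
```

With a rule like this, two environments that differ only in their license
hints could be treated as compatible and collapsed into one that requires
both licenses.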

[1]
https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4

On Fri, Dec 15, 2023 at 8:43 AM Robert Burke <rob...@frantil.com> wrote:

> That would do it. We got so tunnel visioned on side inputs we missed that!
>
> IIRC the python local runner and Prism both only fuse transforms in
> identical environments together. So any environmental diffs will prevent
> fusion.
>
> Runners as a rule are usually free to ignore/manage hints as they like.
> Transform annotations might be an alternative, but how those are managed
> would be more SDK specific.
>
> On Fri, Dec 15, 2023, 5:21 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
>> I figured out my issue. I thought side inputs were breaking up my
>> pipeline but after experimenting with my transforms I now realize what was
>> actually breaking it up was different transform environments that weren't
>> considered compatible.
>>
>> We have a custom resource hint (for specifying whether a transform needs
>> access to some software license) that we use with our transforms and that's
>> what was preventing the fusion I was expecting. I'm looking into how to
>> make these hints mergeable now.
>>
>> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke <rob...@frantil.com> wrote:
>>
>>> Building on what Robert Bradshaw has said, basically, if these fusion
>>> breaks don't exist, the pipeline can livelock, because the side input is
>>> unable to finish computing for a given input element's window.
>>>
>>> I have recently added fusion to the Go Prism runner based on the Python
>>> side input semantics, and I was surprised that there are basically two
>>> rules for fusion: one for side inputs, and one for handling Stateful
>>> processing.
>>>
>>>
>>> This code here is the greedy fusion algorithm that Python uses, but
>>> less set-based, so it might be easier to follow:
>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>>
>>> From the linked code comment:
>>>
>>> Side Inputs: A transform S consuming a PCollection as a side input can't
>>>  be fused with the transform P that produces that PCollection. Further,
>>> no transform S+ descended from S, can be fused with transform P.
>>>
>>> Ideally I'll add visual representations of the graphs in the test suite
>>> here, which validates the side input dependency logic:
>>>
>>>
>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>>
>>> (Note that the test doesn't validate expected fusion results; Prism is a
>>> work in progress.)
>>>
>>>
>>> As for the Stateful rule, this is largely an implementation convenience
>>> for runners to ensure correct execution.
>>> If your pipeline also uses Stateful transforms, or SplittableDoFns,
>>> those are usually relegated to the root of a fused stage, and avoid
>>> fusing with each other. That can also cause additional stages.
>>>
>>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>>> multiple stateful transforms could be fused in the same stage. But that's a
>>> very different discussion.
>>>
>>> On Thu, Dec 14, 2023, 4:03 PM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Thanks for the explanation!
>>>>
>>>> That matches with my intuition - are there any other rules with side
>>>> inputs?
>>>>
>>>> I might be misunderstanding the actual cause of the fusion breaks in
>>>> our pipeline, but we essentially have one part of the graph that produces
>>>> many small collections that are used as side inputs in the remaining part
>>>> of the graph. In other words, the "main graph" is mostly linear but uses
>>>> side inputs from the earlier part of the graph.
>>>>
>>>>  Since the main graph is mostly linear, I expected few stages, but what
>>>> I actually see are a lot of breaks around the side input requiring
>>>> transforms.
>>>>
>>>>
>>>> Tangentially, are there any general tips for understanding why a graph
>>>> might be fused the way it was?
>>>>
>>>> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> That is correct. Side inputs give a view of the "whole" PCollection
>>>>> and hence introduce a fusion-breaking barrier. For example, suppose
>>>>> DoFnA produces two outputs, mainPColl and sidePColl, that are consumed
>>>>> (as the main and side input respectively) by DoFnB.
>>>>>
>>>>>                   -------- mainPColl ----- DoFnB
>>>>>                 /                            ^
>>>>> inPColl -- DoFnA                             |
>>>>>                 \                            |
>>>>>                   -------- sidePColl ------- /
>>>>>
>>>>>
>>>>> Now DoFnB may iterate over the entirety of sidePColl for every element
>>>>> of mainPColl. This means that DoFnA and DoFnB cannot be fused: fusion
>>>>> would require DoFnB to consume the elements as they are produced by
>>>>> DoFnA, but we need DoFnA to run to completion before we know the contents
>>>>> of sidePColl.
>>>>>
>>>>> Similar constraints apply in larger graphs (e.g. there may be many
>>>>> intermediate DoFns and PCollections), but they principally boil down to
>>>>> shapes that look like this.
>>>>>
>>>>> Though this does not introduce a global barrier in streaming, there is
>>>>> still the analogous per window/watermark barrier that prevents fusion for
>>>>> the same reasons.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> We have a pretty big pipeline and while I was inspecting the stages,
>>>>>> I noticed there is less fusion than I expected. I suspect it has to do
>>>>>> with the heavy use of side inputs in our workflow. In the python sdk, I
>>>>>> see that side inputs are considered when determining whether two stages
>>>>>> are fusible. I have a hard time getting a clear understanding of the
>>>>>> logic though. Could someone clarify / summarize the rules around this?
>>>>>>
>>>>>> Thanks!
>>>>>> Joey
>>>>>>
>>>>>
