There is definitely a body of future work in intelligently merging
compatible-but-not-equal environments. (Dataflow does this for example.)
Defining/detecting compatibility is not always easy, but sometimes is, and
we should at least cover those cases and grow them over time.

On Fri, Dec 15, 2023 at 5:57 AM Joey Tran <joey.t...@schrodinger.com> wrote:

> Yeah, I can confirm for the Python runners (based on my reading of
> translations.py [1]) that only identical environments are merged together.
>
> The funny thing is that we _originally_ implemented this hint as an
> annotation but then changed it to a hint because it semantically felt more
> correct. I think we might go back to that, since the environment merging
> logic isn't very flexible or easy to customize. Our type of hint is a bit
> unlike other hints anyway. Unlike resources like MinRam, these resources
> are additive (e.g. you can merge an environment that requires license A and
> an environment that requires license B into an environment that requires
> both A and B).
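To illustrate the difference between a MinRam-style hint and an additive one, here is a minimal sketch of a compatibility check plus merge. `Env`, `try_merge`, and the field names are hypothetical stand-ins, not Beam's Environment proto or resource-hint API. It assumes two environments are compatible when they share a container image, that a MinRam-style lower bound merges by taking the larger value, and that license requirements merge by set union.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class Env:
    # Hypothetical environment model, not Beam's Environment proto.
    container_image: str
    min_ram_gb: int = 0                      # MinRam-style: a lower bound
    licenses: FrozenSet[str] = frozenset()   # additive: every license is required


def try_merge(a: Env, b: Env) -> Optional[Env]:
    """Return one environment satisfying both a and b, or None if incompatible."""
    if a.container_image != b.container_image:
        return None  # assumption: different containers are never compatible
    return Env(
        container_image=a.container_image,
        # A lower bound on RAM is satisfied by the larger of the two bounds.
        min_ram_gb=max(a.min_ram_gb, b.min_ram_gb),
        # Additive hint: the merged environment needs both license sets.
        licenses=a.licenses | b.licenses,
    )


needs_a = Env("py-sdk", min_ram_gb=2, licenses=frozenset({"A"}))
needs_b = Env("py-sdk", min_ram_gb=4, licenses=frozenset({"B"}))
merged = try_merge(needs_a, needs_b)
print(merged.min_ram_gb, sorted(merged.licenses))  # 4 ['A', 'B']
```

The RAM bound merges by `max`, so it never grows beyond either input, while the license set only grows; deciding which hints are safe to merge this way is the "defining compatibility" problem mentioned at the top of the thread.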
>
> [1]
> https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4
>
> On Fri, Dec 15, 2023 at 8:43 AM Robert Burke <rob...@frantil.com> wrote:
>
>> That would do it. We got so tunnel visioned on side inputs we missed that!
>>
>> IIRC the Python local runner and Prism both only fuse transforms in
>> identical environments together. So any environment differences will
>> prevent fusion.
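A minimal sketch of the identical-environments-only policy on a linear chain, assuming each transform is tagged with an environment id and that a transform carrying a different resource hint ends up with a different id. `fuse_linear_chain` and the ids are hypothetical; this is not the code in translations.py or Prism.

```python
from typing import List, Optional, Tuple


def fuse_linear_chain(chain: List[Tuple[str, str]]) -> List[List[str]]:
    """Greedily fuse a linear chain of (transform_name, environment_id) pairs.

    A transform joins the current stage only if its environment id is
    identical to the stage's; any difference starts a new stage.
    """
    stages: List[List[str]] = []
    stage_env: Optional[str] = None
    for name, env in chain:
        if stages and env == stage_env:
            stages[-1].append(name)
        else:
            stages.append([name])
            stage_env = env
    return stages


chain = [
    ("Read", "py-env"),
    ("Parse", "py-env"),
    ("Score", "py-env+license-A"),  # same container, extra custom hint
    ("Write", "py-env"),
]
print(fuse_linear_chain(chain))  # [['Read', 'Parse'], ['Score'], ['Write']]
```

Note that Write lands in its own stage even though it shares Read's environment: once a linear chain leaves an environment, nothing downstream rejoins the earlier stage, so a single differing hint can split a chain into three stages.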
>>
>> Runners as a rule are usually free to ignore/manage hints as they like.
>> Transform annotations might be an alternative, but how those are managed
>> would be more SDK-specific.
>>
>> On Fri, Dec 15, 2023, 5:21 AM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> I figured out my issue. I thought side inputs were breaking up my
>>> pipeline but after experimenting with my transforms I now realize what was
>>> actually breaking it up was different transform environments that weren't
>>> considered compatible.
>>>
>>> We have a custom resource hint (for specifying whether a transform needs
>>> access to some software license) that we use with our transforms, and that's
>>> what was preventing the fusion I was expecting. I'm looking into how to
>>> make these hints mergeable now.
>>>
>>> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke <rob...@frantil.com> wrote:
>>>
>>>> Building on what Robert Bradshaw has said: if these fusion breaks
>>>> didn't exist, the pipeline could livelock, because the side input would
>>>> be unable to finish computing for a given input element's window.
>>>>
>>>> I have recently added fusion to the Go Prism runner based on the Python
>>>> side input semantics, and I was surprised that there are basically two
>>>> rules for fusion: the side input one, and one for handling stateful
>>>> processing.
>>>>
>>>>
>>>> This code here is the greedy fusion algorithm that Python uses, but
>>>> less set-based, so it might be easier to follow:
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>>>
>>>> From the linked code comment:
>>>>
>>>> Side Inputs: A transform S consuming a PCollection as a side input can't
>>>> be fused with the transform P that produces that PCollection. Further,
>>>> no transform S+ descended from S can be fused with transform P.
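The side input rule can be sketched as a greedy pass over a topologically sorted graph. This is a simplified illustration, not Prism's preprocess.go or Python's translations.py: it assumes each transform has at most one main input, ignores environments and stateful roots, and omits the cycle checks a real fuser also needs. `Transform` and `fuse` are hypothetical names. Each transform carries the set of transforms it may never share a stage with: the producers of its side inputs, plus everything its ancestors were already barred from, which is how the ban reaches every S+ descended from S.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class Transform:
    # Hypothetical graph node; PCollections are referred to by id.
    name: str
    main_input: Optional[str] = None
    side_inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)


def fuse(transforms: List[Transform]) -> List[Set[str]]:
    """Greedy fusion applying only the side input rule.

    `transforms` must be topologically sorted.
    """
    producer: Dict[str, str] = {}      # PCollection id -> producing transform
    stage_of: Dict[str, int] = {}      # transform -> index into stages
    banned: Dict[str, Set[str]] = {}   # transform -> transforms it can't fuse with
    stages: List[Set[str]] = []
    for t in transforms:
        inputs = ([t.main_input] if t.main_input else []) + t.side_inputs
        # S can't fuse with the producer P of any of its side inputs...
        ban = {producer[pc] for pc in t.side_inputs}
        # ...and every S+ descended from S inherits S's bans.
        for pc in inputs:
            ban |= banned[producer[pc]]
        banned[t.name] = ban
        target = stage_of[producer[t.main_input]] if t.main_input else None
        if target is not None and not (stages[target] & ban):
            stages[target].add(t.name)   # fuse into the main input's stage
            stage_of[t.name] = target
        else:
            stages.append({t.name})      # fusion break: start a new stage
            stage_of[t.name] = len(stages) - 1
        for pc in t.outputs:
            producer[pc] = t.name
    return stages


# The DoFnA/DoFnB shape discussed in this thread, plus a downstream DoFnC.
pipeline = [
    Transform("Read", outputs=["inPColl"]),
    Transform("DoFnA", main_input="inPColl", outputs=["mainPColl", "sidePColl"]),
    Transform("DoFnB", main_input="mainPColl", side_inputs=["sidePColl"], outputs=["out"]),
    Transform("DoFnC", main_input="out"),
]
print([sorted(s) for s in fuse(pipeline)])
# [['DoFnA', 'Read'], ['DoFnB', 'DoFnC']]
```

DoFnC fuses with DoFnB because it only inherits the ban on DoFnA, and DoFnA lives in a different stage.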
>>>>
>>>> Ideally I'll add visual representations of the graphs to the test suite
>>>> here, which validates the side input dependency logic:
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>>>
>>>> (Note: that test doesn't validate expected fusion results; Prism is a
>>>> work in progress.)
>>>>
>>>>
>>>> As for the stateful rule, this is largely an implementation convenience
>>>> for runners to ensure correct execution.
>>>> If your pipeline also uses stateful transforms or SplittableDoFns,
>>>> those are usually relegated to the root of a fused stage and avoid
>>>> fusing with each other. That can also cause additional stages.
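The stateful rule as described here can be sketched for a linear chain: a transform flagged as stateful or splittable always starts a new stage, so it sits at a stage root and two such transforms never share a stage, while ordinary transforms fuse onto whatever stage precedes them. The function name and flags are hypothetical, and the sketch ignores environments and side inputs.

```python
from typing import List, Tuple


def fuse_with_stateful_roots(chain: List[Tuple[str, bool]]) -> List[List[str]]:
    """Fuse a linear chain of (name, is_stateful_or_sdf) pairs.

    Stateful/SDF transforms always begin a new stage; others join the current one.
    """
    stages: List[List[str]] = []
    for name, needs_root in chain:
        if needs_root or not stages:
            stages.append([name])
        else:
            stages[-1].append(name)
    return stages


chain = [
    ("Read", False),
    ("KeyBy", False),
    ("CountPerKey", True),  # stateful DoFn
    ("Format", False),
    ("Dedup", True),        # another stateful DoFn
    ("Write", False),
]
print(fuse_with_stateful_roots(chain))
# [['Read', 'KeyBy'], ['CountPerKey', 'Format'], ['Dedup', 'Write']]
```

Each stateful transform adds a stage boundary, which is where the additional stages come from.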
>>>>
>>>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>>>> multiple stateful transforms could be fused in the same stage. But that's a
>>>> very different discussion.
>>>>
>>>> On Thu, Dec 14, 2023, 4:03 PM Joey Tran <joey.t...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Thanks for the explanation!
>>>>>
>>>>> That matches my intuition. Are there any other rules with side
>>>>> inputs?
>>>>>
>>>>> I might be misunderstanding the actual cause of the fusion breaks in
>>>>> our pipeline, but we essentially have one part of the graph that produces
>>>>> many small collections that are used as side inputs in the remaining part
>>>>> of the graph. In other words, the "main graph" is mostly linear but uses
>>>>> side inputs from the earlier part of the graph.
>>>>>
>>>>> Since the main graph is mostly linear, I expected few stages, but
>>>>> what I actually see is a lot of breaks around the transforms that
>>>>> require side inputs.
>>>>>
>>>>>
>>>>> Tangentially, are there any general tips for understanding why a graph
>>>>> might be fused the way it was?
>>>>>
>>>>> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> That is correct. Side inputs give a view of the "whole" PCollection
>>>>>> and hence introduce a fusion-preventing barrier. For example, suppose
>>>>>> one has a DoFn, DoFnA, that produces two outputs, mainPColl and
>>>>>> sidePColl, that are consumed (as the main and side input, respectively)
>>>>>> by DoFnB.
>>>>>>
>>>>>>                   -------- mainPColl ----- DoFnB
>>>>>>                 /                            ^
>>>>>> inPColl -- DoFnA                             |
>>>>>>                 \                            |
>>>>>>                   -------- sidePColl ------- /
>>>>>>
>>>>>>
>>>>>> Now DoFnB may iterate over the entirety of sidePColl for every element
>>>>>> of mainPColl. This means that DoFnA and DoFnB cannot be fused: fusion
>>>>>> would require DoFnB to consume the elements as they are produced by
>>>>>> DoFnA, but DoFnA must run to completion before we know the contents
>>>>>> of sidePColl.
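To make the argument concrete, here is a small simulation in plain Python (no Beam APIs; `do_fn_a`, `do_fn_b`, and the even/odd split are hypothetical). DoFnB pairs each main element with the sum of the whole side input. Running DoFnA to completion first gives every element the same, complete view; feeding DoFnB element by element, as a fused stage would, lets it observe only the part of sidePColl produced so far.

```python
def do_fn_a(x):
    # Hypothetical DoFnA: every element goes to mainPColl; even ones also to sidePColl.
    yield ("main", x)
    if x % 2 == 0:
        yield ("side", x)


def do_fn_b(x, side):
    # Hypothetical DoFnB: pair a main element with the sum of the whole side input.
    return (x, sum(side))


def staged(inputs):
    """Run DoFnA to completion, materialize sidePColl, then run DoFnB."""
    main, side = [], []
    for x in inputs:
        for tag, v in do_fn_a(x):
            (main if tag == "main" else side).append(v)
    return [do_fn_b(x, side) for x in main]


def naively_fused(inputs):
    """Feed DoFnB each main element as soon as DoFnA emits it."""
    side_so_far, out = [], []
    for x in inputs:
        for tag, v in do_fn_a(x):
            if tag == "side":
                side_so_far.append(v)
            else:
                out.append(do_fn_b(v, side_so_far))  # sees only a partial side input
    return out


print(staged([1, 2, 3, 4]))         # [(1, 6), (2, 6), (3, 6), (4, 6)]
print(naively_fused([1, 2, 3, 4]))  # [(1, 0), (2, 0), (3, 2), (4, 2)]
```

The fused version silently produces different answers depending on element order, which is why the runner must break fusion between DoFnA and DoFnB.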
>>>>>>
>>>>>> Similar constraints apply in larger graphs (e.g. there may be many
>>>>>> intermediate DoFns and PCollections), but they principally boil down to
>>>>>> shapes that look like this.
>>>>>>
>>>>>> Though this does not introduce a global barrier in streaming, there
>>>>>> is still the analogous per-window/watermark barrier that prevents fusion
>>>>>> for the same reasons.
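In streaming, the barrier moves from the whole PCollection to each window. A minimal sketch, assuming windows are identified by their end timestamp and that the runner reports the side input's watermark; `WindowedSideInputBuffer` and its methods are hypothetical. Main-input elements whose window's side input may still change are held back and released once the side-input watermark reaches the window's end.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class WindowedSideInputBuffer:
    """Hold main-input elements until the side input for their window is complete."""

    def __init__(self) -> None:
        self.side: Dict[int, List[int]] = defaultdict(list)     # window end -> side values
        self.pending: Dict[int, List[str]] = defaultdict(list)  # window end -> held elements
        self.side_watermark = float("-inf")

    def _ready(self, window_end: int) -> bool:
        # Assumption: no more side data arrives for a window once the watermark reaches its end.
        return self.side_watermark >= window_end

    def add_side(self, window_end: int, value: int) -> None:
        self.side[window_end].append(value)

    def add_main(self, window_end: int, element: str) -> List[Tuple[str, int]]:
        if self._ready(window_end):
            return [(element, sum(self.side[window_end]))]
        self.pending[window_end].append(element)  # per-window barrier: wait
        return []

    def advance_side_watermark(self, watermark: int) -> List[Tuple[str, int]]:
        self.side_watermark = watermark
        released: List[Tuple[str, int]] = []
        for window_end in sorted(w for w in self.pending if self._ready(w)):
            total = sum(self.side[window_end])
            released += [(e, total) for e in self.pending.pop(window_end)]
        return released


buf = WindowedSideInputBuffer()
buf.add_side(10, 5)
print(buf.add_main(10, "a"))           # [] (window 10's side input may still grow)
buf.add_side(10, 7)
print(buf.advance_side_watermark(10))  # [('a', 12)]
print(buf.add_main(10, "b"))           # [('b', 12)]
print(buf.add_main(20, "c"))           # [] (window 20 is still open)
```

Elements in different windows progress independently, so there is no global barrier, but within a window the main input still cannot run ahead of its side input.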
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>> We have a pretty big pipeline, and while I was inspecting the stages,
>>>>>>> I noticed there is less fusion than I expected. I suspect it has to do
>>>>>>> with the heavy use of side inputs in our workflow. In the Python SDK, I
>>>>>>> see that side inputs are considered when determining whether two stages
>>>>>>> are fusible. I have a hard time getting a clear understanding of the
>>>>>>> logic, though. Could someone clarify / summarize the rules around this?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Joey
>>>>>>>
>>>>>>
