Yeah, we already have `ResourceHint.get_merged_value(cls, outer_value,
inner_value)` for reconciling resources within a composite, in the future
we could possibly just have another similar method and have the environment
merging logic hook into that.

On Fri, Dec 15, 2023 at 3:53 PM Robert Bradshaw via dev <dev@beam.apache.org>
wrote:

> There is definitely a body of future work in intelligently merging
> compatible-but-not-equal environments. (Dataflow does this for example.)
> Defining/detecting compatibility is not always easy, but sometimes is, and
> we should at least cover those cases and grow them over time.
>
> On Fri, Dec 15, 2023 at 5:57 AM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> Yeah I can confirm for the python runners (based on my reading of the
>> translations.py [1]) that only identical environments are merged together.
>>
>> The funny thing is that we _originally_ implemented this hint as an
>> annotation but then changed it to hint because it semantically felt more
>> correct. I think we might go back to that since the environment merging
>> logic isn't too flexible / easy to customize. Our type of hint is a bit
>> unlike other hints anyways. Unlike resources like MinRam, these resources
>> are additive (e.g. you can merge an environment that requires license A and
>> an environment that requires license B into an environment that requires
>> both A and B)
>>
>> [1]
>> https://github.com/apache/beam/blob/5fb4db31994d7c2c1e04d32a4b153bc83d739f36/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L4
>>
>> On Fri, Dec 15, 2023 at 8:43 AM Robert Burke <rob...@frantil.com> wrote:
>>
>>> That would do it. We got so tunnel visioned on side inputs we missed
>>> that!
>>>
>>> IIRC the python local runner and Prism both only fuse transforms in
>>> identical environments together. So any environmental diffs will prevent
>>> fusion.
>>>
>>> Runners as a rule are usually free to ignore/manage hints as they like.
>>> Transform annotations might be an alternative, but how those are managed
>>> would be more SDK specific.
>>>
>>> On Fri, Dec 15, 2023, 5:21 AM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> I figured out my issue. I thought side inputs were breaking up my
>>>> pipeline but after experimenting with my transforms I now realize what was
>>>> actually breaking it up was different transform environments that weren't
>>>> considered compatible.
>>>>
>>>> We have a custom resource hint (for specifying whether a transform
>>>> needs access to some software license) that we use with our transforms and
>>>> that's what was preventing the fusion I was expecting. I'm I'm looking into
>>>> how to make these hints mergeable now.
>>>>
>>>> On Thu, Dec 14, 2023 at 7:46 PM Robert Burke <rob...@frantil.com>
>>>> wrote:
>>>>
>>>>> Building on what Robert Bradshaw has said, basically, if these fusion
>>>>> breaks don't exist, the pipeline can live lock, because the side input is
>>>>> unable to finish computing for a given input element's window.
>>>>>
>>>>> I have recently added fusion to the Go Prism runner based on the
>>>>> python side input semantics, and i was surprised that there are basically
>>>>> two rules for fusion. The side input one, and for handling Stateful
>>>>> processing.
>>>>>
>>>>>
>>>>> This code here is the greedy fusion algorithm that Python uses, but a
>>>>> less set based, so it might be easier to follow:
>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess.go#L513
>>>>>
>>>>> From the linked code comment:
>>>>>
>>>>> Side Inputs: A transform S consuming a PCollection as a side input
>>>>> can't
>>>>>  be fused with the transform P that produces that PCollection. Further,
>>>>> no transform S+ descended from S, can be fused with transform P.
>>>>>
>>>>> Ideally I'll add visual representations of the graphs in the test
>>>>> suite here, that validates the side input dependency logic:
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go#L398
>>>>>
>>>>> (Note, that test doesn't validate expected fusion results, Prism is a
>>>>> work in progress).
>>>>>
>>>>>
>>>>> As for the Stateful rule, this is largely an implementation
>>>>> convenience for runners to ensure correct execution.
>>>>> If your pipeline also uses Stateful transforms, or SplittableDoFns,
>>>>> those are usually relegated to the root of a fused stage, and avoids
>>>>> fusions with each other. That can also cause additional stages.
>>>>>
>>>>> If Beam adopted a rigorous notion of Key Preserving for transforms,
>>>>> multiple stateful transforms could be fused in the same stage. But that's 
>>>>> a
>>>>> very different discussion.
>>>>>
>>>>> On Thu, Dec 14, 2023, 4:03 PM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the explanation!
>>>>>>
>>>>>> That matches with my intuition - are there any other rules with side
>>>>>> inputs?
>>>>>>
>>>>>> I might be misunderstanding the actual cause of the fusion breaks in
>>>>>> our pipeline, but we essentially have one part of the graph that produces
>>>>>> many small collections that are used as side inputs in the remaining part
>>>>>> of the graph. In other words, the "main graph" is mostly linear but uses
>>>>>> side inputs from the earlier part of the graph.
>>>>>>
>>>>>>  Since the main graph is mostly linear, I expected few stages, but
>>>>>> what I actually see are a lot of breaks around the side input requiring
>>>>>> transforms.
>>>>>>
>>>>>>
>>>>>> Tangentially, are there any general tips for understanding why a
>>>>>> graph might be fused the way it was?
>>>>>>
>>>>>> On Thu, Dec 14, 2023, 6:10 PM Robert Bradshaw via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> That is correct. Side inputs give a view of the "whole" PCollection
>>>>>>> and hence introduce a fusion-producing barrier. For example, suppose one
>>>>>>> has a DoFn that produces two outputs, mainPColl and sidePColl, that are
>>>>>>> consumed (as the main and side input respectively) of DoFnB.
>>>>>>>
>>>>>>>                   -------- mainPColl ----- DoFnB
>>>>>>>                 /                            ^
>>>>>>> inPColl -- DoFnA                             |
>>>>>>>                 \                            |
>>>>>>>                   -------- sidePColl ------- /
>>>>>>>
>>>>>>>
>>>>>>> Now DoFnB may iterate over the entity of sidePColl for every element
>>>>>>> of mainPColl. This means that DoFnA and DoFnB cannot be fused, which
>>>>>>> would require DoFnB to consume the elements as they are produced from
>>>>>>> DoFnA, but we need DoFnA to run to completion before we know the 
>>>>>>> contents
>>>>>>> of sidePColl.
>>>>>>>
>>>>>>> Similar constraints apply in larger graphs (e.g. there may be many
>>>>>>> intermediate DoFns and PCollections), but they principally boil down to
>>>>>>> shapes that look like this.
>>>>>>>
>>>>>>> Though this does not introduce a global barrier in streaming, there
>>>>>>> is still the analogous per window/watermark barrier that prevents fusion
>>>>>>> for the same reasons.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Dec 14, 2023 at 3:02 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey all,
>>>>>>>>
>>>>>>>> We have a pretty big pipeline and while I was inspecting the
>>>>>>>> stages, I noticed there is less fusion than I expected. I suspect it 
>>>>>>>> has to
>>>>>>>> do with the heavy use of side inputs in our workflow. In the python 
>>>>>>>> sdk, I
>>>>>>>> see that side inputs are considered when determining whether two 
>>>>>>>> stages are
>>>>>>>> fusible. I have a hard time getting a clear understanding of the logic
>>>>>>>> though. Could someone clarify / summarize the rules around this?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Joey
>>>>>>>>
>>>>>>>

Reply via email to