On Mon, Jan 27, 2025 at 1:46 PM Robert Burke <rob...@frantil.com> wrote:
>
> Chiming in for the Prism Runner implementation.
>
> I mostly didn't implement this in Prism because I had reached my limit in 
> dealing with the Graph at the time.
>
> The Python SDK approach to the optimizations is very set theory based, which 
> isn't as natural (to me at least) for handling the flatten unzipping.
>
> Not impossible, just not for me ;).
>
> On Mon, Jan 27, 2025, 1:19 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>
>>> > I think I get the general gist in that you don't necessarily need to 
>>> > combine the input pcollections to a flatten and instead you can just 
>>> > apply non-aggregating consuming transforms to all input pcollections, but 
>>> > when is a good time to do that? Do runners that implement this 
>>> > optimization always apply this to all flattens?
>>
>>
>>> Pretty much whenever they can, though there are limitations (e.g. if
>>> DoFnC is stateful). I think it depends on the internal implementation
>>> of the runner whether this makes sense.
>>
>>
>> Is that why it's not implemented in `translations.py`? Since it doesn't make 
>> sense for all runners?

It's mostly not implemented in translations.py because I (and everyone
else) only have finite time, and the Python runner was not intended to
be a high-performance runner at the end of the day anyway.

Not implementing fusion makes things extraordinarily slow (and also
very expensive memory-wise, unless you have sophisticated gc
tracking), and the original intent was to be at least somewhat of a
reference implementation (though it went through a couple of
refactorings that largely destroyed that benefit). It also does enough
optimization to actually exercise the worker code (for development
purposes, and cross-language usefulness), and it gained a couple of
extra features that were useful for all runners (e.g. combiner
packing).

Flatten unzipping can get surprisingly messy (ironically, Flatten is
one of the "simplest" transforms execution-wise but one of the hardest
to deal with in an optimizer), and just materializing was an easy,
obviously correct solution that didn't impact performance too much (in
context), so it's never been added. It would probably still be good to
do someday, as it unlocks fusion opportunities and such.
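For anyone curious what the rewrite looks like mechanically, here is a toy
sketch (this is not Beam's actual translations.py API; the graph
representation, function name, and clone-naming scheme are all made up for
illustration). It models a pipeline as a dict mapping each stage name to its
list of input stages, and unzips a Flatten by cloning its sole consumer onto
each of the Flatten's inputs and moving the Flatten after the clones:

```python
def unzip_flatten(stages, flatten_name, consumer_name):
    """Rewrite ... -> Flatten -> Consumer as parallel Consumer copies -> Flatten.

    `stages` is a hypothetical {stage_name: [input_stage_names]} graph,
    mutated in place. Assumes the consumer reads only the Flatten and is
    fusible (e.g. not stateful), per the limitations discussed above.
    """
    flatten_inputs = stages.pop(flatten_name)
    consumer_inputs = stages.pop(consumer_name)
    assert consumer_inputs == [flatten_name], "consumer must read only the Flatten"
    clones = []
    for i, src in enumerate(flatten_inputs):
        clone = "%s/%d" % (consumer_name, i)
        stages[clone] = [src]      # cloned consumer now reads its producer directly
        clones.append(clone)
    stages[flatten_name] = clones  # Flatten moved after the cloned consumers
    return stages

# DoFnA \
#        --> Flatten --> DoFnC
# DoFnB /
graph = {"DoFnA": [], "DoFnB": [],
         "Flatten": ["DoFnA", "DoFnB"], "DoFnC": ["Flatten"]}
unzip_flatten(graph, "Flatten", "DoFnC")
# Now DoFnC/0 reads DoFnA and DoFnC/1 reads DoFnB, so each clone can be
# fused into its producer's stage; the Flatten consumes the clones' outputs.
```

After the rewrite, the graph admits the fused stages (DoFnA+DoFnC/0) and
(DoFnB+DoFnC/1) described in the quoted thread below; the real optimizer of
course also has to handle multiple consumers, side inputs, and the stateful
cases where unzipping is not legal.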

- Robert


>> If you submit a workflow through the Python SDK to Dataflow, does Dataflow 
>> do another round of optimizations? Or are the translations.py optimization 
>> phases never used when using the Python SDK with the Dataflow runner?
>>
>> On Mon, Jan 27, 2025 at 4:09 PM Robert Bradshaw via dev 
>> <dev@beam.apache.org> wrote:
>>>
>>> On Mon, Jan 27, 2025 at 1:00 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>> >
>>> > I heard mention that there is a flatten unzipping optimization 
>>> > implemented by some runners. I didn't see that in the python 
>>> > optimizations in translations.py[1]. Just curious what this optimization 
>>> > is?
>>>
>>> It's done to increase the possibility of fusion. Suppose one has
>>>
>>> ... --> DoFnA \
>>>                --> Flatten --> DoFnC --> ...
>>> ... --> DoFnB /
>>>
>>> When determining the physical execution plan, one can re-write this as
>>>
>>> ... --> DoFnA --> DoFnC \
>>>                          --> Flatten --> ...
>>> ... --> DoFnB --> DoFnC /
>>>
>>> which permits fusion into stages (DoFnA+DoFnC) and (DoFnB+DoFnC).
>>>
>>> One can progressively do this up to the point that the consumer of the
>>> flatten already requires materialization that permits multiple inputs
>>> (e.g. writing to a shuffle/grouping operation).
>>>
>>> > I think I get the general gist in that you don't necessarily need to 
>>> > combine the input pcollections to a flatten and instead you can just 
>>> > apply non-aggregating consuming transforms to all input pcollections, but 
>>> > when is a good time to do that? Do runners that implement this 
>>> > optimization always apply this to all flattens?
>>>
>>> Pretty much whenever they can, though there are limitations (e.g. if
>>> DoFnC is stateful). I think it depends on the internal implementation
>>> of the runner whether this makes sense.
>>>
>>> > Cheers,
>>> > Joey
>>> >
>>> > [1] 
>>> > https://github.com/apache/beam/blob/72102b5985b3a13c4a4c3949bf23d129c3999827/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
