>> Thanks for the explanation and history!
>>
>> Are there any other major optimizations not included in translations.py?

> Dataflow's fusion algorithm is more sophisticated (the one in Beam
> Python is just greedy, which can sometimes give sub-optimal solutions,
> and doesn't take into account the size of the data being
> materialized).
Whoa! If the optimization takes into account the size of the data being
materialized, does that mean it happens at runtime?

On Mon, Jan 27, 2025 at 5:47 PM Robert Bradshaw <rober...@google.com> wrote:

> On Mon, Jan 27, 2025 at 2:29 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
> >
> > Ah I see, so some optimizations were implemented because they were
> necessary for any kind of reasonable performance when running with the
> local python runners, but those translations aren't actually used by the
> large scale runners like Dataflow. I see now that the python SDK
> DataflowRunner just submits a pre-optimized pipeline proto to dataflow, so
> then dataflow presumably does its own optimizations. I wasn't sure if the
> `translations.py` optimizations were used by all runners submitted through
> the python SDK.
>
> IIRC Spark and Flink borrow(ed?) more from translations.py, as the
> stage fuser on the portable Flink side wasn't as sophisticated (e.g.
> it made any DoFn with a side input a fusion break rather than figuring
> out whether the upstream was a producer of the side input). Even
> Dataflow leverages the combiner packing on the SDK side, but, yes, does
> its own optimization, with roots (though there have been multiple
> rewrites) harking back to the original FlumeJava paper of 2010.
>
> It's a bit of a conundrum, because if one has multiple SDKs on one
> runner, one wants to put the optimizations in the runner, but if one
> has multiple runners one is incentivized to put the optimization in
> the SDK. Unless you invoke a third component to do all of this (e.g.
> the Typescript SDK deferred to the Python SDK to invoke Dataflow).
>
> > Thanks for the explanation and history!
> >
> > Are there any other major optimizations not included in translations.py?
>
> Dataflow's fusion algorithm is more sophisticated (the one in Beam
> Python is just greedy, which can sometimes give sub-optimal solutions,
> and doesn't take into account the size of the data being
> materialized). Dataflow also handles things like "right fitting", where
> stages are broken up to meet varying resource requirements (e.g.
> accelerator-requiring vs. not), and there are also optimizations
> specific to its shuffle implementation. For Flume (which uses the
> same optimizer) there are additional considerations, like
> geographically-dispersed data and jobs, that we don't have to deal with
> for Dataflow.
>
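[Editor's note: a minimal sketch of what a purely greedy fusion pass looks like, to illustrate the algorithm class described above; this is not the Beam or Dataflow implementation. The transform names and the 'pardo'/'gbk' tagging are hypothetical. A size-aware optimizer would instead weigh the cost of each candidate materialization before deciding where to break stages.]

```python
def greedy_fuse(transforms):
    """Greedily fuse a linear pipeline into stages.

    transforms: list of (name, kind) where kind is 'pardo' (element-wise)
    or 'gbk' (grouping). Adjacent element-wise transforms are fused into
    one stage; a grouping operation forces a materialization break.
    """
    stages, current = [], []
    for name, kind in transforms:
        if kind == 'gbk':
            if current:            # close the current fused stage
                stages.append(current)
            stages.append([name])  # the shuffle is its own stage
            current = []
        else:
            current.append(name)   # element-wise: keep fusing greedily
    if current:
        stages.append(current)
    return stages

pipeline = [('Read', 'pardo'), ('Parse', 'pardo'),
            ('GroupByKey', 'gbk'), ('Sum', 'pardo'), ('Write', 'pardo')]
print(greedy_fuse(pipeline))
# [['Read', 'Parse'], ['GroupByKey'], ['Sum', 'Write']]
```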
> Nothing beats the importance of fusion, though, and I think
> translations.py has pretty complete coverage of the expression of
> higher-level primitives into their executable components (that are
> then executed by worker code) which isn't "optimization" per se but an
> important part of the Beam model.
>
> > On Mon, Jan 27, 2025 at 5:05 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> >>
> >> On Mon, Jan 27, 2025 at 1:46 PM Robert Burke <rob...@frantil.com>
> wrote:
> >> >
> >> > Chiming in for the Prism Runner implementation.
> >> >
> >> > I mostly didn't implement this in Prism because I had reached my
> limit in dealing with the Graph at the time.
> >> >
> >> > The Python SDK approach to the optimizations is very set theory
> based, which isn't as natural (to me at least) for handling the flatten
> unzipping.
> >> >
> >> > Not impossible, just not for me ;).
> >> >
> >> > On Mon, Jan 27, 2025, 1:19 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
> >> >>>
> >> >>> > I think I get the general gist in that you don't necessarily need
> to combine the input pcollections to a flatten and instead you can just
> apply non-aggregating consuming transforms to all input pcollections, but
> when is a good time to do that? Do runners that implement this optimization
> always apply this to all flattens?
> >> >>
> >> >>
> >> >>> Pretty much whenever they can, though there are limitations (e.g. if
> >> >>> DoFnC is stateful). I think it depends on the internal
> implementation
> >> >>> of the runner whether this makes sense.
> >> >>
> >> >>
> >> >> Is that why it's not implemented in `translations.py`? Since it
> doesn't make sense for all runners?
> >>
> It's mostly not implemented in translations.py because I (and everyone
> else) only have finite time, and the Python runner was not intended to
> be a high-performance runner at the end of the day anyway.
> >>
> >> Not implementing fusion makes things extraordinarily slow (and also
> >> very expensive memory-wise, unless you have sophisticated gc
> >> tracking), and the original intent was to be at least somewhat of a
> >> reference implementation (though it went through a couple of
> >> refactorings that largely destroyed that benefit). It also does
> >> enough optimization to actually exercise the worker code (for
> >> development purposes, and cross-language usefulness) and then gained
> >> a couple of extra features that were useful for all runners (e.g.
> >> combiner packing).
> >>
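[Editor's note: a toy sketch of what combiner packing buys, not the translations.py implementation. When several combiners consume the same grouped input, they can be packed into a single combiner that produces all results in one pass, so the input is shuffled and iterated once instead of once per combiner. The helper name `pack_combiners` and the plain-function combiners are stand-ins for Beam's CombineFns.]

```python
def pack_combiners(combiners):
    """Given (name, combine_fn) pairs that read the same input,
    return one combine_fn producing a dict of all the results."""
    def packed(values):
        values = list(values)  # materialize once; each fn sees the same data
        return {name: fn(values) for name, fn in combiners}
    return packed

packed = pack_combiners([("sum", sum), ("max", max), ("count", len)])
print(packed([3, 1, 4]))
# {'sum': 8, 'max': 4, 'count': 3}
```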
> >> Flatten unzipping can get surprisingly messy (ironically, Flatten is
> >> one of the "simplest" transforms execution-wise but one of the
> >> hardest to deal with in an optimizer), and just materializing was an
> >> easy, obviously correct solution that didn't impact performance too
> >> much (in context), so it's never been added. It would probably still
> >> be good to do someday, as it unlocks fusion opportunities and such.
> >>
> >> - Robert
> >>
> >>
> >> >> If you submit a workflow through the python SDK to Dataflow, does
> Dataflow do another round of optimizations? Or are the translations.py
> optimization phases never used when using the python SDK with the Dataflow
> runner?
> >> >>
> >> >> On Mon, Jan 27, 2025 at 4:09 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> >> >>>
> >> >>> On Mon, Jan 27, 2025 at 1:00 PM Joey Tran <
> joey.t...@schrodinger.com> wrote:
> >> >>> >
> >> >>> > I heard mention that there is a flatten unzipping optimization
> implemented by some runners. I didn't see that in the python optimizations
> in translations.py[1]. Just curious what this optimization is?
> >> >>>
> >> >>> It's done to increase the possibility of fusion. Suppose one has
> >> >>>
> >> >>> ... --> DoFnA \
> >> >>>               --> Flatten --> DoFnC --> ...
> >> >>> ... --> DoFnB /
> >> >>>
> >> >>> When determining the physical execution plan, one can re-write this
> as
> >> >>>
> >> >>> ... --> DoFnA --> DoFnC \
> >> >>>                          --> Flatten --> ...
> >> >>> ... --> DoFnB --> DoFnC /
> >> >>>
> >> >>> which permits fusion into stages (DoFnA+DoFnC) and (DoFnB+DoFnC).
> >> >>>
> >> >>> One can progressively do this up to the point that the consumer of
> the
> >> >>> flatten already requires materialization that permits multiple
> inputs
> >> >>> (e.g. writing to a shuffle/grouping operation).
> >> >>>
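[Editor's note: the rewrite described above can be sketched as a toy graph transformation; this is a hypothetical representation, not runner code. Each consumer of the Flatten is copied onto every input branch, and the Flatten is pushed downstream to merge the copies, which is what opens up the per-branch fusion opportunities.]

```python
def unzip_flatten(graph):
    """Unzip a Flatten in a toy pipeline graph.

    graph: {node: list_of_input_nodes}. Rewrites
        C <- Flatten <- [A, B]
    into
        Flatten <- [C_from_A, C_from_B]
    where each C_from_X is a copy of C consuming X directly.
    """
    new_graph = {}
    for node, inputs in graph.items():
        if inputs == ['Flatten']:
            branches = graph['Flatten']
            copies = [f'{node}_from_{b}' for b in branches]
            for copy, b in zip(copies, branches):
                new_graph[copy] = [b]      # copy consumes the branch directly
            new_graph['Flatten'] = copies  # Flatten now merges the copies
        elif node != 'Flatten':
            new_graph[node] = inputs
    return new_graph

graph = {'A': [], 'B': [], 'Flatten': ['A', 'B'], 'C': ['Flatten']}
print(unzip_flatten(graph))
```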
> >> >>> > I think I get the general gist in that you don't necessarily need
> to combine the input pcollections to a flatten and instead you can just
> apply non-aggregating consuming transforms to all input pcollections, but
> when is a good time to do that? Do runners that implement this optimization
> always apply this to all flattens?
> >> >>>
> >> >>> Pretty much whenever they can, though there are limitations (e.g. if
> >> >>> DoFnC is stateful). I think it depends on the internal
> implementation
> >> >>> of the runner whether this makes sense.
> >> >>>
> >> >>> > Cheers,
> >> >>> > Joey
> >> >>> >
> >> >>> > [1]
> https://github.com/apache/beam/blob/72102b5985b3a13c4a4c3949bf23d129c3999827/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
>
