>> Thanks for the explanation and history!
>>
>> Are there any other major optimizations not included in translations.py?
> Dataflow's fusion algorithm is more sophisticated (the one in Beam
> Python is just greedy, which can give sub-optimal solutions sometimes
> and doesn't take into account the size of the data being
> materialized).

whoa! If the optimization takes into account the size of the data being
materialized, does that mean it happens at runtime?

On Mon, Jan 27, 2025 at 5:47 PM Robert Bradshaw <rober...@google.com> wrote:

> On Mon, Jan 27, 2025 at 2:29 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
> >
> > Ah I see, so some optimizations were implemented because they were
> > necessary for any kind of reasonable performance when running with the
> > local python runners, but those translations aren't actually used by
> > the large-scale runners like Dataflow. I see now that the python SDK
> > DataflowRunner just submits a pre-optimized pipeline proto to Dataflow,
> > so Dataflow presumably does its own optimizations. I wasn't sure if the
> > `translations.py` optimizations were used by all runners submitted
> > through the python SDK.
>
> IIRC Spark and Flink borrow(ed?) more from translations.py, as the
> stage fuser on the portable Flink side wasn't as sophisticated (e.g.
> it made any DoFn with a side input a fusion break rather than figuring
> out if the upstream was a producer of the side input). Even Dataflow
> leverages the combiner packing on the SDK side, but, yes, does its
> own optimization with roots (though there have been multiple rewrites)
> harking back to the original FlumeJava paper of 2010.
>
> It's a bit of a conundrum, because if one has multiple SDKs on one
> runner, one wants to put the optimizations in the runner, but if one
> has multiple runners one is incentivized to put the optimization in
> the SDK. Unless you invoke a third component to do all of this (e.g.
> the Typescript SDK deferred to the Python SDK to invoke Dataflow).
>
> > Thanks for the explanation and history!
> >
> > Are there any other major optimizations not included in translations.py?
>
> Dataflow's fusion algorithm is more sophisticated (the one in Beam
> Python is just greedy, which can give sub-optimal solutions sometimes
> and doesn't take into account the size of the data being
> materialized). Dataflow also handles things like "right fitting", where
> stages are broken up to meet varying resource requirements (e.g.
> accelerator-requiring vs. not), and there are also optimizations
> specific to its shuffle implementation; for Flume (which uses the
> same optimizer) there are additional considerations like
> geographically-dispersed data and jobs that we don't have to deal with
> for Dataflow.
>
> Nothing beats out the importance of fusion though, and I think
> translations.py has pretty complete coverage of the expansion of
> higher-level primitives into their executable components (that are
> then executed by worker code), which isn't "optimization" per se but is
> an important part of the Beam model.
>
> On Mon, Jan 27, 2025 at 5:05 PM Robert Bradshaw via dev
> <dev@beam.apache.org> wrote:
> >>
> >> On Mon, Jan 27, 2025 at 1:46 PM Robert Burke <rob...@frantil.com>
> >> wrote:
> >> >
> >> > Chiming in for the Prism Runner implementation.
> >> >
> >> > I mostly didn't implement this in Prism because I had reached my
> >> > limit in dealing with the Graph at the time.
> >> >
> >> > The Python SDK approach to the optimizations is very set-theory
> >> > based, which isn't as natural (to me at least) for handling the
> >> > flatten unzipping.
> >> >
> >> > Not impossible, just not for me ;).
> >> >
> >> > On Mon, Jan 27, 2025, 1:19 PM Joey Tran <joey.t...@schrodinger.com>
> >> > wrote:
> >> >>>
> >> >>> > I think I get the general gist in that you don't necessarily
> >> >>> > need to combine the input pcollections to a flatten and instead
> >> >>> > you can just apply non-aggregating consuming transforms to all
> >> >>> > input pcollections, but when is a good time to do that? Do
> >> >>> > runners that implement this optimization always apply this to
> >> >>> > all flattens?
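[Editor's note: to illustrate what "just greedy" fusion means in the exchange above, here is a minimal, hypothetical sketch. The names (`greedy_fuse`, `fusion_breaks`) and the dict-based graph are illustrative assumptions, not Beam's actual translations.py code. It fuses any producer into its sole consumer unless one side is a fusion break (e.g. a grouping operation), never weighing how much data each choice would materialize.]

```python
# Toy greedy fusion sketch (illustrative; not the Beam implementation).
# A pipeline is a list of (producer, consumer) edges.

def greedy_fuse(edges, fusion_breaks):
    """Return a map from each transform to the stage it ends up in."""
    stage = {}  # transform -> transform it was fused into

    def resolve(t):
        # Follow fusion links to the representative of t's stage.
        while t in stage:
            t = stage[t]
        return t

    consumers = {}
    for producer, consumer in edges:
        consumers.setdefault(producer, []).append(consumer)

    for producer, cs in consumers.items():
        # Greedy rule: fuse whenever there is exactly one consumer and
        # neither endpoint forces a materialization boundary.
        if len(cs) == 1 and producer not in fusion_breaks and cs[0] not in fusion_breaks:
            stage[producer] = resolve(cs[0])

    every_transform = {t for edge in edges for t in edge}
    return {t: resolve(t) for t in every_transform}

edges = [("Read", "ParDoA"), ("ParDoA", "GroupByKey"), ("GroupByKey", "ParDoB")]
stages = greedy_fuse(edges, fusion_breaks={"GroupByKey"})
# Read and ParDoA land in one stage; GroupByKey stays a boundary.
```

A size-aware optimizer would instead rank candidate fusions by the estimated cost of the data that would otherwise be materialized, which is the distinction drawn above between the Python SDK's algorithm and Dataflow's.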
> >> >>
> >> >>> Pretty much whenever they can, though there are limitations (e.g. if
> >> >>> DoFnC is stateful). I think it depends on the internal implementation
> >> >>> of the runner whether this makes sense.
> >> >>
> >> >> Is that why it's not implemented in `translations.py`? Since it
> >> >> doesn't make sense for all runners?
> >>
> >> It's mostly not implemented in translations.py because I (and everyone
> >> else) only have finite time, and the Python runner was not intended to
> >> be a high-performance runner at the end of the day anyway.
> >>
> >> Not implementing fusion makes things extraordinarily slow (and also
> >> very expensive memory-wise, unless you have sophisticated gc
> >> tracking), and the original intent was to be at least somewhat of a
> >> reference implementation (though it went through a couple of
> >> refactorings that largely destroyed that benefit). It also does enough
> >> optimization to actually exercise the worker code (for development
> >> purposes, and cross-language usefulness) and then gained a couple of
> >> extra features that were useful for all runners (e.g. combiner
> >> packing).
> >>
> >> Flatten unzipping can get surprisingly messy (ironically, Flatten is
> >> one of the "simplest" transforms execution-wise but one of the hardest
> >> to deal with in an optimizer) and just materializing was an easy,
> >> obviously correct solution that didn't impact performance too much (in
> >> context), so it's never been added. It'd probably still be good to do
> >> someday, as it unlocks fusion opportunities and such.
> >>
> >> - Robert
> >>
> >> >> If you submit a workflow through the python SDK to Dataflow, does
> >> >> Dataflow do another round of optimizations? Or are the
> >> >> translations.py optimization phases never used when using the python
> >> >> SDK with the Dataflow runner?
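[Editor's note: combiner packing, mentioned above as an SDK-side optimization even Dataflow leverages, can be illustrated with a toy sketch. The representation below (a combiner as an `(init, add, extract)` triple and the `pack_combiners` helper) is an assumption for illustration, not the translations.py implementation; the idea is that several combiners over the same input are packed into one combiner that maintains all accumulators in a single pass.]

```python
# Toy combiner-packing sketch (illustrative; not the Beam implementation).
# A combiner is modeled as (create_accumulator, add_input, extract_output).

def pack_combiners(combiners):
    """Pack several combiners into one that runs them side by side."""
    def init():
        return [c[0]() for c in combiners]
    def add(accs, value):
        # Feed each value to every packed combiner's accumulator.
        return [c[1](a, value) for c, a in zip(combiners, accs)]
    def extract(accs):
        return tuple(c[2](a) for c, a in zip(combiners, accs))
    return init, add, extract

count = (lambda: 0, lambda a, v: a + 1, lambda a: a)
total = (lambda: 0, lambda a, v: a + v, lambda a: a)
init, add, extract = pack_combiners([count, total])

acc = init()
for v in [3, 4, 5]:
    acc = add(acc, v)
result = extract(acc)  # one pass over the data yields both aggregates
```

The payoff is that two aggregations over the same keyed input cost one shuffle and one traversal instead of two.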
> >> >>
> >> >> On Mon, Jan 27, 2025 at 4:09 PM Robert Bradshaw via dev
> >> >> <dev@beam.apache.org> wrote:
> >> >>>
> >> >>> On Mon, Jan 27, 2025 at 1:00 PM Joey Tran
> >> >>> <joey.t...@schrodinger.com> wrote:
> >> >>> >
> >> >>> > I heard mention that there is a flatten unzipping optimization
> >> >>> > implemented by some runners. I didn't see that in the python
> >> >>> > optimizations in translations.py[1]. Just curious what this
> >> >>> > optimization is?
> >> >>>
> >> >>> It's done to increase the possibility of fusion. Suppose one has
> >> >>>
> >> >>>   ... --> DoFnA \
> >> >>>                  --> Flatten --> DoFnC --> ...
> >> >>>   ... --> DoFnB /
> >> >>>
> >> >>> When determining the physical execution plan, one can re-write
> >> >>> this as
> >> >>>
> >> >>>   ... --> DoFnA --> DoFnC \
> >> >>>                            --> Flatten --> ...
> >> >>>   ... --> DoFnB --> DoFnC /
> >> >>>
> >> >>> which permits fusion into stages (DoFnA+DoFnC) and (DoFnB+DoFnC).
> >> >>>
> >> >>> One can progressively do this up to the point that the consumer
> >> >>> of the flatten already requires materialization that permits
> >> >>> multiple inputs (e.g. writing to a shuffle/grouping operation).
> >> >>>
> >> >>> > I think I get the general gist in that you don't necessarily
> >> >>> > need to combine the input pcollections to a flatten and instead
> >> >>> > you can just apply non-aggregating consuming transforms to all
> >> >>> > input pcollections, but when is a good time to do that? Do
> >> >>> > runners that implement this optimization always apply this to
> >> >>> > all flattens?
> >> >>>
> >> >>> Pretty much whenever they can, though there are limitations (e.g.
> >> >>> if DoFnC is stateful). I think it depends on the internal
> >> >>> implementation of the runner whether this makes sense.
> >> >>>
> >> >>> > Cheers,
> >> >>> > Joey
> >> >>> >
> >> >>> > [1] https://github.com/apache/beam/blob/72102b5985b3a13c4a4c3949bf23d129c3999827/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
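[Editor's note: the diagram rewrite described above can be sketched as a small graph transformation. Everything here is illustrative: the dict-based graph (transform name to list of input names) and the `unzip_flatten` helper are assumptions for exposition, not Beam's pipeline representation.]

```python
# Toy flatten-unzip sketch (illustrative; not Beam's actual rewrite code).
# The graph maps each transform to the list of transforms it reads from.

def unzip_flatten(graph, flatten, consumer):
    """Push `consumer` above `flatten` so each branch can fuse with it."""
    branches = graph.pop(flatten)
    copies = []
    for inp in branches:
        copy = f"{consumer}@{inp}"  # one copy of the consumer per branch
        graph[copy] = [inp]
        copies.append(copy)
    # The flatten now merges the *outputs* of the copied consumers...
    graph[flatten] = copies
    # ...and anything downstream of the consumer reads from the flatten.
    del graph[consumer]
    for node, inputs in graph.items():
        graph[node] = [flatten if i == consumer else i for i in inputs]
    return graph

graph = {"Flatten": ["DoFnA", "DoFnB"], "DoFnC": ["Flatten"], "Sink": ["DoFnC"]}
unzip_flatten(graph, "Flatten", "DoFnC")
# graph now has DoFnC@DoFnA reading DoFnA and DoFnC@DoFnB reading DoFnB,
# with Flatten below both copies and Sink reading from Flatten.
```

After the rewrite, the edges DoFnA -> DoFnC@DoFnA and DoFnB -> DoFnC@DoFnB are plain producer/consumer pairs, which is exactly what lets a fuser form the (DoFnA+DoFnC) and (DoFnB+DoFnC) stages mentioned above.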