Ah I see, so some optimizations were implemented because they were necessary for any kind of reasonable performance when running with the local Python runners, but those translations aren't actually used by the large-scale runners like Dataflow. I see now that the Python SDK DataflowRunner just submits the pipeline proto to Dataflow without running these optimization phases, so Dataflow presumably does its own optimizations. I wasn't sure if the `translations.py` optimizations were applied to all pipelines submitted through the Python SDK. Thanks for the explanation and history!
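To check my understanding of what fusion buys, here's a toy sketch in plain Python (not Beam's actual `translations.py` data structures; `fuse_stages`, the step names, and the `barriers` set are all made up for illustration) of greedy stage fusion, where a grouping operation forces materialization and a Flatten's multiple producers block upstream fusion:

```python
# Toy sketch of greedy fusion (hypothetical helper, not Beam's API).
# A step is fused into its producer's stage when it has exactly one
# producer, that producer has no other consumers, and neither side is
# a materialization barrier (e.g. a GroupByKey).

def fuse_stages(steps, edges, barriers=frozenset()):
    """steps: names in topological order; edges: set of (producer, consumer)."""
    consumers = {s: [] for s in steps}
    producers = {s: [] for s in steps}
    for a, b in edges:
        consumers[a].append(b)
        producers[b].append(a)

    stage_of, stages = {}, []
    for s in steps:
        ps = producers[s]
        fusible = (
            s not in barriers
            and len(ps) == 1
            and ps[0] not in barriers
            and len(consumers[ps[0]]) == 1
        )
        if fusible:
            stage = stage_of[ps[0]]   # extend the producer's stage
            stage.append(s)
        else:
            stage = [s]               # materialization boundary: new stage
            stages.append(stage)
        stage_of[s] = stage
    return stages

# A linear chain fuses until the grouping operation:
print(fuse_stages(
    ["Read", "DoFnA", "DoFnB", "GBK", "DoFnC"],
    {("Read", "DoFnA"), ("DoFnA", "DoFnB"), ("DoFnB", "GBK"), ("GBK", "DoFnC")},
    barriers={"GBK"},
))  # [['Read', 'DoFnA', 'DoFnB'], ['GBK'], ['DoFnC']]

# A Flatten with two producers can't be fused into either of them, so
# DoFnA and DoFnB each end their own stage and their outputs get
# materialized (the cost the thread below discusses):
print(fuse_stages(
    ["DoFnA", "DoFnB", "Flatten", "DoFnC"],
    {("DoFnA", "Flatten"), ("DoFnB", "Flatten"), ("Flatten", "DoFnC")},
))  # [['DoFnA'], ['DoFnB'], ['Flatten', 'DoFnC']]
```

Without fusion every edge in the graph would be a materialization point, which matches the "extraordinarily slow and memory-expensive" description of a fusion-less runner.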
Are there any other major optimizations not included in translations.py?

On Mon, Jan 27, 2025 at 5:05 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
> On Mon, Jan 27, 2025 at 1:46 PM Robert Burke <rob...@frantil.com> wrote:
> >
> > Chiming in for the Prism Runner implementation.
> >
> > I mostly didn't implement this in Prism because I had reached my limit in dealing with the Graph at the time.
> >
> > The Python SDK approach to the optimizations is very set theory based, which isn't as natural (to me at least) for handling the flatten unzipping.
> >
> > Not impossible, just not for me ;).
> >
> > On Mon, Jan 27, 2025, 1:19 PM Joey Tran <joey.t...@schrodinger.com> wrote:
> >>>
> >>> > I think I get the general gist in that you don't necessarily need to combine the input pcollections to a flatten and instead you can just apply non-aggregating consuming transforms to all input pcollections, but when is a good time to do that? Do runners that implement this optimization always apply this to all flattens?
> >>
> >>> Pretty much whenever they can, though there are limitations (e.g. if DoFnC is stateful). I think it depends on the internal implementation of the runner whether this makes sense.
> >>
> >> Is that why it's not implemented in `translations.py`? Since it doesn't make sense for all runners?
>
> It's mostly not implemented in translations.py because I (and everyone else) only have finite time, and the Python runner was not intended to be a high-performance runner at the end of the day anyway.
>
> Not implementing fusion makes things extraordinarily slow (and also very expensive memory-wise, unless you have sophisticated gc tracking), and the original intent was to be at least somewhat of a reference implementation (though it went through a couple of refactorings that largely destroyed that benefit).
> It also does enough optimization to actually exercise the worker code (for development purposes, and cross-language usefulness) and then gained a couple of extra features that were useful for all runners (e.g. combiner packing).
>
> Flatten unzipping can get surprisingly messy (ironically, Flatten is one of the "simplest" transforms execution-wise but hardest to deal with in an optimizer) and just materializing was an easy, obviously correct solution that didn't impact performance too much (in context), so it's never been added. It would probably still be good to do someday, as it unlocks fusion opportunities and such.
>
> - Robert
>
> >> If you submit a workflow through the python SDK to Dataflow, does Dataflow do another round of optimizations? Or are the translations.py optimization phases never used when using the python SDK with the Dataflow runner?
> >>
> >> On Mon, Jan 27, 2025 at 4:09 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
> >>>
> >>> On Mon, Jan 27, 2025 at 1:00 PM Joey Tran <joey.t...@schrodinger.com> wrote:
> >>> >
> >>> > I heard mention that there is a flatten unzipping optimization implemented by some runners. I didn't see that in the python optimizations in translations.py[1]. Just curious what this optimization is?
> >>>
> >>> It's done to increase the possibility of fusion. Suppose one has
> >>>
> >>>     ... --> DoFnA \
> >>>                    --> Flatten --> DoFnC --> ...
> >>>     ... --> DoFnB /
> >>>
> >>> When determining the physical execution plan, one can re-write this as
> >>>
> >>>     ... --> DoFnA --> DoFnC \
> >>>                              --> Flatten --> ...
> >>>     ... --> DoFnB --> DoFnC /
> >>>
> >>> which permits fusion into stages (DoFnA+DoFnC) and (DoFnB+DoFnC).
> >>>
> >>> One can progressively do this up to the point that the consumer of the flatten already requires materialization that permits multiple inputs (e.g. writing to a shuffle/grouping operation).
> >>>
> >>> > I think I get the general gist in that you don't necessarily need to combine the input pcollections to a flatten and instead you can just apply non-aggregating consuming transforms to all input pcollections, but when is a good time to do that? Do runners that implement this optimization always apply this to all flattens?
> >>>
> >>> Pretty much whenever they can, though there are limitations (e.g. if DoFnC is stateful). I think it depends on the internal implementation of the runner whether this makes sense.
> >>>
> >>> > Cheers,
> >>> > Joey
> >>> >
> >>> > [1] https://github.com/apache/beam/blob/72102b5985b3a13c4a4c3949bf23d129c3999827/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
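To make the unzipping rewrite in the quoted diagram concrete, here's a toy sketch in plain Python (a made-up edge-set representation; `unzip_flatten` and the node names are hypothetical, not Beam's API) of rewriting producers -> Flatten -> DoFnC into per-producer copies of DoFnC that re-converge at the Flatten:

```python
# Toy sketch of flatten unzipping (hypothetical helper, not Beam's API).
# Rewrites    DoFnA \                          DoFnA --> DoFnC' \
#                    --> Flatten --> DoFnC  to                   --> Flatten
#             DoFnB /                          DoFnB --> DoFnC" /
# so each producer can later fuse with its own copy of the consumer.

def unzip_flatten(edges, flatten, consumer):
    """edges: set of (src, dst) pairs; assumes `flatten` feeds only `consumer`."""
    producers = sorted(a for (a, b) in edges if b == flatten)
    # Drop the edges into the Flatten and from the Flatten to its consumer.
    new_edges = {(a, b) for (a, b) in edges
                 if b != flatten and (a, b) != (flatten, consumer)}
    copies = []
    for p in producers:
        copy = f"{consumer}/{p}"        # one consumer copy per input branch
        copies.append(copy)
        new_edges.add((p, copy))        # producer now feeds its own copy
        new_edges.add((copy, flatten))  # copies re-converge at the Flatten
    return new_edges, copies

edges = {("DoFnA", "Flatten"), ("DoFnB", "Flatten"), ("Flatten", "DoFnC")}
new_edges, copies = unzip_flatten(edges, "Flatten", "DoFnC")
print(sorted(new_edges))
# [('DoFnA', 'DoFnC/DoFnA'), ('DoFnB', 'DoFnC/DoFnB'),
#  ('DoFnC/DoFnA', 'Flatten'), ('DoFnC/DoFnB', 'Flatten')]
```

After the rewrite, each producer and its consumer copy form a single-producer/single-consumer pair, i.e. a fusion candidate, which is exactly what the rewrite is for. The caveats from the thread still apply: the rewrite is only valid for non-aggregating consumers (e.g. not if DoFnC is stateful), and it stops once the Flatten's consumer already requires materialization.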