On Mon, Jan 27, 2025 at 3:40 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>
> >> Thanks for the explanation and history!
> >>
> >> Are there any other major optimizations not included in translations.py?
> >
> > Dataflow's fusion algorithm is more sophisticated (the one in Beam
> > Python is just greedy, which can sometimes give sub-optimal solutions
> > and doesn't take into account the size of the data being
> > materialized).
>
> Whoa! If the optimization takes into account the size of the data being
> materialized, does that mean it happens at runtime?
:) We have actually done explorations in that area, but generally we get the
data from prior runs of the pipeline, if any. If needed, there's an equivalent
of resource hints to declare things like expected output size ratios for
DoFns which, together with input source size estimation, provide this data.
(It was more critical back when things were built on sequences of MapReduces,
and a fusion break with materialization had to be chosen between each one,
but it can still be pretty nice to have today.) It's briefly touched on in
the FlumeJava paper.

> On Mon, Jan 27, 2025 at 5:47 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Mon, Jan 27, 2025 at 2:29 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>> >
>> > Ah I see, so some optimizations were implemented because they were
>> > necessary for any kind of reasonable performance when running with the
>> > local Python runners, but those translations aren't actually used by the
>> > large-scale runners like Dataflow. I see now that the Python SDK
>> > DataflowRunner just submits a pre-optimized pipeline proto to Dataflow,
>> > so Dataflow presumably does its own optimizations. I wasn't sure if the
>> > `translations.py` optimizations were used by all runners submitted
>> > through the Python SDK.
>>
>> IIRC Spark and Flink borrow(ed?) more from translations.py, as the
>> stage fuser on the portable Flink side wasn't as sophisticated (e.g.
>> it made any DoFn with a side input a fusion break rather than figuring
>> out whether the upstream was a producer of the side input). Even Dataflow
>> leverages the combiner packing on the SDK side, but, yes, does its
>> own optimization with roots (though there have been multiple rewrites)
>> harking back to the original FlumeJava paper of 2010.
>>
>> It's a bit of a conundrum, because if one has multiple SDKs on one
>> runner, one wants to put the optimizations in the runner, but if one
>> has multiple runners one is incentivized to put the optimization in
>> the SDK.
>> Unless you invoke a third component to do all of this (e.g.
>> the TypeScript SDK deferred to the Python SDK to invoke Dataflow).
>>
>> > Thanks for the explanation and history!
>> >
>> > Are there any other major optimizations not included in translations.py?
>>
>> Dataflow's fusion algorithm is more sophisticated (the one in Beam
>> Python is just greedy, which can sometimes give sub-optimal solutions
>> and doesn't take into account the size of the data being
>> materialized). Dataflow also handles things like "right fitting", where
>> stages are broken up to meet varying resource requirements (e.g.
>> accelerator-requiring vs. not), and there are also optimizations
>> specific to its shuffle implementation. For Flume (which uses the
>> same optimizer) there are additional considerations, like
>> geographically-dispersed data and jobs, that we don't have to deal with
>> for Dataflow.
>>
>> Nothing beats out the importance of fusion though, and I think
>> translations.py has pretty complete coverage of the expression of
>> higher-level primitives into their executable components (that are
>> then executed by worker code), which isn't "optimization" per se but an
>> important part of the Beam model.
>>
>> > On Mon, Jan 27, 2025 at 5:05 PM Robert Bradshaw via dev
>> > <dev@beam.apache.org> wrote:
>> >>
>> >> On Mon, Jan 27, 2025 at 1:46 PM Robert Burke <rob...@frantil.com> wrote:
>> >> >
>> >> > Chiming in for the Prism Runner implementation.
>> >> >
>> >> > I mostly didn't implement this in Prism because I had reached my limit
>> >> > in dealing with the graph at the time.
>> >> >
>> >> > The Python SDK approach to the optimizations is very set-theory based,
>> >> > which isn't as natural (to me at least) for handling the flatten
>> >> > unzipping.
>> >> >
>> >> > Not impossible, just not for me ;).
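[Editor's note: to make the "just greedy" characterization above concrete, here is a minimal toy sketch of greedy fusion. It is purely illustrative; `greedy_fuse` and the graph representation are invented for this sketch and are not Beam's translations.py code. Each DoFn is pulled into its producer's stage whenever the producer has exactly one consumer and is not a grouping boundary, with no regard for data sizes, which is exactly why it can pick a sub-optimal plan.]

```python
# Toy sketch of greedy fusion (illustrative; not Beam's translations.py code).
# A DoFn is fused into its producer's stage when the producer has exactly one
# consumer; a GroupByKey (GBK) always forces a fusion break.

def greedy_fuse(transforms, consumers_of):
    """transforms: (name, kind, producer) triples in topological order."""
    kinds = {name: kind for name, kind, _ in transforms}
    stage_of = {}  # transform name -> stage id (name of the stage's root)
    stages = {}    # stage id -> list of fused transform names
    for name, kind, producer in transforms:
        fusible = (
            producer is not None
            and kind == 'DoFn'                     # only ParDo-like transforms fuse
            and kinds[producer] != 'GBK'           # a GBK output starts a new stage
            and len(consumers_of[producer]) == 1   # greedy: never fuse across fan-out
        )
        stage = stage_of[producer] if fusible else name
        stage_of[name] = stage
        stages.setdefault(stage, []).append(name)
    return list(stages.values())

graph = [
    ('Read', 'Source', None),
    ('Parse', 'DoFn', 'Read'),
    ('Group', 'GBK', 'Parse'),
    ('Sum', 'DoFn', 'Group'),
]
consumers = {'Read': ['Parse'], 'Parse': ['Group'], 'Group': ['Sum'], 'Sum': []}
print(greedy_fuse(graph, consumers))  # [['Read', 'Parse'], ['Group'], ['Sum']]
```

Here `Parse` fuses with `Read`, while the GroupByKey forces materialization on both sides; a size-aware optimizer would additionally weigh how many bytes each candidate fusion break materializes.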
>> >> > On Mon, Jan 27, 2025, 1:19 PM Joey Tran <joey.t...@schrodinger.com>
>> >> > wrote:
>> >> >>>
>> >> >>> > I think I get the general gist in that you don't necessarily need
>> >> >>> > to combine the input PCollections to a Flatten and instead you can
>> >> >>> > just apply non-aggregating consuming transforms to all input
>> >> >>> > PCollections, but when is a good time to do that? Do runners that
>> >> >>> > implement this optimization always apply this to all Flattens?
>> >> >>
>> >> >>> Pretty much whenever they can, though there are limitations (e.g. if
>> >> >>> DoFnC is stateful). I think it depends on the internal implementation
>> >> >>> of the runner whether this makes sense.
>> >> >>
>> >> >> Is that why it's not implemented in `translations.py`? Since it
>> >> >> doesn't make sense for all runners?
>> >>
>> >> It's mostly not implemented in translations.py because I (and everyone
>> >> else) only have finite time, and the Python runner was not intended to
>> >> be a high-performance runner at the end of the day anyway.
>> >>
>> >> Not implementing fusion makes things extraordinarily slow (and also
>> >> very expensive memory-wise, unless you have sophisticated GC tracking),
>> >> and the original intent was to be at least somewhat of a reference
>> >> implementation (though it went through a couple of refactorings that
>> >> largely destroyed that benefit). It also does enough optimization to
>> >> actually exercise the worker code (for development purposes, and
>> >> cross-language usefulness) and then gained a couple of extra features
>> >> that were useful for all runners (e.g. combiner packing).
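[Editor's note: the combiner packing mentioned above can be sketched in a few lines of plain Python. This is illustrative only; `combine_per_key` is a stand-in helper, not a Beam API. The idea is that two combiners over the same keyed input get packed into a single pass carrying a tuple of accumulators.]

```python
# Toy sketch of combiner packing (illustrative; combine_per_key is a stand-in
# helper, not a Beam API): two combiners over the same keyed input are packed
# into one pass that computes both results at once.

def combine_per_key(pairs, combiner):
    out = {}
    for k, v in pairs:
        out[k] = combiner(out[k], v) if k in out else v
    return out

data = [('a', 1), ('a', 2), ('b', 5)]

# Unpacked: two separate traversals (in a real runner, potentially two
# grouping operations over the same input).
sums = combine_per_key(data, lambda x, y: x + y)
maxes = combine_per_key(data, max)

# Packed: a single traversal accumulating a (sum, max) tuple per key.
packed = combine_per_key(
    [(k, (v, v)) for k, v in data],
    lambda x, y: (x[0] + y[0], max(x[1], y[1])),
)

assert packed == {k: (sums[k], maxes[k]) for k in sums}
```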
>> >>
>> >> Flatten unzipping can get surprisingly messy (ironically, Flatten is
>> >> one of the "simplest" transforms execution-wise but hardest to deal
>> >> with in an optimizer) and just materializing was an easy, obviously
>> >> correct solution that didn't impact performance too much (in context),
>> >> so it's never been added. It would probably still be good to do someday,
>> >> as it unlocks fusion opportunities and such.
>> >>
>> >> - Robert
>> >>
>> >>
>> >> >> If you submit a workflow through the Python SDK to Dataflow, does
>> >> >> Dataflow do another round of optimizations? Or are the
>> >> >> translations.py optimization phases never used when using the Python
>> >> >> SDK with the Dataflow runner?
>> >> >>
>> >> >> On Mon, Jan 27, 2025 at 4:09 PM Robert Bradshaw via dev
>> >> >> <dev@beam.apache.org> wrote:
>> >> >>>
>> >> >>> On Mon, Jan 27, 2025 at 1:00 PM Joey Tran
>> >> >>> <joey.t...@schrodinger.com> wrote:
>> >> >>> >
>> >> >>> > I heard mention that there is a flatten unzipping optimization
>> >> >>> > implemented by some runners. I didn't see that in the Python
>> >> >>> > optimizations in translations.py [1]. Just curious, what is this
>> >> >>> > optimization?
>> >> >>>
>> >> >>> It's done to increase the possibility of fusion. Suppose one has
>> >> >>>
>> >> >>>   ... --> DoFnA \
>> >> >>>                  --> Flatten --> DoFnC --> ...
>> >> >>>   ... --> DoFnB /
>> >> >>>
>> >> >>> When determining the physical execution plan, one can rewrite this as
>> >> >>>
>> >> >>>   ... --> DoFnA --> DoFnC \
>> >> >>>                            --> Flatten --> ...
>> >> >>>   ... --> DoFnB --> DoFnC /
>> >> >>>
>> >> >>> which permits fusion into the stages (DoFnA+DoFnC) and (DoFnB+DoFnC).
>> >> >>>
>> >> >>> One can progressively do this up to the point that the consumer of
>> >> >>> the flatten already requires materialization that permits multiple
>> >> >>> inputs (e.g. writing to a shuffle/grouping operation).
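[Editor's note: why the rewrite in those diagrams is safe can be checked with a toy example in plain Python (no Beam APIs; `do_fn_c` is a made-up stand-in). For an element-wise, stateless DoFn, applying it after the Flatten and flattening after applying a copy to each branch produce the same elements, so the optimizer is free to pick whichever shape fuses better.]

```python
# Toy check that flatten unzipping preserves semantics for an element-wise
# DoFn: DoFnC(Flatten(a, b)) yields the same elements as
# Flatten(DoFnC(a), DoFnC(b)).

from itertools import chain

def do_fn_c(x):
    # Stand-in for the element-wise, stateless DoFnC in the diagrams above.
    return x * 10

a = [1, 2]  # output of DoFnA
b = [3, 4]  # output of DoFnB

# Original plan: Flatten first, then apply DoFnC once.
original = [do_fn_c(x) for x in chain(a, b)]

# Unzipped plan: apply a copy of DoFnC to each branch, then Flatten.
unzipped = list(chain((do_fn_c(x) for x in a), (do_fn_c(x) for x in b)))

assert original == unzipped == [10, 20, 30, 40]
```

Since PCollections are unordered, only the multiset of elements has to match; the list comparison works here because the toy example is deterministic. The caveat in the reply above still applies: if DoFnC were stateful or aggregating, duplicating it per branch would not be valid.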
>> >> >>>
>> >> >>> > I think I get the general gist in that you don't necessarily need
>> >> >>> > to combine the input PCollections to a Flatten and instead you can
>> >> >>> > just apply non-aggregating consuming transforms to all input
>> >> >>> > PCollections, but when is a good time to do that? Do runners that
>> >> >>> > implement this optimization always apply this to all Flattens?
>> >> >>>
>> >> >>> Pretty much whenever they can, though there are limitations (e.g. if
>> >> >>> DoFnC is stateful). I think it depends on the internal implementation
>> >> >>> of the runner whether this makes sense.
>> >> >>>
>> >> >>> > Cheers,
>> >> >>> > Joey
>> >> >>> >
>> >> >>> > [1] https://github.com/apache/beam/blob/72102b5985b3a13c4a4c3949bf23d129c3999827/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
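[Editor's note: as a closing illustration of the size-aware materialization choice discussed earlier in the thread (the FlumeJava-era concern of picking where to break fusion in a chain of MapReduces), here is a toy sketch. `cheapest_break` and the ratio numbers are invented for illustration; they are not a Beam or Dataflow API.]

```python
# Toy sketch of a size-aware fusion-break choice (illustrative only). Given
# an input size estimate and per-stage output/input size ratios, pick the
# edge in a linear chain whose materialization writes the fewest bytes.

def cheapest_break(input_bytes, size_ratios):
    sizes, current = [], float(input_bytes)
    for ratio in size_ratios:
        current *= ratio
        sizes.append(current)  # estimated bytes flowing out of each stage
    best = min(range(len(sizes)), key=sizes.__getitem__)
    return best, sizes[best]

# A chain where stage 1 filters heavily (ratio 0.01) and stage 2 fans out
# 50x: the cheapest place to materialize is right after the filter.
idx, cost = cheapest_break(1_000_000, [1.0, 0.01, 50.0])
print(idx, round(cost))  # 1 10000
```

A greedy fuser ignores this signal entirely; as noted above, Dataflow can feed it from prior runs of the pipeline or from declared output-size ratios.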