Chiming in on the Prism runner implementation: I mostly didn't implement this in Prism because I had reached my limit in dealing with the Graph at the time.
The Python SDK approach to the optimizations is very set-theory based, which isn't as natural (to me at least) for handling the flatten unzipping. Not impossible, just not for me ;). (For anyone curious, I've pasted a rough sketch of the rewrite below the quoted thread.)

On Mon, Jan 27, 2025, 1:19 PM Joey Tran <joey.t...@schrodinger.com> wrote:

>> I think I get the general gist in that you don't necessarily need to
>> combine the input PCollections to a flatten and instead you can just
>> apply non-aggregating consuming transforms to all input PCollections,
>> but when is a good time to do that? Do runners that implement this
>> optimization always apply this to all flattens?
>
>> Pretty much whenever they can, though there are limitations (e.g. if
>> DoFnC is stateful). I think it depends on the internal implementation
>> of the runner whether this makes sense.
>
> Is that why it's not implemented in `translations.py`? Since it doesn't
> make sense for all runners?
>
> If you submit a workflow through the Python SDK to Dataflow, does Dataflow
> do another round of optimizations? Or are the translations.py optimization
> phases never used when using the Python SDK with the Dataflow runner?
>
> On Mon, Jan 27, 2025 at 4:09 PM Robert Bradshaw via dev
> <dev@beam.apache.org> wrote:
>
>> On Mon, Jan 27, 2025 at 1:00 PM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>> >
>> > I heard mention that there is a flatten unzipping optimization
>> > implemented by some runners. I didn't see that in the Python
>> > optimizations in translations.py[1]. Just curious what this
>> > optimization is?
>>
>> It's done to increase the possibility of fusion. Suppose one has
>>
>> ... --> DoFnA \
>>                --> Flatten --> DoFnC --> ...
>> ... --> DoFnB /
>>
>> When determining the physical execution plan, one can re-write this as
>>
>> ... --> DoFnA --> DoFnC \
>>                          --> Flatten --> ...
>> ... --> DoFnB --> DoFnC /
>>
>> which permits fusion into stages (DoFnA+DoFnC) and (DoFnB+DoFnC).
>>
>> One can progressively do this up to the point that the consumer of the
>> flatten already requires materialization that permits multiple inputs
>> (e.g. writing to a shuffle/grouping operation).
>>
>> > I think I get the general gist in that you don't necessarily need to
>> > combine the input PCollections to a flatten and instead you can just
>> > apply non-aggregating consuming transforms to all input PCollections,
>> > but when is a good time to do that? Do runners that implement this
>> > optimization always apply this to all flattens?
>>
>> Pretty much whenever they can, though there are limitations (e.g. if
>> DoFnC is stateful). I think it depends on the internal implementation
>> of the runner whether this makes sense.
>>
>> > Cheers,
>> > Joey
>> >
>> > [1] https://github.com/apache/beam/blob/72102b5985b3a13c4a4c3949bf23d129c3999827/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
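P.S. For anyone who wants to see the rewrite Robert describes in code form, here's a rough, self-contained Python sketch. It operates on a toy graph of named nodes, not on Beam's actual pipeline protos (none of these names come from translations.py or Prism; Node, unzip_flattens, etc. are all made up for illustration), and it only handles the simple case where a single non-stateful DoFn consumes the Flatten.

# Toy illustration of the flatten-unzip rewrite (hypothetical graph
# representation; NOT Beam's pipeline protos or translations.py Stages).

from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Node:
    name: str
    kind: str                 # 'DoFn' or 'Flatten'
    inputs: List[str] = field(default_factory=list)
    stateful: bool = False


def unzip_flattens(graph: Dict[str, Node]) -> Dict[str, Node]:
    """Rewrites Flatten --> DoFn into per-input DoFn copies --> Flatten."""
    new_graph = dict(graph)
    for node in list(graph.values()):
        # Only rewrite a non-stateful DoFn whose sole input is a Flatten.
        if node.kind != 'DoFn' or node.stateful or len(node.inputs) != 1:
            continue
        flatten = graph.get(node.inputs[0])
        if flatten is None or flatten.kind != 'Flatten':
            continue
        # Push a copy of the DoFn onto each input branch of the Flatten...
        copies = []
        for i, branch in enumerate(flatten.inputs):
            copy = Node(f'{node.name}/unzipped{i}', 'DoFn', [branch])
            new_graph[copy.name] = copy
            copies.append(copy.name)
        # ...and move the Flatten downstream of the copies, so each branch
        # (e.g. DoFnA+DoFnC) can now fuse into a single stage.
        new_graph[node.name] = Node(node.name, 'Flatten', copies)
        # A real implementation would also prune the old Flatten if this
        # DoFn was its only consumer, and would repeat this until hitting a
        # consumer that requires materialization anyway (a shuffle/grouping
        # operation), per Robert's note above.
    return new_graph


if __name__ == '__main__':
    # The shape from Robert's first diagram: A and B flatten into F,
    # and C consumes the flattened output.
    graph = {
        'A': Node('A', 'DoFn', ['src1']),
        'B': Node('B', 'DoFn', ['src2']),
        'F': Node('F', 'Flatten', ['A', 'B']),
        'C': Node('C', 'DoFn', ['F']),
    }
    for node in unzip_flattens(graph).values():
        print(node)

Running it shows C replaced by a Flatten over C/unzipped0 (consuming A) and C/unzipped1 (consuming B), matching Robert's second diagram, so (A+C) and (B+C) can each fuse into a stage.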