Am I right in understanding that "TupleCombineFn" is the Python name for ComposedCombineFn? ( https://beam.apache.org/releases/javadoc/2.59.0/index.html?org/apache/beam/sdk/transforms/CombineFns.ComposedCombineFn.html )
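
To make sure we mean the same thing by a "tuple"/"composed" combine fn, here is a rough pure-Python sketch of the behaviour I have in mind. It is not Beam's implementation and does not import Beam. The names `SumFn`, `MeanFn`, `SingleInputTupleFn` and `combine` are made up for illustration; only the four-method shape (create_accumulator, add_input, merge_accumulators, extract_output) is meant to mirror a CombineFn.

```python
class SumFn:
    """Hypothetical stand-in for a combine fn that sums its inputs."""
    def create_accumulator(self):
        return 0
    def add_input(self, acc, x):
        return acc + x
    def merge_accumulators(self, accs):
        return sum(accs)
    def extract_output(self, acc):
        return acc


class MeanFn:
    """Hypothetical stand-in for a mean combine fn; accumulator is (sum, count)."""
    def create_accumulator(self):
        return (0, 0)
    def add_input(self, acc, x):
        total, count = acc
        return (total + x, count + 1)
    def merge_accumulators(self, accs):
        accs = list(accs)
        return (sum(t for t, _ in accs), sum(c for _, c in accs))
    def extract_output(self, acc):
        total, count = acc
        return total / count if count else float("nan")


class SingleInputTupleFn:
    """Feeds every input to each wrapped fn and outputs a tuple of results."""
    def __init__(self, *fns):
        self.fns = fns
    def create_accumulator(self):
        return [fn.create_accumulator() for fn in self.fns]
    def add_input(self, accs, x):
        return [fn.add_input(a, x) for fn, a in zip(self.fns, accs)]
    def merge_accumulators(self, acc_lists):
        # Regroup so each wrapped fn merges only its own accumulators.
        columns = zip(*acc_lists)
        return [fn.merge_accumulators(list(col))
                for fn, col in zip(self.fns, columns)]
    def extract_output(self, accs):
        return tuple(fn.extract_output(a) for fn, a in zip(self.fns, accs))


def combine(fn, values):
    """Runs one combine fn over an already-grouped iterable of values."""
    acc = fn.create_accumulator()
    for v in values:
        acc = fn.add_input(acc, v)
    return fn.extract_output(acc)
```

With that shape, `combine(SingleInputTupleFn(SumFn(), MeanFn()), nums)` computes both statistics in a single pass over one group, which is the property I am asking about.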

Kenn

On Sun, Sep 29, 2024 at 7:51 PM Joey Tran <joey.t...@schrodinger.com> wrote:

> Ah I realize now that `standard_optimize_phases` is not actually currently
> used by the fn runner so these changes don't effectively do anything
>
> On Sun, Sep 29, 2024 at 3:19 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> I took a crack at trying to replace GBK+CombineValues with CBK so then it
>> doesn't matter what the user chooses to do.
>>
>> https://github.com/apache/beam/pull/32592
>>
>> On Fri, Sep 27, 2024 at 4:32 PM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> Hmm, it makes sense from a runner optimization perspective, but I think
>>> it's a lot less ergonomic to publish combinefns instead of ptransforms.
>>> Another drawback to the stacked combinefn is the label will have to be a
>>> mash of possibly very different combiners squashed into one
>>>
>>> On Fri, Sep 27, 2024 at 3:52 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> I'd be worried about encouraging the anti-pattern of GroupByKey() +
>>>> CombinePerGroup() which would make the important (often essential)
>>>> combiner lifting optimization harder (pattern detection in the runner
>>>> vs. composite detection).
>>>>
>>>> You might also be interested in the TupleCombineFns
>>>>
>>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/python/apache_beam/transforms/combiners.py#L717
>>>>
>>>> keyed_nums = ...
>>>> combined_nums = keyed_nums | CombinePerKey(
>>>>     combiners.SingleInputTupleCombineFn(
>>>>         sum,
>>>>         combiners.MeanCombineFn(),
>>>>         ...))
>>>>
>>>> FWIW, I never liked the .Globally() and .PerKey() transforms as they're
>>>> not very compressible. I think they were needed to make java typing work
>>>> out well in java 7 and then copied to Python, but I would suggest just
>>>> using CombineGlobally(...) and CombinePerKey(...) over these.
>>>>
>>>> You might also be interested to know that in Python we already have
>>>> combiner consolidation
>>>>
>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L953
>>>>
>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1161
>>>>
>>>> I don't remember what the status is of enabling this by default (IIRC,
>>>> they're conditionally enabled by decorating a transform).
>>>>
>>>> On Fri, Sep 27, 2024 at 12:01 PM Joey Tran <joey.t...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Thoughts @dev <dev@beam.apache.org> for a `GroupedValues` version of
>>>>> combiners? Named `PerGroup` or `PerGroupedValues`?
>>>>>
>>>>> On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev <
>>>>> valen...@google.com> wrote:
>>>>>
>>>>>> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ah! That is exactly the kind of primitive I was looking for but
>>>>>>> thought didn't exist. Thanks for pointing it out. Yeah that works
>>>>>>> well for me, I'll use that in my combiners (with an API of
>>>>>>> `PerGroupedValues`). Thanks!
>>>>>>>
>>>>>>> If we did want to add `PerGroupedValues` to our current combiners
>>>>>>> I'd also be happy to put up a PR doing that
>>>>>>>
>>>>>>
>>>>>> I don't see why not. I'd run by dev@ for naming ideas. PerGroup is
>>>>>> another possibility.
>>>>>>
>>>>>>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <
>>>>>>> valen...@google.com> wrote:
>>>>>>>
>>>>>>>> The closest primitive to that intent seems to be CombineValues:
>>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
>>>>>>>> , and you should be able to write:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> max_sample_size = 100_000
>>>>>>>> ( keyed_nums
>>>>>>>>   | GroupByKey()
>>>>>>>>   | MapTuple(lambda k, nums: (k, nums[:max_sample_size]))
>>>>>>>>   | CombineValues(MeanCombineFn()))
>>>>>>>> ```
>>>>>>>> Would that work for other scenarios you have in mind?
>>>>>>>>
>>>>>>>> Haven't thought too much about this but from looking at
>>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>>>>>>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues
>>>>>>>> there.
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <
>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>
>>>>>>>>> It feels more natural because it's only using GroupByKey once
>>>>>>>>> instead of once per combiner. Which I think is still more efficient
>>>>>>>>> even accounting for combiner lifting (unless there's some kind of
>>>>>>>>> pipeline optimization that merges multiple groupbykey's on the same
>>>>>>>>> pcollection into a single GBK).
>>>>>>>>>
>>>>>>>>> You can imagine a different use case where this pattern might
>>>>>>>>> arise that isn't just trying to reduce GBKs though. For example:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> max_sample_size = 100_000
>>>>>>>>> ( keyed_nums
>>>>>>>>>   | GroupByKey()
>>>>>>>>>   | MapTuple(lambda k, nums: (k, nums[:max_sample_size]))
>>>>>>>>>   | #?? Mean.PerGrouped()?
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> To take the mean of every grouped_values using current combiners,
>>>>>>>>> I think you'd have to use an inverted groupbykey and then call
>>>>>>>>> `Mean.PerKey()` unless I'm missing something.
>>>>>>>>>
>>>>>>>>> (I recognize that writing a Map that takes a mean is simple
>>>>>>>>> enough, but in a real use case we might have a more complicated
>>>>>>>>> combiner)
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <
>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <
>>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey all,
>>>>>>>>>>>
>>>>>>>>>>> Just curious if this pattern comes up for others and if people
>>>>>>>>>>> have worked out a good convention.
>>>>>>>>>>>
>>>>>>>>>>> There are many combiners and a lot of them have two forms: a
>>>>>>>>>>> global form (e.g. Count.Globally) and a per key form (e.g.
>>>>>>>>>>> Count.PerKey). These are convenient but it feels like often we're
>>>>>>>>>>> running into the case where we GroupBy a set of data once and
>>>>>>>>>>> then wish to perform a series of combines on them, in which case
>>>>>>>>>>> neither of these forms work, and it begs another form which
>>>>>>>>>>> operates on pre-grouped KVs.
>>>>>>>>>>>
>>>>>>>>>>> Contrived example: maybe you have a pcollection of keyed numbers
>>>>>>>>>>> and you want to calculate some summary statistics on them. You
>>>>>>>>>>> could do:
>>>>>>>>>>> ```
>>>>>>>>>>> keyed_means = (keyed_nums
>>>>>>>>>>>                | Mean.PerKey())
>>>>>>>>>>> keyed_counts = (keyed_nums
>>>>>>>>>>>                 | Count.PerKey())
>>>>>>>>>>> ... # other combines
>>>>>>>>>>> ```
>>>>>>>>>>> But it'd feel more natural to pre-group the pcollection.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Does it feel more natural because it feels as though it would be
>>>>>>>>>> more performant?
>>>>>>>>>> Because it seems like it adds an extra grouping step to
>>>>>>>>>> the pipeline code, which otherwise might not be necessary. Note
>>>>>>>>>> that Dataflow has the "combiner lifting" optimization, and
>>>>>>>>>> combiner-specified-reduction happens before the data is written
>>>>>>>>>> into shuffle as much as possible:
>>>>>>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>>>>>>>>> .
>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> grouped_nums = keyed_nums | GBK()
>>>>>>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>>>>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>>>>>>>> ```
>>>>>>>>>>> But these "PerGrouped" variants don't actually currently exist.
>>>>>>>>>>> Does anyone else run into this pattern often? I might be missing
>>>>>>>>>>> an obvious pattern here.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>>>>>>>
>>>>>>>>>>> *he/him*