On Fri, Oct 4, 2024 at 2:50 PM Joey Tran <joey.t...@schrodinger.com> wrote:
> Would anyone be up to take a look at the approach here? I think the > standard translation phases are currently used by the portable runner so > the portable runner tests might indicate it works? > I think this could work, though one would also have to make sure all runners do this optimization path. Currently all runners (including those implemented in Java, where Python sends its jobs) do support the CombinePerKey lifting, as that's bread and butter for data processing systems. Note that this doesn't handle the original case of trying to do multiple CombineValues for a single GBK result. My general leaning is that I'd rather the multiplicity of representations where the combiner lifting optimizations are expected to apply for both the user-facing API and in the portable representation. But I can see an argument for it as well. We did add an optimization for keyed_means = keyed_nums | CombinePerKey(MeanCombineFn()) keyed_counts = keyed_nums | CombinePerKey(CountCombineFn()) ... If TupleCombineFn is too cumbersome to use, another alternative is a composite that implements keyed_means, keyed_counts, ... = keyed_nums | MultiCombinePerKey(MeanCombineFn(), CountCombineFn(), ...) > On Sun, Sep 29, 2024, 3:19 PM Joey Tran <joey.t...@schrodinger.com> wrote: > >> I took a crack at trying to replace GBK+CombineValues with CBK so then it >> doesn't matter what the user chooses to do. >> >> https://github.com/apache/beam/pull/32592 >> >> On Fri, Sep 27, 2024 at 4:32 PM Joey Tran <joey.t...@schrodinger.com> >> wrote: >> >>> Hmm, it makes sense from a runner optimization perspective, but I think >>> it's a lot less ergonomic to publish combinefns instead of ptransforms. >>> Another drawback to the stacked combinefn is the label will have to be a >>> mash of possibly very different combiners squashed into one >>> >>> On Fri, Sep 27, 2024 at 3:52 PM Robert Bradshaw via dev < >>> dev@beam.apache.org> wrote: >>> >>>> I'd be worried about encouraging the anti-pattern of GroupByKey() + >>>> CombinePerGroup() which would make the important (often essential) combiner >>>> lifting optimization harder (pattern detection in the runner vs. composite >>>> detection). >>>> >>>> You might also be interested in the TupleCombineFns >>>> >>>> >>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/python/apache_beam/transforms/combiners.py#L717 >>>> >>>> keyed_nums = ... >>>> combined_nums = keyed_nums | CombinePerKey( >>>> combiners.SingleInputTupleCombineFn( >>>> sum, >>>> combiners.MeanCombineFn(), >>>> ...)) >>>> >>>> FWIW, I never liked the .Globall() and .PerKey() transforms as they're >>>> not very compressible. I think they were needed to make java typing work >>>> out well in java 7 and then copied to Python, but I would suggest just >>>> using CombineGlobally(...) and CombinePerKey(...) over these. >>>> >>>> You might also be interested to know that in Python we already have >>>> combiner consolidation >>>> >>>> >>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L953 >>>> >>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1161 >>>> >>>> >>>> I don't remember what the status is of enabling this by default (IIRC, >>>> they're conditionally enabled by decorating a transform). >>>> >>>> On Fri, Sep 27, 2024 at 12:01 PM Joey Tran <joey.t...@schrodinger.com> >>>> wrote: >>>> >>>>> Thoughts @dev <dev@beam.apache.org> for a `GroupedValues` version of >>>>> combiners? Named `PerGroup` or `PerGroupedValues`? >>>>> >>>>> On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev < >>>>> valen...@google.com> wrote: >>>>> >>>>>> >>>>>> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <joey.t...@schrodinger.com> >>>>>> wrote: >>>>>> >>>>>>> Ah! That is exactly the kind of primitive I was looking for but >>>>>>> thought didn't exist. Thanks for pointing it out. Yeah that works well >>>>>>> for >>>>>>> me, I'll use that in my combiners (with an API of `PerGroupedValues`). >>>>>>> Thanks! >>>>>>> >>>>>>> If we did want to add `PerGroupedValues` to our current combiners >>>>>>> I'd also be happy to put up a PR doing that >>>>>>> >>>>>> >>>>>> I don't see why not. I'd run by dev@ for naming ideas. PerGroup is >>>>>> another possibility. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>>> >>>>>>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev < >>>>>>> valen...@google.com> wrote: >>>>>>> >>>>>>>> The closest primitve to that intent seems to be CombineValues: >>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010 >>>>>>>> , and you should be able to write: >>>>>>>> >>>>>>>> max_sample_size = 100_000 >>>>>>>> ( keyed_nums >>>>>>>> | GroupByKey() >>>>>>>> | Map(lambda k_nums: (k, nums[:max_sample_size])) >>>>>>>> | CombineValues(MeanCombineFn()) >>>>>>>> ``` >>>>>>>> Would that work for other scenarios you have in mind? >>>>>>>> >>>>>>>> Haven't thought too much about this but from looking at >>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90, >>>>>>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues >>>>>>>> there. >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran < >>>>>>>> joey.t...@schrodinger.com> wrote: >>>>>>>> >>>>>>>>> It feels more natural because it's only using GroupByKey once >>>>>>>>> instead of once per combiner. Which I think is still more efficient >>>>>>>>> even >>>>>>>>> accounting for combiner lifting (unless there's some kind of pipeline >>>>>>>>> optimization that merges multiple groupbykey's on the same >>>>>>>>> pcollection into >>>>>>>>> a single GBK). >>>>>>>>> >>>>>>>>> You can imagine a different use case where this pattern might >>>>>>>>> arise that isn't just trying to reduce GBKs though. For example: >>>>>>>>> >>>>>>>>> ``` >>>>>>>>> max_sample_size = 100_000 >>>>>>>>> ( keyed_nums >>>>>>>>> | GroupByKey() >>>>>>>>> | Map(lambda k_nums: (k, nums[:max_sample_size])) >>>>>>>>> | #?? Mean.PerGrouped()? >>>>>>>>> ``` >>>>>>>>> >>>>>>>>> To take the mean of every grouped_values using current combiners, >>>>>>>>> I think you'd have to use an inverted groupbykey and then call >>>>>>>>> `Mean.PerKey()` unless I'm missing something. >>>>>>>>> >>>>>>>>> (I recognize that writing a Map that takes a mean is simple >>>>>>>>> enough, but in a real use case we might have a more complicated >>>>>>>>> combiner) >>>>>>>>> >>>>>>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user < >>>>>>>>> u...@beam.apache.org> wrote: >>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran < >>>>>>>>>> joey.t...@schrodinger.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hey all, >>>>>>>>>>> >>>>>>>>>>> Just curious if this pattern comes up for others and if people >>>>>>>>>>> have worked out a good convention. >>>>>>>>>>> >>>>>>>>>>> There are many combiners and a lot of them have two forms: a >>>>>>>>>>> global form (e.g. Count.Globally) and a per key form (e.g. >>>>>>>>>>> Count.PerKey). >>>>>>>>>>> These are convenient but it feels like often we're running into the >>>>>>>>>>> case >>>>>>>>>>> where we GroupBy a set of data once and then wish to perform a >>>>>>>>>>> series of >>>>>>>>>>> combines on them, in which case neither of these forms work, and it >>>>>>>>>>> begs >>>>>>>>>>> another form which operates on pre-grouped KVs. >>>>>>>>>>> >>>>>>>>>>> Contrived example: maybe you have a pcollection of keyed numbers >>>>>>>>>>> and you want to calculate some summary statistics on them. You >>>>>>>>>>> could do: >>>>>>>>>>> ``` >>>>>>>>>>> keyed_means = (keyed_nums >>>>>>>>>>> | Mean.PerKey()) >>>>>>>>>>> keyed_counts = (keyed_num >>>>>>>>>>> | Count.PerKey()) >>>>>>>>>>> ... # other combines >>>>>>>>>>> ``` >>>>>>>>>>> But it'd feel more natural to pre-group the pcollection. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Does it feel more natural because it feels as though it would be >>>>>>>>>> more performant? Because it seems like it adds an extra grouping >>>>>>>>>> step to >>>>>>>>>> the pipeline code, which otherwise might be not necessary. Note that >>>>>>>>>> Dataflow has the "combiner lifting" optimization, and >>>>>>>>>> combiner-specified-reduction happens before the data is written into >>>>>>>>>> shuffle as much as possible: >>>>>>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization >>>>>>>>>> . >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> ``` >>>>>>>>>>> grouped_nums = keyed_nums | GBK() >>>>>>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped()) >>>>>>>>>>> keyed_counts (grouped_nums | Count.PerGrouped()) >>>>>>>>>>> ``` >>>>>>>>>>> But these "PerGrouped" variants don't actually currently exist. >>>>>>>>>>> Does anyone else run into this pattern often? I might be missing an >>>>>>>>>>> obvious >>>>>>>>>>> pattern here. >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> >>>>>>>>>>> Joey Tran | Staff Developer | AutoDesigner TL >>>>>>>>>>> >>>>>>>>>>> *he/him* >>>>>>>>>>> >>>>>>>>>>> [image: Schrödinger, Inc.] <https://schrodinger.com/> >>>>>>>>>>> >>>>>>>>>>