On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user <user@beam.apache.org> wrote:
> Creating composite DoFns is tricky today due to how they are implemented > (via annotated methods). > Note that this depends on the language. This should be really easy to do from Python. > However providing such a method to compose DoFns would be very useful IMO. > +1 > On Fri, Sep 15, 2023 at 9:33 AM Joey Tran <joey.t...@schrodinger.com> > wrote: > >> Yeah for (1) the concern would be adding a shuffle/fusion break and (2) >> sounds like the likely solution, was just hoping there'd be one that could >> wrap at the PTransform level but I realize now the PTransform abstraction >> is too general as you mentioned to do something like that. >> >> (2) will be likely what we do, though now I'm wondering if it might be >> possible to create a ParDo wrapper that can take a ParDo, extract it's >> dofn, wrap it, and return a new ParDo >> >> On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user < >> user@beam.apache.org> wrote: >> >>> +1 to looking at composite transforms. You could even have a composite >>> transform that takes another transform as one of its construction arguments >>> and whose expand method does pre- and post-processing to the inputs/outputs >>> before/after applying the transform in question. (You could even implement >>> this as a Python decorator if you really wanted, either decorating the >>> expand method itself or the full class...) >>> >>> One of the difficulties is that for a general transform there isn't >>> necessarily a 1:N relationship between outputs and inputs as one has for a >>> DoFn (especially if there is any aggregation involved). There are, however, >>> two partial solutions that might help. >>> >>> (1) You can do a CombineGlobally with a CombineFn (Like Sample) that >>> returns at most N elements. You could do this with a CombinePerKey if you >>> can come up with a reasonable key (e.g. the id of your input elements) that >>> the limit should be a applied to. Note that this may cause a lot of data to >>> be shuffled (though due to combiner lifting, no more than N per bundle). >>> >>> (2) You could have a DoFn that limits to N per bundle by initializing a >>> counter in its start_bundle and passing elements through until the counter >>> reaches a threshold. (Again, one could do this per id if one is available.) >>> It wouldn't stop production of the elements, but if things get fused it >>> would still likely be fairly cheap. >>> >>> Both of these could be prepended to the problematic consuming PTransform >>> as well. >>> >>> - Robert >>> >>> >>> >>> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran <joey.t...@schrodinger.com> >>> wrote: >>> >>>> I'm aware of composite transforms and of the distributed nature of >>>> PTransforms. I'm not suggesting limiting the entire set and my example was >>>> more illustrative than the actual use case. >>>> >>>> My actual use case is basically: I have multiple PTransforms, and let's >>>> say most of them average ~100 generated outputs for a single input. Most of >>>> these PTransforms will occasionally run into an input though that might >>>> output maybe 1M outputs. This can cause issues if for example there are >>>> transforms that follow it that require a lot of compute per input. >>>> >>>> The simplest way to deal with this is to modify the `DoFn`s in our >>>> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated >>>> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across >>>> our transforms, but it'd be much cleaner if we could lift up this limiting >>>> logic out of the application logic and have some generic wrapper that >>>> extends our transforms. >>>> >>>> Thanks for the discussion! >>>> >>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko < >>>> aromanenko....@gmail.com> wrote: >>>> >>>>> I don’t think it’s possible to extend in a way that you are asking >>>>> (like, Java classes “*extend*"). Though, you can create your own >>>>> composite PTransform that will incorporate one or several others inside >>>>> *“expand()”* method. Actually, most of the Beam native PTransforms >>>>> are composite transforms. Please, take a look on doc and examples [1] >>>>> >>>>> Regarding your example, please, be aware that all PTransforms are >>>>> supposed to be executed in distributed environment and the order of >>>>> records >>>>> is not guaranteed. So, limiting the whole output by fixed number of >>>>> records >>>>> can be challenging - you’d need to make sure that it will be processed on >>>>> only one worker, that means that you’d need to shuffle all your records by >>>>> the same key and probably sort the records in way that you need. >>>>> >>>>> Did you consider to use “*org.apache.beam.sdk.transforms.Top*” for >>>>> that? [2] >>>>> >>>>> If it doesn’t work for you, could you provide more details of your use >>>>> case? Then we probably can propose the more suitable solutions for that. >>>>> >>>>> [1] >>>>> https://beam.apache.org/documentation/programming-guide/#composite-transforms >>>>> [2] >>>>> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html >>>>> >>>>> — >>>>> Alexey >>>>> >>>>> On 15 Sep 2023, at 14:22, Joey Tran <joey.t...@schrodinger.com> wrote: >>>>> >>>>> Is there a way to extend already defined PTransforms? My question is >>>>> probably better illustrated with an example. Let's say I have a PTransform >>>>> that generates a very variable number of outputs. I'd like to "wrap" that >>>>> PTransform such that if it ever creates more than say 1,000 outputs, then >>>>> I >>>>> just take the first 1,000 outputs without generating the rest of the >>>>> outputs. >>>>> >>>>> It'd be trivial if I have access to the DoFn, but what if the >>>>> PTransform in question doesn't expose the `DoFn`? >>>>> >>>>> >>>>>